package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Function;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.core.query.utils.Pair;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.class */
/**
 * Reacts to ZooKeeper / property store change notifications and, for realtime tables whose stream
 * config reports a high-level consumer type, creates new {@code IN_PROGRESS} segment metadata and
 * adds the segment to the table's ideal state for server instances that are not currently
 * associated with an in-progress segment.
 *
 * <p>The manager registers itself as a child/data listener on the table config path, on each
 * high-level-consumer realtime table's segment path, and on each in-progress segment node.
 */
public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
    private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
    private static final String SEGMENTS_PATH = "/SEGMENTS";
    private static final String REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN = ".*/SEGMENTS/.*_REALTIME|.*/SEGMENTS/.*_REALTIME/.*";
    private static final String REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN = ".*/TABLE/.*REALTIME";
    private static final String CONTROLLER_LEADER_CHANGE = "CONTROLLER LEADER CHANGE";
    private final String _propertyStorePath;
    private final String _tableConfigPath;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private ZkClient _zkClient;
    private ControllerMetrics _controllerMetrics;
    private final LeadControllerManager _leadControllerManager;

    /**
     * Creates the manager and derives the absolute property store and table config paths from the
     * Helix cluster name reported by the resource manager. No ZooKeeper connection is opened until
     * {@link #start(ControllerMetrics)} is called.
     *
     * @param pinotHelixResourceManager resource manager used for all cluster and property store access
     * @param leadControllerManager used to decide whether this controller is the leader for a table
     */
    public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager) {
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._leadControllerManager = leadControllerManager;
        this._propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, this._pinotHelixResourceManager.getHelixClusterName(), new String[0]);
        this._tableConfigPath = this._propertyStorePath + TABLE_CONFIG;
    }

    /**
     * Opens a dedicated ZooKeeper client, subscribes this object to child and data changes on the
     * table config path, and runs one initial processing pass for that path.
     *
     * @param controllerMetrics metrics sink used to report segment assignment errors
     */
    public void start(ControllerMetrics controllerMetrics) {
        this._controllerMetrics = controllerMetrics;
        LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
        this._zkClient = new ZkClient(this._pinotHelixResourceManager.getHelixZkURL(), 30000, 60000);
        this._zkClient.setZkSerializer(new ZNRecordSerializer());
        this._zkClient.waitUntilConnected(60L, TimeUnit.SECONDS);
        this._zkClient.subscribeChildChanges(this._tableConfigPath, this);
        this._zkClient.subscribeDataChanges(this._tableConfigPath, this);
        processPropertyStoreChange(this._tableConfigPath);
    }

    /**
     * Closes the ZooKeeper client created by {@link #start(ControllerMetrics)} (if any) and stops
     * the property store. A failure to close the client is logged and does not prevent the
     * property store from being stopped.
     */
    public void stop() {
        LOGGER.info("Stopping realtime segments manager, stopping property store.");
        ZkClient zkClient = this._zkClient;
        if (zkClient != null) {
            // This client is created and owned by this class, so it must be released here;
            // closing it first also stops further watch callbacks from being delivered.
            try {
                zkClient.close();
            } catch (Exception e) {
                LOGGER.warn("Caught exception while closing the ZK client of the realtime segments manager", e);
            }
        }
        this._pinotHelixResourceManager.getPropertyStore().stop();
    }

    /**
     * Computes, for every high-level-consumer realtime table this controller leads, which server
     * instances need a new segment, then writes the segment metadata and updates the ideal state.
     *
     * <p>Failures are isolated: an exception while processing one table or one segment is logged,
     * counted in {@code CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR}, and the remaining
     * tables/segments are still processed.
     */
    private synchronized void assignRealtimeSegmentsToServerInstancesIfNecessary() {
        // Snapshot of the ideal states of the tables to consider, keyed by realtime table name.
        Map<String, IdealState> idealStateMap = new HashMap<>();
        for (String realtimeTableName : this._pinotHelixResourceManager.getAllRealtimeTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(realtimeTableName);
            if (tableConfig == null || !this._leadControllerManager.isLeaderForTable(realtimeTableName)) {
                continue;
            }
            StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
            if (streamConfig.hasHighLevelConsumerType()) {
                idealStateMap.put(realtimeTableName, this._pinotHelixResourceManager.getHelixAdmin().getResourceIdealState(this._pinotHelixResourceManager.getHelixClusterName(), realtimeTableName));
            } else {
                LOGGER.debug("Not considering table {} for realtime segment assignment", realtimeTableName);
            }
        }

        // Pairs of (new segment name, instance the segment is to be assigned to).
        List<Pair<String, String>> segmentsToAdd = new ArrayList<>();
        for (Map.Entry<String, IdealState> entry : idealStateMap.entrySet()) {
            String realtimeTableName = entry.getKey();
            // The whole per-table computation is guarded so that one bad table cannot abort the pass.
            try {
                collectSegmentsToAddForTable(realtimeTableName, entry.getValue(), segmentsToAdd);
            } catch (Exception e) {
                LOGGER.warn("Caught exception while processing resource {}, skipping.", realtimeTableName, e);
                this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
            }
        }

        LOGGER.info("Computed list of new segments to add : {}", Arrays.toString(segmentsToAdd.toArray()));
        for (Pair<String, String> segmentAndInstance : segmentsToAdd) {
            final String segmentName = segmentAndInstance.getFirst();
            final String instanceName = segmentAndInstance.getSecond();
            try {
                String tableName = new HLCSegmentName(segmentName).getTableName();
                // Checked against the snapshot taken above, not against a freshly read ideal state.
                if (!idealStateMap.get(tableName).getPartitionSet().contains(segmentName)) {
                    RealtimeSegmentZKMetadata segmentZKMetadata = new RealtimeSegmentZKMetadata();
                    segmentZKMetadata.setTableName(tableName);
                    segmentZKMetadata.setSegmentType(CommonConstants.Segment.SegmentType.REALTIME);
                    segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
                    segmentZKMetadata.setSegmentName(segmentName);
                    // Metadata is written before the ideal state update.
                    ZKMetadataProvider.setRealtimeSegmentZKMetadata(this._pinotHelixResourceManager.getPropertyStore(), tableName, segmentZKMetadata);
                    HelixHelper.updateIdealState(this._pinotHelixResourceManager.getHelixZkManager(), tableName, new Function<IdealState, IdealState>() {
                        @Override
                        public IdealState apply(IdealState currentIdealState) {
                            return PinotTableIdealStateBuilder.addNewRealtimeSegmentToIdealState(segmentName, currentIdealState, instanceName);
                        }
                    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0d));
                }
            } catch (Exception e) {
                LOGGER.warn("Caught exception while processing segment {} for instance {}, skipping.", segmentName, instanceName, e);
                this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
            }
        }
    }

    /**
     * Appends to {@code segmentsToAdd} one (segment name, instance) pair for every server instance
     * of the table that needs a new segment: all instances when the ideal state has no partitions,
     * otherwise the instances that do not appear in the instance set of any in-progress
     * high-level-consumer segment.
     *
     * @param realtimeTableName table being processed
     * @param idealState ideal state snapshot for the table
     * @param segmentsToAdd output list that new pairs are appended to
     * @throws IllegalStateException if {@code idealState} is null
     */
    private void collectSegmentsToAddForTable(String realtimeTableName, IdealState idealState, List<Pair<String, String>> segmentsToAdd) {
        if (idealState == null) {
            throw new IllegalStateException("No ideal state found for resource " + realtimeTableName);
        }

        List<String> instancesInResource = new ArrayList<>();
        try {
            instancesInResource.addAll(this._pinotHelixResourceManager.getServerInstancesForTable(realtimeTableName, TableType.REALTIME));
        } catch (Exception e) {
            // Best effort: continue with whatever instances were collected (possibly none).
            LOGGER.error("Caught exception while fetching instances for resource {}", realtimeTableName, e);
            this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
        }

        Set<String> segmentsInIdealState = idealState.getPartitionSet();
        if (segmentsInIdealState.isEmpty()) {
            // Nothing assigned yet: every instance gets a segment.
            for (String instanceId : instancesInResource) {
                addNewSegmentForInstance(realtimeTableName, instanceId, segmentsToAdd);
            }
            return;
        }

        // Start from all instances and drop the ones that already hold an in-progress segment.
        Set<String> instancesToAssign = new HashSet<>();
        instancesToAssign.addAll(instancesInResource);
        for (String segmentName : segmentsInIdealState) {
            if (!SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
                continue;
            }
            RealtimeSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getRealtimeSegmentZKMetadata(this._pinotHelixResourceManager.getPropertyStore(), new HLCSegmentName(segmentName).getTableName(), segmentName);
            if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                instancesToAssign.removeAll(idealState.getInstanceSet(segmentName));
            }
        }
        for (String instanceId : instancesToAssign) {
            addNewSegmentForInstance(realtimeTableName, instanceId, segmentsToAdd);
        }
    }

    /**
     * Builds a new high-level-consumer segment name for the instance from its group id, partition
     * and the current time, and appends the (segment name, instance) pair to {@code segmentsToAdd}.
     * Instances with no ZK metadata, or with a null/empty group id or partition for the table, are
     * skipped with a warning and an error metric.
     *
     * @param realtimeTableName table the segment belongs to
     * @param instanceId server instance to create the segment for
     * @param segmentsToAdd output list that the new pair is appended to
     */
    private void addNewSegmentForInstance(String realtimeTableName, String instanceId, List<Pair<String, String>> segmentsToAdd) {
        InstanceZKMetadata instanceZKMetadata = this._pinotHelixResourceManager.getInstanceZKMetadata(instanceId);
        if (instanceZKMetadata == null) {
            LOGGER.warn("Instance {} has no associated instance metadata in ZK, ignoring for segment assignment.", instanceId);
            this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
            return;
        }
        String groupId = instanceZKMetadata.getGroupId(realtimeTableName);
        String partitionId = instanceZKMetadata.getPartition(realtimeTableName);
        if (groupId == null || groupId.isEmpty() || partitionId == null || partitionId.isEmpty()) {
            LOGGER.warn("Instance {} has invalid groupId ({}) and/or partitionId ({}) for resource {}, ignoring for segment assignment.", instanceId, groupId, partitionId, realtimeTableName);
            this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
            return;
        }
        String newSegmentName = new HLCSegmentName(groupId, partitionId, String.valueOf(System.currentTimeMillis())).getSegmentName();
        segmentsToAdd.add(new Pair<>(newSegmentName, instanceId));
    }

    /** Property store callback: a node's data changed at {@code str}. */
    @Override
    public synchronized void onDataChange(String str) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataChange: {}", str);
        processPropertyStoreChange(str);
    }

    /** Property store callback: a node was created at {@code str}. */
    @Override
    public synchronized void onDataCreate(String str) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataCreate: {}", str);
        processPropertyStoreChange(str);
    }

    /** Property store callback: the node at {@code str} was deleted. */
    @Override
    public synchronized void onDataDelete(String str) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataDelete: {}", str);
        processPropertyStoreChange(str);
    }

    /**
     * Common handler for every notification: refreshes the ZooKeeper watches, then runs segment
     * assignment when the path matches a realtime segment path, a realtime table config path, or
     * the controller-leader-change marker. Any exception is logged and rethrown through
     * {@link Utils#rethrowException(Throwable)}.
     *
     * @param str path (or marker string) the notification was received for
     */
    private void processPropertyStoreChange(String str) {
        try {
            LOGGER.info("Processing change notification for path: {}", str);
            refreshWatchers(str);
            boolean isRealtimeSegmentPath = str.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN);
            boolean isRealtimeTableConfigPath = str.matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN);
            if (isRealtimeSegmentPath || isRealtimeTableConfigPath || str.equals(CONTROLLER_LEADER_CHANGE)) {
                assignRealtimeSegmentsToServerInstancesIfNecessary();
            }
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing change for path {}", str, e);
            Utils.rethrowException(e);
        }
    }

    /**
     * Re-registers watches for every high-level-consumer realtime table found under the table
     * config path: data and child watches on the table's segment path, a data watch on each
     * in-progress high-level-consumer segment, and removal of the data watch for segments that are
     * no longer in progress. A failure on one table config record is logged and the remaining
     * records are still processed.
     *
     * @param str path the triggering notification was received for (used for logging only)
     */
    private void refreshWatchers(String str) {
        LOGGER.info("Received change notification for path: {}", str);
        List<ZNRecord> tableConfigRecords = this._pinotHelixResourceManager.getPropertyStore().getChildren(TABLE_CONFIG, new ArrayList<>(), 0);
        if (tableConfigRecords == null) {
            return;
        }
        for (ZNRecord tableConfigRecord : tableConfigRecords) {
            if (tableConfigRecord == null) {
                continue;
            }
            try {
                if (TableNameBuilder.getTableTypeFromTableName(tableConfigRecord.getId()) != TableType.REALTIME) {
                    continue;
                }
                TableConfig tableConfig = TableConfigUtils.fromZNRecord(tableConfigRecord);
                StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
                if (!streamConfig.hasHighLevelConsumerType()) {
                    continue;
                }
                String tableName = tableConfig.getTableName();
                // Absolute ZK path for the watches; the property store calls below use relative paths.
                String tableSegmentsPath = this._propertyStorePath + SEGMENTS_PATH + "/" + tableName;
                LOGGER.info("Setting data/child changes watch for real-time table '{}'", tableName);
                this._zkClient.subscribeDataChanges(tableSegmentsPath, this);
                this._zkClient.subscribeChildChanges(tableSegmentsPath, this);
                List<String> segmentNames = this._pinotHelixResourceManager.getPropertyStore().getChildNames("/SEGMENTS/" + tableName, 0);
                if (segmentNames == null || segmentNames.isEmpty()) {
                    continue;
                }
                for (String segmentName : segmentNames) {
                    if (!SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
                        continue;
                    }
                    String segmentPath = tableSegmentsPath + "/" + segmentName;
                    RealtimeSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getRealtimeSegmentZKMetadata(this._pinotHelixResourceManager.getPropertyStore(), tableConfig.getTableName(), segmentName);
                    if (segmentZKMetadata == null) {
                        continue;
                    }
                    if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                        LOGGER.info("Setting data change watch for real-time segment currently being consumed: {}", segmentPath);
                        this._zkClient.subscribeDataChanges(segmentPath, this);
                    } else {
                        this._zkClient.unsubscribeDataChanges(segmentPath, this);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Caught exception while processing ZNRecord id: {}. Skipping node to continue setting watches", tableConfigRecord.getId(), e);
            }
        }
    }

    /**
     * ZooKeeper child listener callback. Processes the parent path, then processes each child whose
     * name ends with {@code _REALTIME} as {@code parent + "/" + child}.
     *
     * @param str parent path whose children changed
     * @param list current child names, may be null
     */
    @Override
    public void handleChildChange(String str, List<String> list) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleChildChange: {}", str);
        processPropertyStoreChange(str);
        if (list == null) {
            return;
        }
        for (String childName : list) {
            if (childName.endsWith("_REALTIME")) {
                String childPath = str + "/" + childName;
                LOGGER.info("PinotRealtimeSegmentManager.handleChildChange with table: {}", childPath);
                processPropertyStoreChange(childPath);
            }
        }
    }

    /** ZooKeeper data listener callback: the data at {@code str} changed; {@code obj} is not used. */
    @Override
    public void handleDataChange(String str, Object obj) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleDataChange: {}", str);
        processPropertyStoreChange(str);
    }

    /** ZooKeeper data listener callback: the node at {@code str} was deleted. */
    @Override
    public void handleDataDeleted(String str) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", str);
        processPropertyStoreChange(str);
    }
}
