package org.apache.samza.zk;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.zk.ZkBarrierForVersionUpgrade;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator.class */
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
    private static final int METADATA_CACHE_TTL_MS = 5000;
    private static final int NUM_VERSIONS_TO_LEAVE = 10;
    private static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
    private static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
    private static final String ON_ZK_CLEANUP = "OnCleanUp";
    private final ZkUtils zkUtils;
    private final String processorId;
    private final ZkController zkController;
    private final Config config;
    private final ZkBarrierForVersionUpgrade barrier;
    private final ZkJobCoordinatorMetrics metrics;
    private final Map<String, MetricsReporter> reporters;
    private ScheduleAfterDebounceTime debounceTimer;
    private JobModel newJobModel;
    private int debounceTimeMs;
    private StreamMetadataCache streamMetadataCache = null;
    private JobCoordinatorListener coordinatorListener = null;
    private boolean hasCreatedChangeLogStreams = false;
    private String cachedJobModelVersion = null;
    private Map<TaskName, Integer> changeLogPartitionMap = new HashMap();

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$LeaderElectorListenerImpl.class */
    class LeaderElectorListenerImpl implements LeaderElectorListener {
        LeaderElectorListenerImpl() {
        }

        @Override // org.apache.samza.coordinator.LeaderElectorListener
        public void onBecomingLeader() {
            ZkJobCoordinator.LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
            ZkJobCoordinator.this.metrics.isLeader.set(true);
            ZkJobCoordinator.this.zkController.subscribeToProcessorChange();
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZkJobCoordinator.ON_PROCESSOR_CHANGE, ZkJobCoordinator.this.debounceTimeMs, () -> {
                ZkJobCoordinator.this.doOnProcessorChange(new ArrayList());
            });
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$ZkBarrierListenerImpl.class */
    class ZkBarrierListenerImpl implements ZkBarrierListener {
        private final String barrierAction = "BarrierAction";
        private long startTime = 0;

        ZkBarrierListenerImpl() {
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierCreated(String str) {
            this.startTime = System.nanoTime();
            ZkJobCoordinator.this.metrics.barrierCreation.inc();
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", new ZkConfig(ZkJobCoordinator.this.config).getZkBarrierTimeoutMs(), () -> {
                ZkJobCoordinator.this.barrier.expire(str);
            });
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierStateChanged(String str, ZkBarrierForVersionUpgrade.State state) {
            ZkJobCoordinator.LOG.info("JobModel version " + str + " obtained consensus successfully!");
            ZkJobCoordinator.this.metrics.barrierStateChange.inc();
            ZkJobCoordinator.this.metrics.singleBarrierRebalancingTime.update(System.nanoTime() - this.startTime);
            if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
                ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", 0L, () -> {
                    ZkJobCoordinator.this.onNewJobModelConfirmed(str);
                });
                return;
            }
            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
                ZkJobCoordinator.LOG.warn("Barrier for version " + str + " timed out.");
                if (ZkJobCoordinator.this.zkController.isLeader()) {
                    ZkJobCoordinator.LOG.info("Leader will schedule a new job model generation");
                    ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZkJobCoordinator.ON_PROCESSOR_CHANGE, ZkJobCoordinator.this.debounceTimeMs, () -> {
                        ZkJobCoordinator.this.doOnProcessorChange(new ArrayList());
                    });
                }
            }
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierError(String str, Throwable th) {
            ZkJobCoordinator.LOG.error("Encountered error while attaining consensus on JobModel version " + str);
            ZkJobCoordinator.this.metrics.barrierError.inc();
            ZkJobCoordinator.this.stop();
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$ZkSessionStateChangedListener.class */
    class ZkSessionStateChangedListener implements IZkStateListener {
        private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";

        ZkSessionStateChangedListener() {
        }

        public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
            if (keeperState == Watcher.Event.KeeperState.Expired) {
                ZkJobCoordinator.LOG.warn("Got session expired event for processor=" + ZkJobCoordinator.this.processorId);
                ZkJobCoordinator.this.zkUtils.incGeneration();
                if (ZkJobCoordinator.this.coordinatorListener != null) {
                    ZkJobCoordinator.this.coordinatorListener.onJobModelExpired();
                }
                ZkJobCoordinator.this.zkUtils.unregister();
            }
        }

        public void handleNewSession() throws Exception {
            ZkJobCoordinator.LOG.info("Got new session created event for processor=" + ZkJobCoordinator.this.processorId);
            ZkJobCoordinator.LOG.info("register zk controller for the new session");
            ZkJobCoordinator.this.zkController.register();
        }

        public void handleSessionEstablishmentError(Throwable th) throws Exception {
            ZkJobCoordinator.LOG.info("handleSessionEstablishmentError received for processor=" + ZkJobCoordinator.this.processorId, th);
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0L, () -> {
                ZkJobCoordinator.this.stop();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
        this.debounceTimer = null;
        this.config = config;
        this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
        this.processorId = createProcessorId(config);
        this.zkUtils = zkUtils;
        zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
        ZkLeaderElector zkLeaderElector = new ZkLeaderElector(this.processorId, zkUtils);
        zkLeaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
        this.zkController = new ZkControllerImpl(this.processorId, zkUtils, this, zkLeaderElector);
        this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl());
        this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
        this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), this.processorId);
        this.debounceTimer = new ScheduleAfterDebounceTime();
        this.debounceTimer.setScheduledTaskCallback(th -> {
            LOG.error("Received exception from in JobCoordinator Processing!", th);
            stop();
        });
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public void start() {
        startMetrics();
        this.streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, this.config);
        this.zkController.register();
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public synchronized void stop() {
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onJobModelExpired();
        }
        this.metrics.isLeader.set(false);
        this.debounceTimer.stopScheduler();
        this.zkController.stop();
        shutdownMetrics();
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onCoordinatorStop();
        }
    }

    private void startMetrics() {
        for (MetricsReporter metricsReporter : this.reporters.values()) {
            metricsReporter.register("job-coordinator-" + this.processorId, this.metrics.getMetricsRegistry());
            metricsReporter.start();
        }
    }

    private void shutdownMetrics() {
        Iterator<MetricsReporter> it = this.reporters.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public void setListener(JobCoordinatorListener jobCoordinatorListener) {
        this.coordinatorListener = jobCoordinatorListener;
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public JobModel getJobModel() {
        return this.newJobModel;
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public String getProcessorId() {
        return this.processorId;
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onProcessorChange(List<String> list) {
        LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + list.size());
        this.debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, this.debounceTimeMs, () -> {
            doOnProcessorChange(list);
        });
    }

    void doOnProcessorChange(List<String> list) {
        List<String> actualProcessorIds = getActualProcessorIds(list);
        if (actualProcessorIds.size() != new HashSet(actualProcessorIds).size()) {
            LOG.info("Processors: {} has duplicates. Not generating job model.", actualProcessorIds);
            return;
        }
        JobModel generateNewJobModel = generateNewJobModel(actualProcessorIds);
        if (!this.hasCreatedChangeLogStreams) {
            JobModelManager.createChangeLogStreams(new StorageConfig(this.config), generateNewJobModel.maxChangeLogStreamPartitions);
            this.hasCreatedChangeLogStreams = true;
        }
        String jobModelVersion = this.zkUtils.getJobModelVersion();
        String num = jobModelVersion == null ? "1" : Integer.toString(Integer.valueOf(jobModelVersion).intValue() + 1);
        LOG.info("pid=" + this.processorId + "Generated new Job Model. Version = " + num);
        this.zkUtils.publishJobModel(num, generateNewJobModel);
        this.barrier.create(num, actualProcessorIds);
        this.zkUtils.publishJobModelVersion(jobModelVersion, num);
        LOG.info("pid=" + this.processorId + "Published new Job Model. Version = " + num);
        this.debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0L, () -> {
            this.zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE);
        });
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onNewJobModelAvailable(String str) {
        this.debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0L, () -> {
            LOG.info("pid=" + this.processorId + ": new JobModel available");
            this.newJobModel = this.zkUtils.getJobModel(str);
            LOG.info("pid=" + this.processorId + ": new JobModel available. ver=" + str + "; jm = " + this.newJobModel);
            if (!this.newJobModel.getContainers().containsKey(this.processorId)) {
                LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}", this.processorId, this.newJobModel);
                stop();
            } else {
                if (this.coordinatorListener != null) {
                    this.coordinatorListener.onJobModelExpired();
                }
                this.barrier.join(str, this.processorId);
            }
        });
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onNewJobModelConfirmed(String str) {
        LOG.info("pid=" + this.processorId + "new version " + str + " of the job model got confirmed");
        JobModel jobModel = getJobModel();
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onNewJobModel(this.processorId, jobModel);
        }
    }

    private String createProcessorId(Config config) {
        ApplicationConfig applicationConfig = new ApplicationConfig(config);
        if (applicationConfig.getProcessorId() != null) {
            return applicationConfig.getProcessorId();
        }
        if (StringUtils.isNotBlank(applicationConfig.getAppProcessorIdGeneratorClass())) {
            return ((ProcessorIdGenerator) ClassLoaderHelper.fromClassName(applicationConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class)).generateProcessorId(config);
        }
        throw new ConfigException(String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
    }

    private List<String> getActualProcessorIds(List<String> list) {
        return list.size() > 0 ? this.zkUtils.getActiveProcessorsIDs(list) : this.zkUtils.getSortedActiveProcessorsIDs();
    }

    private JobModel generateNewJobModel(List<String> list) {
        String jobModelVersion = this.zkUtils.getJobModelVersion();
        if (jobModelVersion != null && !Objects.equals(this.cachedJobModelVersion, jobModelVersion)) {
            Iterator<ContainerModel> it = this.zkUtils.getJobModel(jobModelVersion).getContainers().values().iterator();
            while (it.hasNext()) {
                it.next().getTasks().forEach((taskName, taskModel) -> {
                    this.changeLogPartitionMap.put(taskName, Integer.valueOf(taskModel.getChangelogPartition().getPartitionId()));
                });
            }
            this.cachedJobModelVersion = jobModelVersion;
        }
        return JobModelManager.readJobModel(this.config, this.changeLogPartitionMap, null, this.streamMetadataCache, list);
    }

    @VisibleForTesting
    public ZkUtils getZkUtils() {
        return this.zkUtils;
    }
}
