package org.apache.samza.zk;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.zk.ZkBarrierForVersionUpgrade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator.class */
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
    private static final int METADATA_CACHE_TTL_MS = 5000;
    private final ZkUtils zkUtils;
    private final String processorId;
    private final ZkController zkController;
    private final Config config;
    private final ZkBarrierForVersionUpgrade barrier;
    private StreamMetadataCache streamMetadataCache = null;
    private ScheduleAfterDebounceTime debounceTimer = null;
    private JobCoordinatorListener coordinatorListener = null;
    private JobModel newJobModel;
    private int debounceTimeMs;

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$LeaderElectorListenerImpl.class */
    class LeaderElectorListenerImpl implements LeaderElectorListener {
        LeaderElectorListenerImpl() {
        }

        @Override // org.apache.samza.coordinator.LeaderElectorListener
        public void onBecomingLeader() {
            ZkJobCoordinator.LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
            ZkJobCoordinator.this.zkController.subscribeToProcessorChange();
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, ZkJobCoordinator.this.debounceTimeMs, () -> {
                ZkJobCoordinator.this.doOnProcessorChange(new ArrayList());
            });
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$ZkBarrierListenerImpl.class */
    class ZkBarrierListenerImpl implements ZkBarrierListener {
        private final String barrierAction = "BarrierAction";

        ZkBarrierListenerImpl() {
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierCreated(String str) {
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", new ZkConfig(ZkJobCoordinator.this.config).getZkBarrierTimeoutMs(), () -> {
                ZkJobCoordinator.this.barrier.expire(str);
            });
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierStateChanged(String str, ZkBarrierForVersionUpgrade.State state) {
            ZkJobCoordinator.LOG.info("JobModel version " + str + " obtained consensus successfully!");
            if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
                ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", 0L, () -> {
                    ZkJobCoordinator.this.onNewJobModelConfirmed(str);
                });
            } else if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
                ZkJobCoordinator.LOG.info("Barrier for version " + str + " timed out.");
            }
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierError(String str, Throwable th) {
            ZkJobCoordinator.LOG.error("Encountered error while attaining consensus on JobModel version " + str);
            ZkJobCoordinator.this.stop();
        }
    }

    public ZkJobCoordinator(Config config) {
        this.config = config;
        ZkConfig zkConfig = new ZkConfig(config);
        ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getAppId());
        this.zkUtils = new ZkUtils(zkKeyBuilder, ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()), zkConfig.getZkConnectionTimeoutMs());
        this.processorId = createProcessorId(config);
        ZkLeaderElector zkLeaderElector = new ZkLeaderElector(this.processorId, this.zkUtils);
        zkLeaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
        this.zkController = new ZkControllerImpl(this.processorId, this.zkUtils, this, zkLeaderElector);
        this.barrier = new ZkBarrierForVersionUpgrade(zkKeyBuilder.getJobModelVersionBarrierPrefix(), this.zkUtils, new ZkBarrierListenerImpl());
        this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public void start() {
        this.streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, this.config);
        this.debounceTimer = new ScheduleAfterDebounceTime(th -> {
            LOG.error("Received exception from in JobCoordinator Processing!", th);
            stop();
        });
        this.zkController.register();
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public synchronized void stop() {
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onJobModelExpired();
        }
        this.debounceTimer.stopScheduler();
        this.zkController.stop();
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onCoordinatorStop();
        }
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public void setListener(JobCoordinatorListener jobCoordinatorListener) {
        this.coordinatorListener = jobCoordinatorListener;
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public JobModel getJobModel() {
        return this.newJobModel;
    }

    @Override // org.apache.samza.coordinator.JobCoordinator
    public String getProcessorId() {
        return this.processorId;
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onProcessorChange(List<String> list) {
        LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + list.size());
        this.debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, this.debounceTimeMs, () -> {
            doOnProcessorChange(list);
        });
    }

    void doOnProcessorChange(List<String> list) {
        List<String> actualProcessorIds = getActualProcessorIds(list);
        JobModel generateNewJobModel = generateNewJobModel(actualProcessorIds);
        String jobModelVersion = this.zkUtils.getJobModelVersion();
        String num = jobModelVersion == null ? "1" : Integer.toString(Integer.valueOf(jobModelVersion).intValue() + 1);
        LOG.info("pid=" + this.processorId + "Generated new Job Model. Version = " + num);
        this.zkUtils.publishJobModel(num, generateNewJobModel);
        this.barrier.create(num, actualProcessorIds);
        this.zkUtils.publishJobModelVersion(jobModelVersion, num);
        LOG.info("pid=" + this.processorId + "Published new Job Model. Version = " + num);
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onNewJobModelAvailable(String str) {
        this.debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0L, () -> {
            LOG.info("pid=" + this.processorId + "new JobModel available");
            if (this.coordinatorListener != null) {
                this.coordinatorListener.onJobModelExpired();
            }
            this.newJobModel = this.zkUtils.getJobModel(str);
            LOG.info("pid=" + this.processorId + ": new JobModel available. ver=" + str + "; jm = " + this.newJobModel);
            this.barrier.join(str, this.processorId);
        });
    }

    @Override // org.apache.samza.zk.ZkControllerListener
    public void onNewJobModelConfirmed(String str) {
        LOG.info("pid=" + this.processorId + "new version " + str + " of the job model got confirmed");
        JobModel jobModel = getJobModel();
        if (this.coordinatorListener != null) {
            this.coordinatorListener.onNewJobModel(this.processorId, jobModel);
        }
    }

    private String createProcessorId(Config config) {
        ApplicationConfig applicationConfig = new ApplicationConfig(config);
        if (applicationConfig.getProcessorId() != null) {
            return applicationConfig.getProcessorId();
        }
        if (applicationConfig.getAppProcessorIdGeneratorClass() != null) {
            return ((ProcessorIdGenerator) ClassLoaderHelper.fromClassName(applicationConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class)).generateProcessorId(config);
        }
        throw new ConfigException(String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
    }

    private List<String> getActualProcessorIds(List<String> list) {
        return list.size() > 0 ? this.zkUtils.getActiveProcessorsIDs(list) : this.zkUtils.getSortedActiveProcessorsIDs();
    }

    private JobModel generateNewJobModel(List<String> list) {
        return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, this.streamMetadataCache, list);
    }
}
