package org.apache.samza.zk;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelCalculator;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.zk.ZkBarrierForVersionUpgrade;
import org.apache.samza.zk.ZkUtils;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator.class */
public class ZkJobCoordinator implements JobCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);

    /** TTL, in milliseconds, handed to the {@link StreamMetadataCache} created in the constructor. */
    private static final int METADATA_CACHE_TTL_MS = 5000;

    /** Argument passed to {@code zkUtils.cleanupZK(...)} after a new job model version is published. */
    private static final int NUM_VERSIONS_TO_LEAVE = 10;

    // Action names under which work is scheduled on the debounce timer.
    private static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
    private static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
    private static final String ON_ZK_CLEANUP = "OnCleanUp";
    static final String START_WORK_WITH_LAST_ACTIVE_JOB_MODEL = "StartWorkWithLastActiveJobModel";

    private final ZkUtils zkUtils;
    private final String processorId;
    private final Config config;
    private final ZkJobCoordinatorMetrics metrics;
    private final StreamMetadataCache streamMetadataCache;
    private final SystemAdmins systemAdmins;

    /** Delay, in milliseconds, used when scheduling {@code doOnProcessorChange} on the debounce timer. */
    private final int debounceTimeMs;

    /** Location written as task locality for the tasks assigned to this processor. */
    private final LocationId locationId;

    /** Store that job models are written to and read from, keyed by job model version. */
    private final MetadataStore jobModelMetadataStore;

    /** Store that receives the job configuration and backs the startpoint manager. */
    private final CoordinatorStreamStore coordinatorStreamStore;

    /** Job model most recently handed to {@code onNewJobModel}; {@code null} until then. */
    private JobModel activeJobModel;

    /** Job model read on the latest job-model-version change notification; {@code null} until then. */
    private JobModel latestJobModel;

    private ZkBarrierForVersionUpgrade barrier;
    private ZkLeaderElector leaderElector;

    @VisibleForTesting
    ZkSessionMetrics zkSessionMetrics;

    @VisibleForTesting
    ScheduleAfterDebounceTime debounceTimer;

    /** Flipped to {@code true} by the first call to {@code stop()} so that later calls return early. */
    private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);

    /** Changelog partition id per task, refreshed from the previously published job model. */
    private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();

    /** Set when the listener was told the job model expired; cleared by {@code onNewJobModel}. */
    private final AtomicBoolean jobModelExpired = new AtomicBoolean(false);

    private JobCoordinatorListener coordinatorListener = null;

    /** Ensures {@code loadMetadataResources} runs at most once from {@code doOnProcessorChange}. */
    private boolean hasLoadedMetadataResources = false;

    /** Job model version whose changelog partitions are already in {@code changeLogPartitionMap}. */
    private String cachedJobModelVersion = null;

    @VisibleForTesting
    StreamPartitionCountMonitor streamPartitionCountMonitor = null;

    /* renamed from: org.apache.samza.zk.ZkJobCoordinator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$1.class */
    // Synthetic switch map: an int array indexed by KeeperState ordinal that holds a case
    // label (1..6) for the six states assigned below; entries for any other state stay 0.
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState = new int[Watcher.Event.KeeperState.values().length];

        static {
            // Each assignment is wrapped separately and NoSuchFieldError is ignored, so one
            // failing constant lookup does not prevent the remaining entries from being filled.
            // NOTE(review): presumably guards against a ZooKeeper version lacking a constant — confirm.
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.Expired.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.Disconnected.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.AuthFailed.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.NoSyncConnected.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.Unknown.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.SyncConnected.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$LeaderElectorListenerImpl.class */
    class LeaderElectorListenerImpl implements LeaderElectorListener {
        LeaderElectorListenerImpl() {
        }

        @Override // org.apache.samza.coordinator.LeaderElectorListener
        public void onBecomingLeader() {
            ZkJobCoordinator.LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
            ZkJobCoordinator.this.metrics.isLeader.set(1);
            ZkJobCoordinator.this.zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(ZkJobCoordinator.this.zkUtils));
            if (!new StorageConfig(ZkJobCoordinator.this.config).hasDurableStores()) {
                if (ZkJobCoordinator.this.streamPartitionCountMonitor != null) {
                    ZkJobCoordinator.this.streamPartitionCountMonitor.stop();
                }
                ZkJobCoordinator.this.streamPartitionCountMonitor = ZkJobCoordinator.this.getPartitionCountMonitor();
                ZkJobCoordinator.this.streamPartitionCountMonitor.start();
            }
            ScheduleAfterDebounceTime scheduleAfterDebounceTime = ZkJobCoordinator.this.debounceTimer;
            long j = ZkJobCoordinator.this.debounceTimeMs;
            ZkJobCoordinator zkJobCoordinator = ZkJobCoordinator.this;
            scheduleAfterDebounceTime.scheduleAfterDebounceTime(ZkJobCoordinator.ON_PROCESSOR_CHANGE, j, zkJobCoordinator::doOnProcessorChange);
        }
    }

    /**
     * Child listener that forwards changes in the processor list to
     * {@link ZkJobCoordinator#onProcessorChange(List)}.
     */
    class ProcessorChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
        public ProcessorChangeHandler(ZkUtils zkUtils) {
            super(zkUtils, "ProcessorChangeHandler");
        }

        /**
         * Logs the notification and, when a child list is supplied, hands it to the coordinator.
         *
         * @param str path whose children changed
         * @param list current children of {@code str}; a {@code null} list is only logged
         */
        @Override // org.apache.samza.zk.ZkUtils.GenerationAwareZkChildListener
        public void doHandleChildChange(String str, List<String> list) throws Exception {
            if (list != null) {
                LOG.info("ProcessorChangeHandler::handleChildChange - Path: {} Current Children: {} ", str, list);
                onProcessorChange(list);
                return;
            }
            LOG.info("handleChildChange on path " + str + " was invoked with NULL list of children");
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$ZkBarrierListenerImpl.class */
    class ZkBarrierListenerImpl implements ZkBarrierListener {
        private final String barrierAction = "BarrierAction";
        private long startTime = 0;

        ZkBarrierListenerImpl() {
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierCreated(String str) {
            this.startTime = System.nanoTime();
            ZkJobCoordinator.this.metrics.barrierCreation.inc();
            if (ZkJobCoordinator.this.leaderElector.amILeader()) {
                ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", new ZkConfig(ZkJobCoordinator.this.config).getZkBarrierTimeoutMs(), () -> {
                    ZkJobCoordinator.this.barrier.expire(str);
                });
            }
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierStateChanged(String str, ZkBarrierForVersionUpgrade.State state) {
            ZkJobCoordinator.LOG.info("JobModel version " + str + " obtained consensus successfully!");
            ZkJobCoordinator.this.metrics.barrierStateChange.inc();
            ZkJobCoordinator.this.metrics.singleBarrierRebalancingTime.update(System.nanoTime() - this.startTime);
            if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
                ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime("BarrierAction", 0L, () -> {
                    ZkJobCoordinator.LOG.info("pid=" + ZkJobCoordinator.this.processorId + "new version " + str + " of the job model got confirmed");
                    if (ZkJobCoordinator.this.leaderElector.amILeader()) {
                        ZkJobCoordinator.this.zkUtils.publishActiveJobModelVersion(str);
                    }
                    ZkJobCoordinator.this.onNewJobModel(ZkJobCoordinator.this.getJobModel());
                });
                return;
            }
            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
                ZkJobCoordinator.LOG.warn("Barrier for version " + str + " timed out.");
                if (ZkJobCoordinator.this.leaderElector.amILeader()) {
                    ZkJobCoordinator.LOG.info("Leader will schedule a new job model generation");
                    ScheduleAfterDebounceTime scheduleAfterDebounceTime = ZkJobCoordinator.this.debounceTimer;
                    long j = ZkJobCoordinator.this.debounceTimeMs;
                    ZkJobCoordinator zkJobCoordinator = ZkJobCoordinator.this;
                    scheduleAfterDebounceTime.scheduleAfterDebounceTime(ZkJobCoordinator.ON_PROCESSOR_CHANGE, j, zkJobCoordinator::doOnProcessorChange);
                }
            }
        }

        @Override // org.apache.samza.zk.ZkBarrierListener
        public void onBarrierError(String str, Throwable th) {
            ZkJobCoordinator.LOG.error("Encountered error while attaining consensus on JobModel version " + str);
            ZkJobCoordinator.this.metrics.barrierError.inc();
            ZkJobCoordinator.this.stop();
        }
    }

    /**
     * Data listener for the job model version z-node. A new version makes this processor read
     * the job model, expire its current work if the assignment changed, and join the barrier;
     * deletion of the node stops the coordinator.
     */
    class ZkJobModelVersionChangeHandler extends ZkUtils.GenerationAwareZkDataListener {
        public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) {
            super(zkUtils, "ZkJobModelVersionChangeHandler");
        }

        /**
         * Schedules processing of the new job model version on the debounce timer.
         *
         * @param str path of the z-node whose data changed
         * @param obj new node data; cast to the job model version string inside the scheduled action
         */
        @Override // org.apache.samza.zk.ZkUtils.GenerationAwareZkDataListener
        public void doHandleDataChange(String str, Object obj) {
            debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0L, () -> {
                String jobModelVersion = (String) obj;
                LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", str, obj);
                latestJobModel = readJobModelFromMetadataStore(jobModelVersion);
                LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + latestJobModel);
                checkAndExpireJobModel(latestJobModel);
                barrier.join(jobModelVersion, processorId);
            });
        }

        /**
         * Schedules a coordinator stop because the job model version z-node was deleted.
         *
         * @param str path of the deleted z-node
         */
        @Override // org.apache.samza.zk.ZkUtils.GenerationAwareZkDataListener
        public void doHandleDataDeleted(String str) {
            // The path is logged as its own argument; it used to be glued onto the sentence.
            LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator. Path = {}", str);
            debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0L, () -> {
                stop();
            });
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/samza/zk/ZkJobCoordinator$ZkSessionStateChangedListener.class */
    class ZkSessionStateChangedListener implements IZkStateListener {
        private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
        private static final String ZK_SESSION_EXPIRED = "ZK_SESSION_EXPIRED";

        ZkSessionStateChangedListener() {
        }

        public void handleStateChanged(Watcher.Event.KeeperState keeperState) {
            switch (AnonymousClass1.$SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[keeperState.ordinal()]) {
                case 1:
                    ZkJobCoordinator.this.zkSessionMetrics.zkSessionExpirations.inc();
                    ZkJobCoordinator.LOG.warn("Got " + keeperState.toString() + " event for processor=" + ZkJobCoordinator.this.processorId + ". Stopping the container and unregister the processor node.");
                    ZkJobCoordinator.this.zkUtils.incGeneration();
                    ZkJobCoordinator.this.zkUtils.unregister();
                    if (ZkJobCoordinator.this.leaderElector.amILeader()) {
                        ZkJobCoordinator.this.leaderElector.resignLeadership();
                    }
                    if (ZkJobCoordinator.this.streamPartitionCountMonitor != null) {
                        ZkJobCoordinator.this.streamPartitionCountMonitor.stop();
                    }
                    ZkJobCoordinator.LOG.info("Cancelling all scheduled actions in session expiration for processorId: {}.", ZkJobCoordinator.this.processorId);
                    ZkJobCoordinator.this.debounceTimer.cancelAllScheduledActions();
                    ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_EXPIRED, 0L, () -> {
                        if (ZkJobCoordinator.this.coordinatorListener != null) {
                            ZkJobCoordinator.this.coordinatorListener.onJobModelExpired();
                        }
                    });
                    return;
                case CoordinatorStreamMessage.KEY_INDEX /* 2 */:
                    ZkJobCoordinator.this.zkSessionMetrics.zkSessionDisconnects.inc();
                    ZkJobCoordinator.LOG.warn("Got " + keeperState.toString() + " event for processor=" + ZkJobCoordinator.this.processorId + ". Scheduling a coordinator stop.");
                    ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, new ZkConfig(ZkJobCoordinator.this.config).getZkSessionTimeoutMs(), () -> {
                        ZkJobCoordinator.this.stop();
                    });
                    return;
                case 3:
                case 4:
                case 5:
                    ZkJobCoordinator.this.zkSessionMetrics.zkSessionErrors.inc();
                    ZkJobCoordinator.LOG.warn("Got unexpected failure event " + keeperState.toString() + " for processor=" + ZkJobCoordinator.this.processorId + ". Stopping the job coordinator.");
                    ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0L, () -> {
                        ZkJobCoordinator.this.stop();
                    });
                    return;
                case 6:
                    ZkJobCoordinator.this.zkSessionMetrics.zkSyncConnected.inc();
                    ZkJobCoordinator.LOG.info("Got syncconnected event for processor=" + ZkJobCoordinator.this.processorId + ".");
                    ZkJobCoordinator.this.debounceTimer.cancelAction(ZK_SESSION_ERROR);
                    return;
                default:
                    ZkJobCoordinator.LOG.info("Got ZK event " + keeperState.toString() + " for processor=" + ZkJobCoordinator.this.processorId + ". Continue");
                    return;
            }
        }

        public void handleNewSession() {
            ZkJobCoordinator.this.zkSessionMetrics.zkNewSessions.inc();
            ZkJobCoordinator.LOG.info("Got new session created event for processor=" + ZkJobCoordinator.this.processorId);
            ZkJobCoordinator.this.debounceTimer.cancelAllScheduledActions();
            ZkJobCoordinator.LOG.info("register zk controller for the new session");
            ZkJobCoordinator.this.leaderElector.tryBecomeLeader();
            ZkJobCoordinator.this.zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(ZkJobCoordinator.this.zkUtils));
        }

        public void handleSessionEstablishmentError(Throwable th) {
            ZkJobCoordinator.this.zkSessionMetrics.zkSessionErrors.inc();
            ZkJobCoordinator.LOG.info("handleSessionEstablishmentError received for processor=" + ZkJobCoordinator.this.processorId, th);
            ZkJobCoordinator.this.debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0L, () -> {
                ZkJobCoordinator.this.stop();
            });
        }
    }

    /**
     * Wires up the coordinator's collaborators: metrics, leader elector, debounce timer,
     * version-upgrade barrier, system admins, stream metadata cache and location id.
     *
     * @param str id of this processor
     * @param config job configuration
     * @param metricsRegistry registry used for coordinator and ZK session metrics
     * @param zkUtils ZooKeeper helper used for all coordination
     * @param metadataStore store holding job models
     * @param metadataStore2 coordinator stream store; must be a {@link CoordinatorStreamStore}
     * @throws IllegalArgumentException if {@code metadataStore2} is not a {@link CoordinatorStreamStore}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ZkJobCoordinator(String str, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils, MetadataStore metadataStore, MetadataStore metadataStore2) {
        Preconditions.checkArgument(metadataStore2 instanceof CoordinatorStreamStore, "metadataStore2 must be a CoordinatorStreamStore");
        this.config = config;
        this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
        this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
        this.processorId = str;
        this.zkUtils = zkUtils;
        this.leaderElector = new ZkLeaderElector(str, zkUtils);
        this.leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
        this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
        this.debounceTimer = new ScheduleAfterDebounceTime(str);
        // Any exception escaping a scheduled action shuts the coordinator down.
        this.debounceTimer.setScheduledTaskCallback(th -> {
            LOG.error("Received exception in debounce timer! Stopping the job coordinator", th);
            stop();
        });
        this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), this.debounceTimer);
        this.systemAdmins = new SystemAdmins(config, getClass().getSimpleName());
        this.streamMetadataCache = new StreamMetadataCache(this.systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
        this.locationId = ((LocationIdProviderFactory) ReflectionUtil.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class)).getLocationIdProvider(config).getLocationId();
        this.coordinatorStreamStore = (CoordinatorStreamStore) metadataStore2;
        this.jobModelMetadataStore = metadataStore;
        // Register the session listener last: its callbacks dereference leaderElector,
        // debounceTimer and the other fields assigned above, so subscribing earlier could
        // deliver an event to a partially initialised coordinator.
        zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
    }

    /**
     * Starts the coordinator: validates the ZK version and required paths, initialises the job
     * model store and system admins, enters leader election, subscribes to job model version
     * changes and, when enabled by config, schedules a start from the last active job model.
     */
    @Override // org.apache.samza.coordinator.JobCoordinator
    public void start() {
        ZkKeyBuilder keys = zkUtils.getKeyBuilder();
        zkUtils.validateZkVersion();

        String[] requiredPaths = {
            keys.getProcessorsPath(),
            keys.getJobModelVersionPath(),
            keys.getActiveJobModelVersionPath(),
            keys.getJobModelPathPrefix(),
            keys.getTaskLocalityPath()
        };
        zkUtils.validatePaths(requiredPaths);

        jobModelMetadataStore.init();
        systemAdmins.start();
        leaderElector.tryBecomeLeader();
        zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));

        boolean startWithActiveJobModel = new ZkConfig(config).getEnableStartupWithActiveJobModel();
        if (startWithActiveJobModel) {
            debounceTimer.scheduleAfterDebounceTime(START_WORK_WITH_LAST_ACTIVE_JOB_MODEL, 0L, this::startWorkWithLastActiveJobModel);
        }
    }

    /**
     * Shuts the coordinator down. Only the first call does any work; later calls log and return.
     * Failures during shutdown are logged and reported through
     * {@code coordinatorListener.onCoordinatorFailure}, and the outcome is always logged.
     */
    @Override // org.apache.samza.coordinator.JobCoordinator
    public void stop() {
        boolean firstCaller = initiatedShutdown.compareAndSet(false, true);
        if (!firstCaller) {
            LOG.info("Job Coordinator shutdown is in progress!");
            return;
        }

        LOG.info("Shutting down JobCoordinator.");
        boolean shutdownComplete = false;
        // Leadership metric is cleared before the shutdown steps run.
        metrics.isLeader.set(0);
        try {
            if (coordinatorListener != null) {
                coordinatorListener.onJobModelExpired();
            }
            debounceTimer.stopScheduler();
            if (leaderElector.amILeader()) {
                LOG.info("Resigning leadership for processorId: " + processorId);
                leaderElector.resignLeadership();
            }
            LOG.info("Shutting down ZkUtils.");
            if (zkUtils != null) {
                zkUtils.close();
            }
            LOG.debug("Shutting down system admins.");
            systemAdmins.stop();
            if (streamPartitionCountMonitor != null) {
                streamPartitionCountMonitor.stop();
            }
            if (coordinatorListener != null) {
                coordinatorListener.onCoordinatorStop();
            }
            jobModelMetadataStore.close();
            shutdownComplete = true;
        } catch (Throwable failure) {
            LOG.error("Encountered errors during job coordinator stop.", failure);
            if (coordinatorListener != null) {
                coordinatorListener.onCoordinatorFailure(failure);
            }
        } finally {
            LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownComplete);
        }
    }

    /**
     * Registers the listener that is told about job model expiry, new job models, and
     * coordinator stop or failure.
     *
     * @param jobCoordinatorListener listener to notify; replaces any previous one
     */
    @Override // org.apache.samza.coordinator.JobCoordinator
    public void setListener(JobCoordinatorListener jobCoordinatorListener) {
        coordinatorListener = jobCoordinatorListener;
    }

    /**
     * @return the job model read on the most recent job-model-version change notification,
     *     or {@code null} if none has been processed yet
     */
    @Override // org.apache.samza.coordinator.JobCoordinator
    public JobModel getJobModel() {
        return latestJobModel;
    }

    /** @return the id of the processor this coordinator was created for */
    @Override // org.apache.samza.coordinator.JobCoordinator
    public String getProcessorId() {
        return processorId;
    }

    /**
     * On the leader, schedules a debounced job model regeneration after the processor list
     * changed; does nothing on other processors.
     *
     * @param list current processor children; only its size is logged
     */
    public void onProcessorChange(List<String> list) {
        if (!leaderElector.amILeader()) {
            return;
        }
        LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + list.size());
        debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, this::doOnProcessorChange);
    }

    /**
     * Generates and publishes a new job model for the processors currently registered in ZK.
     *
     * <p>Nothing is published when the processor ids contain duplicates, or when startup with
     * the active job model is enabled and the container models are unchanged. Otherwise the
     * job model is written under the next version, a barrier is created for it, the new
     * version is published, and a ZK cleanup is scheduled.
     */
    void doOnProcessorChange() {
        List<ZkUtils.ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes();
        List<String> processorIds = new ArrayList<>();
        for (ZkUtils.ProcessorNode processorNode : processorNodes) {
            processorIds.add(processorNode.getProcessorData().getProcessorId());
        }

        // A set drops repeated ids, so a size mismatch means at least one duplicate.
        if (processorIds.size() != new HashSet<>(processorIds).size()) {
            LOG.info("Processors: {} has duplicates. Not generating JobModel.", processorIds);
            return;
        }

        LOG.info("Generating new JobModel with processors: {}.", processorIds);
        JobModel newJobModel = generateNewJobModel(processorNodes);
        if (new ZkConfig(config).getEnableStartupWithActiveJobModel() && JobModelUtil.compareContainerModels(newJobModel, activeJobModel)) {
            LOG.info("Skipping rebalance since there are no changes in work assignment");
            return;
        }

        if (!hasLoadedMetadataResources) {
            loadMetadataResources(newJobModel);
            hasLoadedMetadataResources = true;
        }

        String currentVersion = zkUtils.getJobModelVersion();
        String nextVersion = zkUtils.getNextJobModelVersion(currentVersion);
        LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextVersion + " and processors: " + processorIds);

        // The job model and its barrier are in place before the new version is published.
        publishJobModelToMetadataStore(newJobModel, nextVersion);
        barrier.create(nextVersion, processorIds);
        zkUtils.publishJobModelVersion(currentVersion, nextVersion);
        LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextVersion);

        debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0L, () -> {
            zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE);
        });
    }

    /**
     * Writes a job model to the job model metadata store.
     *
     * @param jobModel job model to persist
     * @param str job model version to store it under
     */
    @VisibleForTesting
    void publishJobModelToMetadataStore(JobModel jobModel, String str) {
        MetadataStore store = jobModelMetadataStore;
        JobModelUtil.writeJobModel(jobModel, str, store);
    }

    /**
     * Reads a job model from the job model metadata store.
     *
     * @param str job model version to read
     * @return whatever {@code JobModelUtil.readJobModel} yields for that version
     */
    @VisibleForTesting
    JobModel readJobModelFromMetadataStore(String str) {
        MetadataStore store = jobModelMetadataStore;
        return JobModelUtil.readJobModel(str, store);
    }

    /**
     * Creates the metadata resources for the job model, writes every config entry to the
     * coordinator stream store under the {@code set-config} namespace and, when startpoints are
     * enabled, fans startpoints out to the job model's tasks.
     *
     * @param jobModel job model whose resources and startpoints are set up
     * @throws SamzaException wrapping any {@link IOException} raised along the way
     */
    @VisibleForTesting
    void loadMetadataResources(JobModel jobModel) {
        try {
            createMetadataResourceUtil(jobModel, config).createResources();
            if (coordinatorStreamStore == null) {
                LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
                return;
            }

            CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde("set-config");
            NamespaceAwareCoordinatorStreamStore configStore = new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, "set-config");
            for (Map.Entry<String, String> entry : config.entrySet()) {
                configStore.put(entry.getKey(), valueSerde.toBytes(entry.getValue()));
            }
            configStore.flush();

            if (new JobConfig(config).getStartpointEnabled()) {
                StartpointManager startpointManager = createStartpointManager();
                startpointManager.start();
                try {
                    startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
                } finally {
                    // Stop exactly once whether or not the fan-out succeeded.
                    startpointManager.stop();
                }
            }
        } catch (IOException e) {
            throw new SamzaException("IO exception while loading metadata resources.", e);
        }
    }

    /**
     * Factory hook for the {@link MetadataResourceUtil} used by {@code loadMetadataResources}.
     *
     * @param jobModel job model to create resources for
     * @param config configuration handed to the util
     * @return a new util bound to this coordinator's metrics registry
     */
    @VisibleForTesting
    MetadataResourceUtil createMetadataResourceUtil(JobModel jobModel, Config config) {
        MetricsRegistry registry = metrics.getMetricsRegistry();
        return new MetadataResourceUtil(jobModel, registry, config);
    }

    /**
     * Calculates a job model for the given processors.
     *
     * <p>When a published job model version exists and differs from the cached one, the
     * changelog partition of every task in that job model is first copied into
     * {@code changeLogPartitionMap} so the calculation can take it into account.
     *
     * @param list processor nodes currently registered in ZK
     * @return a job model with an empty config and the calculated containers
     */
    @VisibleForTesting
    JobModel generateNewJobModel(List<ZkUtils.ProcessorNode> list) {
        String jobModelVersion = zkUtils.getJobModelVersion();
        if (jobModelVersion != null && !Objects.equals(cachedJobModelVersion, jobModelVersion)) {
            JobModel publishedJobModel = readJobModelFromMetadataStore(jobModelVersion);
            for (ContainerModel containerModel : publishedJobModel.getContainers().values()) {
                containerModel.getTasks().forEach((taskName, taskModel) -> {
                    changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId());
                });
            }
            // Remember the version so the same job model is not re-read on the next call.
            cachedJobModelVersion = jobModelVersion;
        }

        GrouperMetadataImpl grouperMetadata = getGrouperMetadata(jobModelVersion, list);
        return new JobModel(new MapConfig(), JobModelCalculator.INSTANCE.calculateJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata).getContainers());
    }

    /**
     * Builds a partition count monitor whose callback, on the leader only, schedules an
     * immediate job model regeneration.
     *
     * @return a new, not yet started monitor
     */
    @VisibleForTesting
    StreamPartitionCountMonitor getPartitionCountMonitor() {
        // Separate cache instance built with 0 where the constructor passes METADATA_CACHE_TTL_MS.
        StreamMetadataCache monitorMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
        StreamPartitionCountMonitorFactory monitorFactory = new StreamPartitionCountMonitorFactory(monitorMetadataCache, metrics.getMetricsRegistry());
        return monitorFactory.build(config, ignored -> {
            if (!leaderElector.amILeader()) {
                return;
            }
            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 0L, this::doOnProcessorChange);
        });
    }

    /**
     * Factory hook for the {@link StartpointManager} used by {@code loadMetadataResources}.
     *
     * @return a new manager backed by the coordinator stream store
     */
    @VisibleForTesting
    StartpointManager createStartpointManager() {
        CoordinatorStreamStore store = coordinatorStreamStore;
        return new StartpointManager(store);
    }

    /**
     * Notifies the listener that the job model expired when this processor's work assignment
     * differs between the active job model and {@code jobModel}, and records that fact in
     * {@code jobModelExpired}. Does nothing when no listener is registered.
     *
     * @param jobModel new job model; must not be {@code null}
     */
    @VisibleForTesting
    void checkAndExpireJobModel(JobModel jobModel) {
        Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
        if (coordinatorListener == null) {
            LOG.info("Skipping job model expiration since there are no active listeners");
            return;
        }

        LOG.info("Checking for work assignment changes for processor {} between active job model {} and new job model {}", processorId, activeJobModel, jobModel);
        boolean assignmentUnchanged = JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, jobModel);
        if (assignmentUnchanged) {
            LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);
        } else {
            LOG.info("Work assignment changed for the processor {}. Notifying job model expiration to coordinator listener", processorId);
            coordinatorListener.onJobModelExpired();
            jobModelExpired.set(true);
        }
    }

    /**
     * Makes {@code jobModel} the active job model. If the previous job model had been expired
     * and this processor has a container in the new one, task locality is written for its tasks
     * and the listener is notified.
     *
     * @param jobModel new job model; must not be {@code null}
     */
    @VisibleForTesting
    void onNewJobModel(JobModel jobModel) {
        Preconditions.checkNotNull(jobModel, "JobModel cannot be null. Failing onNewJobModel");

        // Consumes the expiry flag: it is reset here even when this processor has no container.
        boolean wasExpired = jobModelExpired.compareAndSet(true, false);
        if (!wasExpired) {
            LOG.info("Skipping onNewJobModel since there are no changes in work assignment.");
        } else {
            LOG.info("Work assignment changed for the processor {}. Updating task locality and notifying coordinator listener", processorId);
            if (jobModel.getContainers().containsKey(processorId)) {
                for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
                    zkUtils.writeTaskLocality(taskName, locationId);
                }
                if (coordinatorListener != null) {
                    coordinatorListener.onNewJobModel(processorId, jobModel);
                }
            }
        }

        activeJobModel = jobModel;
    }

    /** @return the job model last passed to {@code onNewJobModel} or {@code setActiveJobModel} */
    @VisibleForTesting
    JobModel getActiveJobModel() {
        return activeJobModel;
    }

    /**
     * Test hook that overrides the active job model.
     *
     * @param jobModel job model to treat as active
     */
    @VisibleForTesting
    void setActiveJobModel(JobModel jobModel) {
        activeJobModel = jobModel;
    }

    /** @return the current value of the job-model-expired flag */
    @VisibleForTesting
    boolean getJobModelExpired() {
        return jobModelExpired.get();
    }

    /**
     * Test hook that overrides the job-model-expired flag.
     *
     * @param z new flag value
     */
    @VisibleForTesting
    void setJobModelExpired(boolean z) {
        jobModelExpired.set(z);
    }

    /**
     * Test hook that replaces the debounce timer.
     *
     * @param scheduleAfterDebounceTime timer to use from now on
     */
    @VisibleForTesting
    void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
        debounceTimer = scheduleAfterDebounceTime;
    }

    /**
     * Test hook that replaces the leader elector.
     *
     * @param zkLeaderElector elector to use from now on
     */
    @VisibleForTesting
    void setLeaderElector(ZkLeaderElector zkLeaderElector) {
        leaderElector = zkLeaderElector;
    }

    /**
     * Test hook that replaces the version-upgrade barrier.
     *
     * @param zkBarrierForVersionUpgrade barrier to use from now on
     */
    @VisibleForTesting
    void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade zkBarrierForVersionUpgrade) {
        barrier = zkBarrierForVersionUpgrade;
    }

    /**
     * Starts work from the last active job model, but only when its version is non-null and
     * equals the current job model version; otherwise does nothing beyond logging.
     */
    @VisibleForTesting
    void startWorkWithLastActiveJobModel() {
        LOG.info("Starting the processor with the recent active job model");
        String lastActiveVersion = zkUtils.getLastActiveJobModelVersion();
        String currentVersion = zkUtils.getJobModelVersion();

        boolean activeIsCurrent = lastActiveVersion != null && lastActiveVersion.equals(currentVersion);
        if (activeIsCurrent) {
            JobModel lastActiveJobModel = readJobModelFromMetadataStore(lastActiveVersion);
            checkAndExpireJobModel(lastActiveJobModel);
            onNewJobModel(lastActiveJobModel);
        }
    }

    /**
     * Collects the inputs for job model calculation: processor locality from the given
     * processor nodes, task locality read from ZK, and the task-to-partitions and
     * task-to-container assignments of a previously published job model.
     *
     * @param str version of the previous job model; {@code null} leaves both previous-assignment maps empty
     * @param list processor nodes currently registered in ZK
     * @return grouper metadata built from those four maps
     */
    private GrouperMetadataImpl getGrouperMetadata(String str, List<ZkUtils.ProcessorNode> list) {
        Map<TaskName, String> taskToContainerId = new HashMap<>();
        Map<TaskName, List<SystemStreamPartition>> taskToPartitions = new HashMap<>();
        if (str != null) {
            JobModel previousJobModel = readJobModelFromMetadataStore(str);
            for (ContainerModel containerModel : previousJobModel.getContainers().values()) {
                for (TaskModel taskModel : containerModel.getTasks().values()) {
                    TaskName taskName = taskModel.getTaskName();
                    taskToContainerId.put(taskName, containerModel.getId());
                    for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) {
                        // computeIfAbsent returns the list for the task, so no second lookup is needed.
                        taskToPartitions.computeIfAbsent(taskName, key -> new ArrayList<>()).add(partition);
                    }
                }
            }
        }

        Map<String, LocationId> processorLocality = new HashMap<>();
        for (ZkUtils.ProcessorNode processorNode : list) {
            ProcessorData processorData = processorNode.getProcessorData();
            processorLocality.put(processorData.getProcessorId(), processorData.getLocationId());
        }

        return new GrouperMetadataImpl(processorLocality, zkUtils.readTaskLocality(), taskToPartitions, taskToContainerId);
    }

    /** @return the {@link ZkUtils} instance this coordinator was constructed with */
    @VisibleForTesting
    public ZkUtils getZkUtils() {
        return zkUtils;
    }
}
