package org.apache.samza.execution;

import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/execution/LocalJobPlanner.class */
public class LocalJobPlanner extends JobPlanner {
    private static final String STREAM_CREATION_METADATA_STORE = "StreamCreationCoordinationStore";
    private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
    private static final String STREAM_CREATED_STATE_KEY = "StreamCreated_%s";
    private final String processorId;
    private final CoordinationUtils coordinationUtils;
    private final String runId;
    private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class);
    public static final String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();

    public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, String str) {
        super(applicationDescriptorImpl);
        this.processorId = str;
        this.coordinationUtils = new JobCoordinatorConfig(this.userConfig).getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, str, this.userConfig);
        this.runId = null;
    }

    public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, CoordinationUtils coordinationUtils, String str, String str2) {
        super(applicationDescriptorImpl);
        this.coordinationUtils = coordinationUtils;
        this.processorId = str;
        this.runId = str2;
    }

    @Override // org.apache.samza.execution.JobPlanner
    public List<JobConfig> prepareJobs() {
        ExecutionPlan executionPlan = getExecutionPlan(this.runId);
        try {
            String planAsJson = executionPlan.getPlanAsJson();
            writePlanJsonFile(planAsJson);
            LOG.info("Execution Plan: \n" + planAsJson);
            String valueOf = String.valueOf(planAsJson.hashCode());
            List<JobConfig> jobConfigs = executionPlan.getJobConfigs();
            if (jobConfigs.isEmpty()) {
                throw new SamzaException("No jobs in the plan.");
            }
            StreamManager streamManager = null;
            try {
                streamManager = buildAndStartStreamManager(jobConfigs.get(0));
                createStreams(valueOf, executionPlan.getIntermediateStreams(), streamManager);
                if (streamManager != null) {
                    streamManager.stop();
                }
                return jobConfigs;
            } catch (Throwable th) {
                if (streamManager != null) {
                    streamManager.stop();
                }
                throw th;
            }
        } catch (Exception e) {
            throw new SamzaException("Failed to create plan JSON.", e);
        }
    }

    private void createStreams(String str, List<StreamSpec> list, StreamManager streamManager) {
        if (list.isEmpty()) {
            LOG.info("Set of intermediate streams is empty. Nothing to create.");
            return;
        }
        LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", this.processorId);
        if (this.coordinationUtils == null) {
            LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", this.processorId);
            streamManager.createStreams(list);
            return;
        }
        boolean z = new ApplicationConfig(this.userConfig).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
        String str2 = str;
        if (z && this.runId != null) {
            str2 = this.runId;
        }
        try {
            try {
                checkAndCreateStreams(str2, list, streamManager);
                if (z || this.coordinationUtils == null) {
                    return;
                }
                this.coordinationUtils.close();
            } catch (TimeoutException e) {
                throw new SamzaException(String.format("Processor {} failed to get the lock for stream initialization within timeout.", this.processorId), e);
            }
        } catch (Throwable th) {
            if (!z && this.coordinationUtils != null) {
                this.coordinationUtils.close();
            }
            throw th;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:15:0x0087, code lost:
    
        org.apache.samza.execution.LocalJobPlanner.LOG.info("lock acquired for streams creation by Processor " + r8.processorId);
        r11.createStreams(r10);
        r0.put(java.lang.String.format(org.apache.samza.execution.LocalJobPlanner.STREAM_CREATED_STATE_KEY, r9), ("Streams created by processor " + r8.processorId).getBytes("UTF-8"));
        r0.flush();
        r0.unlock();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void checkAndCreateStreams(java.lang.String r9, java.util.List<org.apache.samza.system.StreamSpec> r10, org.apache.samza.execution.StreamManager r11) throws java.util.concurrent.TimeoutException {
        /*
            Method dump skipped, instructions count: 332
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.samza.execution.LocalJobPlanner.checkAndCreateStreams(java.lang.String, java.util.List, org.apache.samza.execution.StreamManager):void");
    }

    private MetadataStore getMetadataStore() {
        String str = (String) this.appDesc.getConfig().get("metadata.store.factory");
        if (str == null) {
            str = DEFAULT_METADATA_STORE_FACTORY;
        }
        return ((MetadataStoreFactory) ReflectionUtil.getObj(str, MetadataStoreFactory.class)).getMetadataStore(STREAM_CREATION_METADATA_STORE, this.appDesc.getConfig(), new MetricsRegistryMap());
    }
}
