package org.apache.samza.runtime;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.RunIdGenerator;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/runtime/LocalApplicationRunner.class */
public class LocalApplicationRunner implements ApplicationRunner {
    // Name passed to the metadata store factory when creating the store used for run id generation.
    private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
    // Config key naming the metadata store factory class.
    // NOTE(review): not referenced in the visible code; getMetadataStoreForRunID() uses the same literal inline.
    private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
    // Application descriptor built from the user application and the (possibly loaded) config.
    private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
    // Live processors, each paired with its coordinator stream store (the store may be null).
    private final Set<Pair<StreamProcessor, MetadataStore>> processors;
    // Released once when the last processor has shut down or when run() fails; awaited by waitForFinish().
    private final CountDownLatch shutdownLatch;
    // Number of processors that have not yet reported afterStart(); the app is marked Running at zero.
    private final AtomicInteger numProcessorsToStart;
    // First failure reported by any processor; set at most once via compareAndSet.
    private final AtomicReference<Throwable> failure;
    // True when the application mode in the config is BATCH.
    private final boolean isAppModeBatch;
    // Coordination utilities; either supplied by the caller or created from config in BATCH mode.
    private final Optional<CoordinationUtils> coordinationUtils;
    // Factory for the per-job coordinator stream store; either supplied by the caller or derived from config.
    private final Optional<MetadataStoreFactory> metadataStoreFactory;
    // Run id produced by initializeRunId(); stays empty when no run id is generated.
    private Optional<String> runId;
    // Generator created by initializeRunId(); closed in cleanup().
    private Optional<RunIdGenerator> runIdGenerator;
    // Current application status; written by run() and the lifecycle listener, read by status().
    // NOTE(review): not volatile although it appears to be written from processor callbacks — confirm visibility is acceptable.
    private ApplicationStatus appStatus;
    private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
    // Random UUID generated once when the class is initialized; passed to CoordinationUtils and LocalJobPlanner.
    private static final String PROCESSOR_ID = UUID.randomUUID().toString();
    // Metadata store factory class used for run id generation when the config does not name one.
    private static final String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();

    /* loaded from: input_file:org/apache/samza/runtime/LocalApplicationRunner$LocalApplicationRunnerContext.class */
    /**
     * Mutable parameter holder that lets the public constructors funnel into a single private
     * constructor. The optional members start out empty and are filled in by the fluent setters.
     */
    private static final class LocalApplicationRunnerContext {
        SamzaApplication app;
        Config config;
        Optional<CoordinationUtils> coordinationUtils;
        Optional<MetadataStoreFactory> metadataStoreFactory;

        /**
         * @param samzaApplication the user application to run
         * @param config the configuration supplied by the caller
         */
        LocalApplicationRunnerContext(SamzaApplication samzaApplication, Config config) {
            this.app = samzaApplication;
            this.config = config;
            this.coordinationUtils = Optional.empty();
            this.metadataStoreFactory = Optional.empty();
        }

        /**
         * Records caller-supplied coordination utilities.
         *
         * @param utils must not be null ({@link Optional#of} rejects null)
         * @return this context, for chaining
         */
        LocalApplicationRunnerContext setCoordinationUtils(CoordinationUtils utils) {
            coordinationUtils = Optional.of(utils);
            return this;
        }

        /**
         * Records a caller-supplied metadata store factory.
         *
         * @param factory must not be null ({@link Optional#of} rejects null)
         * @return this context, for chaining
         */
        LocalApplicationRunnerContext setMetadataStoreFactory(MetadataStoreFactory factory) {
            metadataStoreFactory = Optional.of(factory);
            return this;
        }
    }

    /* loaded from: input_file:org/apache/samza/runtime/LocalApplicationRunner$LocalStreamProcessorLifecycleListener.class */
    private final class LocalStreamProcessorLifecycleListener implements ProcessorLifecycleListener {
        private final StreamProcessor processor;
        private final ProcessorLifecycleListener userDefinedProcessorLifecycleListener;

        LocalStreamProcessorLifecycleListener(StreamProcessor streamProcessor, Config config) {
            this.userDefinedProcessorLifecycleListener = LocalApplicationRunner.this.appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { // from class: org.apache.samza.runtime.LocalApplicationRunner.LocalStreamProcessorLifecycleListener.1
            }, config);
            this.processor = streamProcessor;
        }

        public void beforeStart() {
            this.userDefinedProcessorLifecycleListener.beforeStart();
        }

        public void afterStart() {
            if (LocalApplicationRunner.this.numProcessorsToStart.decrementAndGet() == 0) {
                LocalApplicationRunner.this.appStatus = ApplicationStatus.Running;
            }
            this.userDefinedProcessorLifecycleListener.afterStart();
        }

        private void closeAndRemoveProcessor() {
            LocalApplicationRunner.this.processors.forEach(pair -> {
                if (((StreamProcessor) pair.getLeft()).equals(this.processor)) {
                    ((StreamProcessor) pair.getLeft()).stop();
                    if (pair.getRight() != null) {
                        ((MetadataStore) pair.getRight()).close();
                    }
                }
            });
            LocalApplicationRunner.this.processors.removeIf(pair2 -> {
                return ((StreamProcessor) pair2.getLeft()).equals(this.processor);
            });
        }

        public void afterStop() {
            closeAndRemoveProcessor();
            handleProcessorShutdown(null);
        }

        public void afterFailure(Throwable th) {
            closeAndRemoveProcessor();
            if (LocalApplicationRunner.this.failure.compareAndSet(null, th)) {
                LocalApplicationRunner.this.processors.forEach(pair -> {
                    ((StreamProcessor) pair.getLeft()).stop();
                    if (pair.getRight() != null) {
                        ((MetadataStore) pair.getRight()).close();
                    }
                });
            }
            handleProcessorShutdown(th);
        }

        private void handleProcessorShutdown(Throwable th) {
            if (LocalApplicationRunner.this.processors.isEmpty()) {
                setApplicationFinalStatus();
            }
            if (th != null) {
                this.userDefinedProcessorLifecycleListener.afterFailure(th);
            } else {
                this.userDefinedProcessorLifecycleListener.afterStop();
            }
            if (LocalApplicationRunner.this.processors.isEmpty()) {
                LocalApplicationRunner.this.cleanup();
                LocalApplicationRunner.this.shutdownLatch.countDown();
            }
        }

        private void setApplicationFinalStatus() {
            if (LocalApplicationRunner.this.failure.get() != null) {
                LocalApplicationRunner.this.appStatus = ApplicationStatus.unsuccessfulFinish((Throwable) LocalApplicationRunner.this.failure.get());
            } else if (LocalApplicationRunner.this.appStatus == ApplicationStatus.Running) {
                LocalApplicationRunner.this.appStatus = ApplicationStatus.SuccessfulFinish;
            } else if (LocalApplicationRunner.this.appStatus == ApplicationStatus.New) {
                LocalApplicationRunner.this.appStatus = ApplicationStatus.UnsuccessfulFinish;
            }
        }
    }

    /**
     * Creates a runner whose coordination utilities and metadata store factory are derived from
     * the config.
     *
     * @param samzaApplication the application to run
     * @param config the application configuration
     */
    public LocalApplicationRunner(SamzaApplication samzaApplication, Config config) {
        this(new LocalApplicationRunnerContext(samzaApplication, config));
    }

    /**
     * Creates a runner that uses the given metadata store factory for the coordinator stream store.
     *
     * @param samzaApplication the application to run
     * @param config the application configuration
     * @param metadataStoreFactory factory to use; must not be null
     */
    public LocalApplicationRunner(SamzaApplication samzaApplication, Config config, MetadataStoreFactory metadataStoreFactory) {
        this(new LocalApplicationRunnerContext(samzaApplication, config).setMetadataStoreFactory(metadataStoreFactory));
    }

    /**
     * Test-only constructor that injects the coordination utilities.
     *
     * @param samzaApplication the application to run
     * @param config the application configuration
     * @param coordinationUtils utilities to use; must not be null
     */
    @VisibleForTesting
    LocalApplicationRunner(SamzaApplication samzaApplication, Config config, CoordinationUtils coordinationUtils) {
        this(new LocalApplicationRunnerContext(samzaApplication, config).setCoordinationUtils(coordinationUtils));
    }

    /**
     * Common constructor. Initializes the bookkeeping state, resolves the effective config and
     * fills in any collaborator the context did not supply.
     */
    private LocalApplicationRunner(LocalApplicationRunnerContext localApplicationRunnerContext) {
        this.processors = ConcurrentHashMap.newKeySet();
        this.shutdownLatch = new CountDownLatch(1);
        this.numProcessorsToStart = new AtomicInteger();
        this.failure = new AtomicReference<>();
        this.runId = Optional.empty();
        this.runIdGenerator = Optional.empty();
        this.appStatus = ApplicationStatus.New;

        // When a config loader factory is configured, the supplied config is expanded through it.
        Config effectiveConfig = localApplicationRunnerContext.config;
        if (new JobConfig(effectiveConfig).getConfigLoaderFactory().isPresent()) {
            effectiveConfig = ConfigUtil.loadConfig(effectiveConfig);
        }

        this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(localApplicationRunnerContext.app, effectiveConfig);
        this.isAppModeBatch = isAppModeBatch(effectiveConfig);

        if (localApplicationRunnerContext.coordinationUtils.isPresent()) {
            this.coordinationUtils = localApplicationRunnerContext.coordinationUtils;
        } else {
            this.coordinationUtils = getCoordinationUtils(effectiveConfig);
        }

        if (localApplicationRunnerContext.metadataStoreFactory.isPresent()) {
            this.metadataStoreFactory = localApplicationRunnerContext.metadataStoreFactory;
        } else {
            this.metadataStoreFactory = getDefaultCoordinatorStreamStoreFactory(effectiveConfig);
        }
    }

    /**
     * Picks the default coordinator stream metadata store factory.
     *
     * @param config the application configuration
     * @return a {@link CoordinatorStreamMetadataStoreFactory} when a coordinator system is
     *     configured and the job coordinator factory is {@link ZkJobCoordinatorFactory};
     *     otherwise empty
     */
    @VisibleForTesting
    static Optional<MetadataStoreFactory> getDefaultCoordinatorStreamStoreFactory(Config config) {
        JobConfig jobConfig = new JobConfig(config);
        String coordinatorSystem = jobConfig.getCoordinatorSystemNameOrNull();
        String coordinatorFactoryClass = new JobCoordinatorConfig(jobConfig).getJobCoordinatorFactoryClassName();

        if (StringUtils.isBlank(coordinatorSystem) || !ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactoryClass)) {
            LOG.warn("{} or {} not configured, or {} is not {}. No default coordinator stream metadata store will be created.",
                JobConfig.JOB_COORDINATOR_SYSTEM,
                JobConfig.JOB_DEFAULT_SYSTEM,
                JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
                ZkJobCoordinatorFactory.class.getName());
            return Optional.empty();
        }
        return Optional.of(new CoordinatorStreamMetadataStoreFactory());
    }

    /**
     * Creates coordination utilities from the configured factory, but only in BATCH mode.
     *
     * @param config the application configuration
     * @return the utilities, or empty when the app is not in BATCH mode or the factory returns null
     */
    private static Optional<CoordinationUtils> getCoordinationUtils(Config config) {
        if (!isAppModeBatch(config)) {
            return Optional.empty();
        }
        CoordinationUtils utils = new JobCoordinatorConfig(config)
            .getCoordinationUtilsFactory()
            .getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config);
        return Optional.ofNullable(utils);
    }

    /**
     * @param config the application configuration
     * @return true when the configured application mode is BATCH
     */
    private static boolean isAppModeBatch(Config config) {
        ApplicationConfig applicationConfig = new ApplicationConfig(config);
        return ApplicationConfig.ApplicationMode.BATCH == applicationConfig.getAppMode();
    }

    /**
     * Builds the job planner for this application.
     *
     * @return a planner that is given the coordination utilities and run id (either may be null)
     *     in BATCH mode, or a plain planner otherwise
     */
    @VisibleForTesting
    LocalJobPlanner getPlanner() {
        boolean batchMode = new ApplicationConfig(this.appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
        if (!batchMode) {
            return new LocalJobPlanner(this.appDesc, PROCESSOR_ID);
        }
        return new LocalJobPlanner(
            this.appDesc,
            this.coordinationUtils.orElse(null),
            PROCESSOR_ID,
            this.runId.orElse(null));
    }

    /**
     * Generates a run id for BATCH applications when coordination utilities are available.
     * Any failure is logged and the application continues without a run id.
     */
    private void initializeRunId() {
        if (!this.isAppModeBatch) {
            LOG.info("Not BATCH mode and hence not generating run id");
        } else if (!this.coordinationUtils.isPresent()) {
            LOG.warn("Coordination utils not present. Aborting run id generation. Will continue execution without a run id.");
        } else {
            try {
                this.runIdGenerator = Optional.of(new RunIdGenerator(this.coordinationUtils.get(), getMetadataStoreForRunID()));
                this.runId = this.runIdGenerator.flatMap(generator -> generator.getRunId());
            } catch (Exception e) {
                // Run id generation is best effort.
                LOG.warn("Failed to generate run id. Will continue execution without a run id.", e);
            }
        }
    }

    /**
     * @return the run id generated for this run, or empty when none was generated
     */
    public Optional<String> getRunId() {
        return runId;
    }

    /**
     * Plans the jobs of the application, creates one {@link StreamProcessor} (with its coordinator
     * stream store, if any) per job and starts them all.
     *
     * <p>On any failure the runner's coordination resources are cleaned up, the status is set to
     * an unsuccessful finish, the shutdown latch is released and the failure is rethrown wrapped
     * in a {@link SamzaException}.
     *
     * @param externalContext context handed to every processor; may be null
     * @throws SamzaException when planning yields no jobs or a processor cannot be created/started
     */
    public void run(ExternalContext externalContext) {
        initializeRunId();
        try {
            List<JobConfig> jobConfigs = getPlanner().prepareJobs();
            if (jobConfigs.isEmpty()) {
                throw new SamzaException("No jobs to run.");
            }
            jobConfigs.forEach(jobConfig -> {
                LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
                this.processors.add(createProcessorWithStore(jobConfig, externalContext));
            });
            // Must be set before any processor starts so afterStart() can count down to zero.
            this.numProcessorsToStart.set(this.processors.size());
            this.processors.forEach(entry -> entry.getLeft().start());
        } catch (Throwable th) {
            cleanup();
            this.appStatus = ApplicationStatus.unsuccessfulFinish(th);
            this.shutdownLatch.countDown();
            throw new SamzaException(String.format("Failed to start application: %s", new ApplicationConfig(this.appDesc.getConfig()).getGlobalAppId()), th);
        }
    }

    /**
     * Creates and initializes the coordinator stream store for a job and the processor using it.
     * If the processor cannot be created, the already initialized store is closed, because it is
     * not yet tracked in {@code processors} and nothing else would release it.
     */
    private Pair<StreamProcessor, MetadataStore> createProcessorWithStore(JobConfig jobConfig, ExternalContext externalContext) {
        MetadataStore store = createCoordinatorStreamStore(jobConfig);
        if (store != null) {
            store.init();
        }
        try {
            StreamProcessor processor = createStreamProcessor(
                jobConfig,
                this.appDesc,
                streamProcessor -> new LocalStreamProcessorLifecycleListener(streamProcessor, jobConfig),
                Optional.ofNullable(externalContext),
                store);
            return Pair.of(processor, store);
        } catch (Throwable creationError) {
            if (store != null) {
                try {
                    store.close();
                } catch (Throwable closeError) {
                    // Keep the original failure as the primary one.
                    creationError.addSuppressed(closeError);
                }
            }
            throw creationError;
        }
    }

    /**
     * Stops every tracked processor, closes its metadata store when one exists, and then releases
     * the runner's coordination resources.
     */
    public void kill() {
        this.processors.forEach(entry -> {
            StreamProcessor streamProcessor = entry.getLeft();
            streamProcessor.stop();
            // The coordinator stream store is optional, so it may be null.
            MetadataStore store = entry.getRight();
            if (store != null) {
                store.close();
            }
        });
        cleanup();
    }

    /**
     * @return the current status of the application as tracked by this runner
     */
    public ApplicationStatus status() {
        return appStatus;
    }

    /**
     * Waits for the application to finish without a timeout. Delegates to
     * {@link #waitForFinish(Duration)} with a zero duration, which that method treats as
     * "wait indefinitely".
     */
    public void waitForFinish() {
        waitForFinish(Duration.ZERO);
    }

    /**
     * Waits for the shutdown latch to be released, then releases the runner's coordination
     * resources.
     *
     * @param duration maximum time to wait; anything shorter than one millisecond means wait
     *     indefinitely
     * @return true when the application finished, false when the timeout elapsed first
     * @throws SamzaException when the wait is interrupted or cleanup fails; on interruption the
     *     calling thread's interrupt flag is restored before the exception is thrown
     */
    public boolean waitForFinish(Duration duration) {
        long timeoutMs = duration.toMillis();
        boolean finished = true;
        try {
            if (timeoutMs < 1) {
                this.shutdownLatch.await();
            } else {
                finished = this.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
                if (!finished) {
                    LOG.warn("Timed out waiting for application to finish.");
                }
            }
            // NOTE(review): cleanup also runs after a timeout, while processors may still be running — confirm intended.
            cleanup();
            return finished;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Error waiting for application to finish", e);
            throw new SamzaException(e);
        } catch (Exception e) {
            LOG.error("Error waiting for application to finish", e);
            throw new SamzaException(e);
        }
    }

    /**
     * @return a new set holding the stream processors currently tracked by this runner
     */
    @VisibleForTesting
    protected Set<StreamProcessor> getProcessors() {
        return this.processors.stream()
            .map(entry -> entry.getLeft())
            .collect(Collectors.toSet());
    }

    /**
     * @return the latch that is released when the application has shut down
     */
    @VisibleForTesting
    CountDownLatch getShutdownLatch() {
        return shutdownLatch;
    }

    /**
     * Creates the coordinator stream metadata store for a job.
     *
     * <p>When the configured factory is a {@link CoordinatorStreamMetadataStoreFactory}, the
     * underlying coordinator stream is created first and the store is only built if that
     * succeeds. Any other factory is used directly.
     *
     * @param config the job configuration
     * @return the (not yet initialized) store, or null when no factory is available or the
     *     coordinator stream could not be created
     */
    @VisibleForTesting
    MetadataStore createCoordinatorStreamStore(Config config) {
        if (!this.metadataStoreFactory.isPresent()) {
            LOG.warn("No coordinator stream store created.");
            return null;
        }

        MetadataStoreFactory factory = this.metadataStoreFactory.get();
        boolean coordinatorStreamBacked = factory instanceof CoordinatorStreamMetadataStoreFactory;
        if (coordinatorStreamBacked && !createUnderlyingCoordinatorStream(config)) {
            LOG.warn("No coordinator stream store created.");
            return null;
        }

        MetadataStore store = factory.getMetadataStore("NoOp", config, new MetricsRegistryMap());
        if (coordinatorStreamBacked) {
            LOG.info("Created coordinator stream store of type: {}", store.getClass().getSimpleName());
        } else {
            LOG.info("Created alternative coordinator stream store of type: {}", store.getClass().getSimpleName());
        }
        return store;
    }

    /**
     * Creates the coordinator stream on the configured coordinator system.
     *
     * @param config the job configuration
     * @return true when the stream was created; false when no coordinator system is configured
     */
    @VisibleForTesting
    boolean createUnderlyingCoordinatorStream(Config config) {
        if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
            LOG.warn("{} or {} not configured. Coordinator stream not created.", JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
            return false;
        }
        SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
        SystemAdmins systemAdmins = new SystemAdmins(config, getClass().getSimpleName());
        systemAdmins.start();
        try {
            CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem()));
        } finally {
            // Stop exactly once on every path; the previous catch/rethrow form invoked stop()
            // a second time when the first stop() itself failed.
            systemAdmins.stop();
        }
        return true;
    }

    /**
     * Builds the {@link StreamProcessor} for one job.
     *
     * <p>One metrics reporter is created per factory registered on the application descriptor and
     * passed to the processor. Previously the loop over the factories had an empty body, so the
     * reporter map handed to the processor was always empty.
     *
     * @param config the job configuration
     * @param applicationDescriptorImpl descriptor supplying the task factory, metrics reporter
     *     factories and context factories
     * @param streamProcessorLifecycleListenerFactory creates the lifecycle listener for the processor
     * @param optional the optional external context
     * @param metadataStore coordinator stream store for the processor; may be null
     * @return the new, not yet started, processor
     */
    @VisibleForTesting
    StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, StreamProcessor.StreamProcessorLifecycleListenerFactory streamProcessorLifecycleListenerFactory, Optional<ExternalContext> optional, MetadataStore metadataStore) {
        TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(applicationDescriptorImpl);
        Map<String, MetricsReporter> reporters = new HashMap<>();
        String processorId = createProcessorId(new ApplicationConfig(config));
        // NOTE(review): relies on MetricsReporterFactory#getMetricsReporter(name, processorId, config) — confirm against the Samza API in use.
        applicationDescriptorImpl.getMetricsReporterFactories().forEach((name, factory) ->
            reporters.put(name, factory.getMetricsReporter(name, processorId, config)));
        return new StreamProcessor(processorId, config, reporters, taskFactory, applicationDescriptorImpl.getApplicationContainerContextFactory(), applicationDescriptorImpl.getApplicationTaskContextFactory(), optional, streamProcessorLifecycleListenerFactory, null, metadataStore);
    }

    /**
     * Resolves the processor id: an explicitly configured id wins, otherwise the configured
     * {@link ProcessorIdGenerator} class is instantiated and asked for one.
     *
     * @param applicationConfig the application configuration
     * @return the processor id
     * @throws ConfigException when neither a processor id nor a generator class is configured
     */
    @VisibleForTesting
    static String createProcessorId(ApplicationConfig applicationConfig) {
        String configuredId = applicationConfig.getProcessorId();
        if (StringUtils.isNotBlank(configuredId)) {
            return configuredId;
        }

        String generatorClassName = applicationConfig.getAppProcessorIdGeneratorClass();
        if (StringUtils.isBlank(generatorClassName)) {
            throw new ConfigException(String.format("Expected either %s or %s to be configured", "processor.id", ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
        }

        ProcessorIdGenerator generator = (ProcessorIdGenerator) ReflectionUtil.getObj(generatorClassName, ProcessorIdGenerator.class);
        return generator.generateProcessorId(applicationConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the run id generator and the coordination utilities, when present, in that order.
     */
    public void cleanup() {
        this.runIdGenerator.ifPresent(generator -> generator.close());
        this.coordinationUtils.ifPresent(utils -> utils.close());
    }

    /**
     * Creates the metadata store used for run id generation. The factory class is read from the
     * {@code metadata.store.factory} config entry and falls back to the default factory.
     *
     * @return the metadata store requested under the run id store name
     */
    private MetadataStore getMetadataStoreForRunID() {
        String factoryClassName = (String) this.appDesc.getConfig().getOrDefault("metadata.store.factory", DEFAULT_METADATA_STORE_FACTORY);
        MetadataStoreFactory factory = (MetadataStoreFactory) ReflectionUtil.getObj(factoryClassName, MetadataStoreFactory.class);
        return factory.getMetadataStore(RUN_ID_METADATA_STORE, this.appDesc.getConfig(), new MetricsRegistryMap());
    }
}
