package org.apache.samza.runtime;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.processor.StreamProcessorLifecycleListener;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.zk.ZkCoordinationServiceFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/runtime/LocalApplicationRunner.class */
/**
 * Application runner that creates one {@link StreamProcessor} per job config of the
 * application's {@link ExecutionPlan} and starts them.
 *
 * <p>The aggregate application state is tracked in {@link #appStatus} and updated from the
 * {@link StreamProcessorLifecycleListener} callbacks of the individual processors. Callers can
 * block until every processor has terminated via {@link #waitForFinish()}.
 *
 * <p>When the configured job coordinator factory is {@link ZkJobCoordinatorFactory}, a
 * {@link CoordinationUtils} instance is created and used to coordinate intermediate stream
 * creation through a leader elector and a latch; otherwise streams are created directly.
 */
public class LocalApplicationRunner extends AbstractApplicationRunner {
    private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class);
    private static final String INIT_LATCH_ID = "init";
    private static final long LATCH_TIMEOUT_MINUTES = 10;

    /** Random identifier of this runner instance, passed to the coordination service. */
    private final String uid;
    /** Coordination service handle; {@code null} when the ZK job coordinator is not configured. */
    private final CoordinationUtils coordinationUtils;
    /** Processors that have been created and have not yet reported shutdown or failure. */
    private final Set<StreamProcessor> processors;
    /** Released once the last processor has shut down or failed. */
    private final CountDownLatch shutdownLatch;
    /** Number of processors that have not yet reported {@code onStart}. */
    private final AtomicInteger numProcessorsToStart;
    /** First failure reported by any processor, or {@code null} if none has failed. */
    private final AtomicReference<Throwable> failure;
    /**
     * Current application status. Declared {@code volatile} because it is written from the
     * processors' lifecycle callbacks and read through {@link #status(StreamApplication)},
     * which are presumably invoked on different threads; without it a reader may observe a
     * stale value.
     */
    private volatile ApplicationStatus appStatus;

    /**
     * Lifecycle listener bound to a single {@link StreamProcessor}. It keeps the enclosing
     * runner's processor set, failure reference and application status up to date as that
     * processor starts, shuts down or fails.
     */
    final class LocalStreamProcessorLifeCycleListener implements StreamProcessorLifecycleListener {
        StreamProcessor processor;

        LocalStreamProcessorLifeCycleListener() {
        }

        /**
         * Associates this listener with the processor it reports for.
         *
         * @param streamProcessor the processor to remove from the runner on shutdown or failure
         */
        void setProcessor(StreamProcessor streamProcessor) {
            this.processor = streamProcessor;
        }

        /** Marks the application as running once every processor has reported start. */
        @Override
        public void onStart() {
            if (numProcessorsToStart.decrementAndGet() == 0) {
                appStatus = ApplicationStatus.Running;
            }
        }

        /** Unregisters the processor and finalizes the application if it was the last one. */
        @Override
        public void onShutdown() {
            processors.remove(this.processor);
            this.processor = null;
            if (processors.isEmpty()) {
                shutdownAndNotify();
            }
        }

        /**
         * Unregisters the failed processor, records the failure if it is the first one, and in
         * that case stops all remaining processors. Finalizes the application if no processor
         * is left.
         *
         * @param th the failure reported by the processor
         */
        @Override
        public void onFailure(Throwable th) {
            processors.remove(this.processor);
            this.processor = null;
            // Only the first reported failure triggers the stop of the remaining processors.
            if (failure.compareAndSet(null, th)) {
                processors.forEach(StreamProcessor::stop);
            }
            if (processors.isEmpty()) {
                shutdownAndNotify();
            }
        }

        /**
         * Computes the final application status, resets the coordination utils (if any) and
         * releases threads blocked in {@link LocalApplicationRunner#waitForFinish()}.
         */
        private void shutdownAndNotify() {
            Throwable recordedFailure = failure.get();
            if (recordedFailure != null) {
                appStatus = ApplicationStatus.unsuccessfulFinish(recordedFailure);
            } else if (appStatus == ApplicationStatus.Running) {
                appStatus = ApplicationStatus.SuccessfulFinish;
            } else if (appStatus == ApplicationStatus.New) {
                // All processors went away before the application ever reached Running.
                appStatus = ApplicationStatus.UnsuccessfulFinish;
            }
            if (coordinationUtils != null) {
                coordinationUtils.reset();
            }
            shutdownLatch.countDown();
        }
    }

    /**
     * Creates a runner in status {@link ApplicationStatus#New} with a freshly generated uid.
     *
     * @param config the application configuration
     */
    public LocalApplicationRunner(Config config) {
        super(config);
        this.processors = ConcurrentHashMap.newKeySet();
        this.shutdownLatch = new CountDownLatch(1);
        this.numProcessorsToStart = new AtomicInteger();
        this.failure = new AtomicReference<>();
        this.appStatus = ApplicationStatus.New;
        // uid must be assigned before createCoordinationUtils(), which reads it.
        this.uid = UUID.randomUUID().toString();
        this.coordinationUtils = createCoordinationUtils();
    }

    /**
     * Builds the execution plan for the application, writes its JSON representation, creates
     * the intermediate streams, then creates and starts one {@link StreamProcessor} per job
     * config.
     *
     * @param streamApplication the application to run
     * @throws SamzaException if any step fails, including when the plan contains no job configs
     */
    public void run(StreamApplication streamApplication) {
        try {
            ExecutionPlan plan = getExecutionPlan(streamApplication);
            writePlanJsonFile(plan.getPlanAsJson());
            createStreams(plan.getIntermediateStreams());
            if (plan.getJobConfigs().isEmpty()) {
                throw new SamzaException("No jobs to run.");
            }
            plan.getJobConfigs().forEach(jobConfig -> {
                log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
                LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
                StreamProcessor processor = createStreamProcessor(jobConfig, streamApplication, listener);
                listener.setProcessor(processor);
                this.processors.add(processor);
            });
            // The counter must be set before any processor is started so onStart can count down.
            this.numProcessorsToStart.set(this.processors.size());
            this.processors.forEach(StreamProcessor::start);
        } catch (Exception e) {
            throw new SamzaException("Failed to start application", e);
        }
    }

    /**
     * Requests a stop of every processor that is still registered with this runner.
     *
     * @param streamApplication unused by this implementation
     */
    public void kill(StreamApplication streamApplication) {
        this.processors.forEach(StreamProcessor::stop);
    }

    /**
     * Returns the current application status.
     *
     * @param streamApplication unused by this implementation
     * @return the status last recorded by the lifecycle callbacks
     */
    public ApplicationStatus status(StreamApplication streamApplication) {
        return this.appStatus;
    }

    /**
     * Blocks until the last processor has shut down or failed.
     *
     * @throws SamzaException if the waiting thread is interrupted; the thread's interrupt
     *     status is restored before the exception is thrown
     */
    public void waitForFinish() {
        try {
            this.shutdownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("Wait is interrupted by exception", e);
            throw new SamzaException(e);
        }
    }

    /**
     * Creates the coordination service handle when the configured job coordinator factory is
     * {@link ZkJobCoordinatorFactory}.
     *
     * @return the coordination utils, or {@code null} if another (or no) job coordinator
     *     factory is configured
     */
    CoordinationUtils createCoordinationUtils() {
        String configuredFactory = this.config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
        if (!ZkJobCoordinatorFactory.class.getName().equals(configuredFactory)) {
            return null;
        }
        String globalAppId = new ApplicationConfig(this.config).getGlobalAppId();
        return new ZkCoordinationServiceFactory().getCoordinationService(globalAppId, this.uid, this.config);
    }

    /**
     * Creates the given intermediate streams. Does nothing for an empty list. Without
     * coordination utils the streams are created directly through the stream manager.
     * Otherwise a leader-elector listener that creates the streams and counts down the
     * {@code "init"} latch is registered, leadership is attempted, and this method waits on
     * the latch for up to {@link #LATCH_TIMEOUT_MINUTES} minutes.
     *
     * @param list the intermediate streams to create
     * @throws Exception if stream creation or waiting on the latch fails
     */
    void createStreams(List<StreamSpec> list) throws Exception {
        if (list.isEmpty()) {
            return;
        }
        if (this.coordinationUtils == null) {
            getStreamManager().createStreams(list);
            return;
        }
        Latch initLatch = this.coordinationUtils.getLatch(1, INIT_LATCH_ID);
        LeaderElector elector = this.coordinationUtils.getLeaderElector();
        // NOTE(review): presumably the listener is invoked only on the elected leader, so a
        // single instance creates the streams while the others wait on the latch - confirm
        // against the LeaderElector contract.
        elector.setLeaderElectorListener(() -> {
            getStreamManager().createStreams(list);
            initLatch.countDown();
        });
        elector.tryBecomeLeader();
        initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
    }

    /**
     * Creates a {@link StreamProcessor} for the given job config, using the task factory
     * produced by {@link TaskFactoryUtil#createTaskFactory}.
     *
     * @param config the job config for the processor
     * @param streamApplication the application the task factory is created for
     * @param streamProcessorLifecycleListener listener handed to the new processor
     * @return the new processor, created with an empty metrics reporter map
     * @throws SamzaException if the task factory is neither a {@link StreamTaskFactory} nor an
     *     {@link AsyncStreamTaskFactory}
     */
    StreamProcessor createStreamProcessor(Config config, StreamApplication streamApplication, StreamProcessorLifecycleListener streamProcessorLifecycleListener) {
        Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApplication, this);
        Map<String, MetricsReporter> reporters = new HashMap<>();
        if (taskFactory instanceof StreamTaskFactory) {
            return new StreamProcessor(config, reporters, (StreamTaskFactory) taskFactory, streamProcessorLifecycleListener);
        }
        if (taskFactory instanceof AsyncStreamTaskFactory) {
            return new StreamProcessor(config, reporters, (AsyncStreamTaskFactory) taskFactory, streamProcessorLifecycleListener);
        }
        // Guard against a null factory so the caller gets the descriptive error instead of an NPE.
        String factoryName = taskFactory == null ? "null" : taskFactory.getClass().getCanonicalName();
        throw new SamzaException(String.format("%s is not a valid task factory", factoryName));
    }
}
