package org.apache.samza.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.container.IllegalContainerStateException;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/samza/processor/StreamProcessor.class */
public class StreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
    private final JobCoordinator jobCoordinator;
    private final StreamProcessorLifecycleListener processorListener;
    private final Object taskFactory;
    private final Map<String, MetricsReporter> customMetricsReporter;
    private final Config config;
    private final long taskShutdownMs;
    private final String processorId;
    private ExecutorService executorService;
    private volatile SamzaContainer container;
    private volatile Throwable containerException;
    volatile CountDownLatch jcContainerShutdownLatch;
    private volatile boolean processorOnStartCalled;

    @VisibleForTesting
    JobCoordinatorListener jobCoordinatorListener;

    public StreamProcessor(Config config, Map<String, MetricsReporter> map, AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener streamProcessorLifecycleListener) {
        this(config, map, asyncStreamTaskFactory, streamProcessorLifecycleListener, null);
    }

    public StreamProcessor(Config config, Map<String, MetricsReporter> map, StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener streamProcessorLifecycleListener) {
        this(config, map, streamTaskFactory, streamProcessorLifecycleListener, null);
    }

    JobCoordinator getJobCoordinator() {
        return ((JobCoordinatorFactory) Util.getObj(new JobCoordinatorConfig(this.config).getJobCoordinatorFactoryClassName())).getJobCoordinator(this.config);
    }

    @VisibleForTesting
    JobCoordinator getCurrentJobCoordinator() {
        return this.jobCoordinator;
    }

    StreamProcessor(Config config, Map<String, MetricsReporter> map, Object obj, StreamProcessorLifecycleListener streamProcessorLifecycleListener, JobCoordinator jobCoordinator) {
        this.container = null;
        this.containerException = null;
        this.processorOnStartCalled = false;
        this.jobCoordinatorListener = null;
        this.taskFactory = obj;
        this.config = config;
        this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
        this.customMetricsReporter = map;
        this.processorListener = streamProcessorLifecycleListener;
        this.jobCoordinator = jobCoordinator != null ? jobCoordinator : getJobCoordinator();
        this.jobCoordinatorListener = createJobCoordinatorListener();
        this.jobCoordinator.setListener(this.jobCoordinatorListener);
        this.processorId = this.jobCoordinator.getProcessorId();
    }

    public void start() {
        this.jobCoordinator.start();
    }

    public synchronized void stop() {
        boolean z = false;
        if (this.container != null) {
            try {
                LOGGER.info("Shutting down container " + this.container.toString() + " from StreamProcessor");
                this.container.shutdown();
                z = true;
            } catch (IllegalContainerStateException e) {
                LOGGER.info("Container was not running", e);
            }
        }
        if (z) {
            return;
        }
        LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
        this.jobCoordinator.stop();
    }

    SamzaContainer createSamzaContainer(String str, JobModel jobModel) {
        return SamzaContainer.apply(str, jobModel, this.config, Util.javaMapAsScalaMap(this.customMetricsReporter), this.taskFactory);
    }

    JobCoordinatorListener createJobCoordinatorListener() {
        return new JobCoordinatorListener() { // from class: org.apache.samza.processor.StreamProcessor.1
            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onJobModelExpired() {
                if (StreamProcessor.this.container == null) {
                    StreamProcessor.LOGGER.debug("Container is not instantiated yet.");
                    return;
                }
                SamzaContainerStatus status = StreamProcessor.this.container.getStatus();
                if (!SamzaContainerStatus.NOT_STARTED.equals(status) && !SamzaContainerStatus.STARTED.equals(status)) {
                    StreamProcessor.LOGGER.debug("Container " + StreamProcessor.this.container.toString() + " is not running.");
                    return;
                }
                boolean z = false;
                try {
                    StreamProcessor.LOGGER.info("Shutting down container in onJobModelExpired for processor:" + StreamProcessor.this.processorId);
                    StreamProcessor.this.container.pause();
                    z = StreamProcessor.this.jcContainerShutdownLatch.await(StreamProcessor.this.taskShutdownMs, TimeUnit.MILLISECONDS);
                    StreamProcessor.LOGGER.info("ShutdownComplete=" + z);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    StreamProcessor.LOGGER.warn("Container shutdown was interrupted!" + StreamProcessor.this.container.toString(), e);
                } catch (IllegalContainerStateException e2) {
                    StreamProcessor.LOGGER.info("Container was not running.", e2);
                    z = true;
                }
                StreamProcessor.LOGGER.info("Shutting down container done for pid=" + StreamProcessor.this.processorId + "; complete =" + z);
                if (z) {
                    StreamProcessor.LOGGER.debug("Container " + StreamProcessor.this.container.toString() + " shutdown successfully");
                    return;
                }
                StreamProcessor.LOGGER.warn("Container " + StreamProcessor.this.container.toString() + " may not have shutdown successfully. Stopping the processor.");
                StreamProcessor.this.container = null;
                StreamProcessor.this.stop();
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onNewJobModel(String str, JobModel jobModel) {
                StreamProcessor.this.jcContainerShutdownLatch = new CountDownLatch(1);
                SamzaContainerListener samzaContainerListener = new SamzaContainerListener() { // from class: org.apache.samza.processor.StreamProcessor.1.1
                    @Override // org.apache.samza.container.SamzaContainerListener
                    public void onContainerStart() {
                        if (StreamProcessor.this.processorOnStartCalled) {
                            StreamProcessor.LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
                            return;
                        }
                        StreamProcessor.this.processorOnStartCalled = true;
                        if (StreamProcessor.this.processorListener != null) {
                            StreamProcessor.this.processorListener.onStart();
                        }
                    }

                    @Override // org.apache.samza.container.SamzaContainerListener
                    public void onContainerStop(boolean z) {
                        if (!z) {
                            StreamProcessor.LOGGER.info("Container " + StreamProcessor.this.container.toString() + " stopped.");
                            StreamProcessor.this.container = null;
                            StreamProcessor.this.stop();
                        } else {
                            StreamProcessor.LOGGER.info("Container " + StreamProcessor.this.container.toString() + " stopped due to a request from JobCoordinator.");
                            if (StreamProcessor.this.jcContainerShutdownLatch != null) {
                                StreamProcessor.this.jcContainerShutdownLatch.countDown();
                            }
                        }
                    }

                    @Override // org.apache.samza.container.SamzaContainerListener
                    public void onContainerFailed(Throwable th) {
                        if (StreamProcessor.this.jcContainerShutdownLatch != null) {
                            StreamProcessor.this.jcContainerShutdownLatch.countDown();
                        } else {
                            StreamProcessor.LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
                        }
                        StreamProcessor.this.containerException = th;
                        StreamProcessor.LOGGER.error("Container failed. Stopping the processor.", StreamProcessor.this.containerException);
                        StreamProcessor.this.container = null;
                        StreamProcessor.this.stop();
                    }
                };
                StreamProcessor.this.container = StreamProcessor.this.createSamzaContainer(str, jobModel);
                StreamProcessor.this.container.setContainerListener(samzaContainerListener);
                StreamProcessor.LOGGER.info("Starting container " + StreamProcessor.this.container.toString());
                StreamProcessor.this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("p-" + str + "-container-thread-%d").build());
                ExecutorService executorService = StreamProcessor.this.executorService;
                SamzaContainer samzaContainer = StreamProcessor.this.container;
                samzaContainer.getClass();
                executorService.submit(samzaContainer::run);
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onCoordinatorStop() {
                if (StreamProcessor.this.executorService != null) {
                    StreamProcessor.LOGGER.info("Shutting down the executor service.");
                    StreamProcessor.this.executorService.shutdownNow();
                }
                if (StreamProcessor.this.processorListener != null) {
                    if (StreamProcessor.this.containerException != null) {
                        StreamProcessor.this.processorListener.onFailure(StreamProcessor.this.containerException);
                    } else {
                        StreamProcessor.this.processorListener.onShutdown();
                    }
                }
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onCoordinatorFailure(Throwable th) {
                StreamProcessor.LOGGER.info("Coordinator Failed. Stopping the processor.");
                StreamProcessor.this.stop();
                if (StreamProcessor.this.processorListener != null) {
                    StreamProcessor.this.processorListener.onFailure(th);
                }
            }
        };
    }

    SamzaContainer getContainer() {
        return this.container;
    }
}
