package org.apache.samza.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.context.ApplicationContainerContext;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContext;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.drain.DrainMonitor;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/samza/processor/StreamProcessor.class */
public class StreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
    private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d";
    private final JobCoordinator jobCoordinator;
    private final ProcessorLifecycleListener processorListener;
    private final TaskFactory taskFactory;
    private final Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional;
    private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional;
    private final Optional<ExternalContext> externalContextOptional;
    private final Map<String, MetricsReporter> customMetricsReporter;
    private final Config config;
    private final long taskShutdownMs;
    private final String processorId;
    private final Object lock;
    private final MetricsRegistryMap metricsRegistry;
    private final MetadataStore metadataStore;
    private volatile Throwable containerException;
    volatile CountDownLatch containerShutdownLatch;

    @VisibleForTesting
    State state;

    @VisibleForTesting
    SamzaContainer container;

    @VisibleForTesting
    JobCoordinatorListener jobCoordinatorListener;

    @VisibleForTesting
    ExecutorService containerExecutorService;

    /* loaded from: input_file:org/apache/samza/processor/StreamProcessor$ContainerListener.class */
    class ContainerListener implements SamzaContainerListener {
        private boolean processorOnStartCalled = false;

        ContainerListener() {
        }

        @Override // org.apache.samza.container.SamzaContainerListener
        public void beforeStart() {
        }

        @Override // org.apache.samza.container.SamzaContainerListener
        public void afterStart() {
            StreamProcessor.LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", StreamProcessor.this.container, StreamProcessor.this.processorId);
            if (!this.processorOnStartCalled) {
                StreamProcessor.this.processorListener.afterStart();
                this.processorOnStartCalled = true;
            }
            StreamProcessor.this.state = State.RUNNING;
        }

        @Override // org.apache.samza.container.SamzaContainerListener
        public void afterStop() {
            StreamProcessor.this.containerShutdownLatch.countDown();
            synchronized (StreamProcessor.this.lock) {
                if (StreamProcessor.this.state == State.IN_REBALANCE) {
                    StreamProcessor.LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", StreamProcessor.this.container, StreamProcessor.this.processorId);
                } else {
                    StreamProcessor.LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", StreamProcessor.this.container, StreamProcessor.this.processorId);
                    StreamProcessor.this.state = State.STOPPING;
                    StreamProcessor.this.jobCoordinator.stop();
                }
            }
        }

        @Override // org.apache.samza.container.SamzaContainerListener
        public void afterFailure(Throwable th) {
            StreamProcessor.this.containerException = th;
            StreamProcessor.this.containerShutdownLatch.countDown();
            synchronized (StreamProcessor.this.lock) {
                StreamProcessor.LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", StreamProcessor.this.container, StreamProcessor.this.processorId), th);
                StreamProcessor.this.state = State.STOPPING;
                StreamProcessor.this.jobCoordinator.stop();
            }
        }
    }

    /* loaded from: input_file:org/apache/samza/processor/StreamProcessor$State.class */
    public enum State {
        STARTED("STARTED"),
        RUNNING("RUNNING"),
        STOPPING("STOPPING"),
        STOPPED("STOPPED"),
        NEW("NEW"),
        IN_REBALANCE("IN_REBALANCE");

        private String strVal;

        State(String str) {
            this.strVal = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.strVal;
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/samza/processor/StreamProcessor$StreamProcessorLifecycleListenerFactory.class */
    public interface StreamProcessorLifecycleListenerFactory {
        ProcessorLifecycleListener createInstance(StreamProcessor streamProcessor);
    }

    public State getState() {
        return this.state;
    }

    @Deprecated
    public StreamProcessor(String str, Config config, Map<String, MetricsReporter> map, TaskFactory taskFactory, ProcessorLifecycleListener processorLifecycleListener) {
        this(str, config, map, taskFactory, processorLifecycleListener, null);
    }

    @Deprecated
    public StreamProcessor(String str, Config config, Map<String, MetricsReporter> map, TaskFactory taskFactory, ProcessorLifecycleListener processorLifecycleListener, JobCoordinator jobCoordinator) {
        this(str, config, map, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(), streamProcessor -> {
            return processorLifecycleListener;
        }, jobCoordinator, null);
    }

    @Deprecated
    public StreamProcessor(String str, Config config, Map<String, MetricsReporter> map, TaskFactory taskFactory, Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> optional, Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> optional2, Optional<ExternalContext> optional3, StreamProcessorLifecycleListenerFactory streamProcessorLifecycleListenerFactory, JobCoordinator jobCoordinator) {
        this(str, config, map, taskFactory, optional, optional2, optional3, streamProcessorLifecycleListenerFactory, jobCoordinator, null);
    }

    public StreamProcessor(String str, Config config, Map<String, MetricsReporter> map, TaskFactory taskFactory, Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> optional, Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> optional2, Optional<ExternalContext> optional3, StreamProcessorLifecycleListenerFactory streamProcessorLifecycleListenerFactory, JobCoordinator jobCoordinator, MetadataStore metadataStore) {
        this.lock = new Object();
        this.containerException = null;
        this.containerShutdownLatch = new CountDownLatch(1);
        this.state = State.NEW;
        this.container = null;
        this.jobCoordinatorListener = null;
        Preconditions.checkNotNull(streamProcessorLifecycleListenerFactory, "StreamProcessorListenerFactory cannot be null.");
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "ProcessorId cannot be null.");
        this.config = config;
        this.processorId = str;
        this.metricsRegistry = new MetricsRegistryMap();
        this.customMetricsReporter = map;
        Iterator<MetricsReporter> it = this.customMetricsReporter.values().iterator();
        while (it.hasNext()) {
            it.next().register("StreamProcessor", this.metricsRegistry);
        }
        this.taskFactory = taskFactory;
        this.applicationDefinedContainerContextFactoryOptional = optional;
        this.applicationDefinedTaskContextFactoryOptional = optional2;
        this.externalContextOptional = optional3;
        this.taskShutdownMs = new TaskConfig(config).getShutdownMs();
        this.metadataStore = metadataStore;
        this.jobCoordinator = jobCoordinator != null ? jobCoordinator : createJobCoordinator(config, str, this.metricsRegistry, metadataStore);
        this.jobCoordinatorListener = createJobCoordinatorListener();
        this.jobCoordinator.setListener(this.jobCoordinatorListener);
        this.containerExecutorService = createExecutorService();
        this.processorListener = streamProcessorLifecycleListenerFactory.createInstance(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build());
    }

    public void start() {
        synchronized (this.lock) {
            if (this.state == State.NEW) {
                this.processorListener.beforeStart();
                this.state = State.STARTED;
                this.jobCoordinator.start();
            } else {
                LOGGER.info("Start is no-op, since the current state is {} and not {}.", this.state, State.NEW);
            }
        }
    }

    public void stop() {
        synchronized (this.lock) {
            LOGGER.info("Stopping the stream processor: {}.", this.processorId);
            if (this.state == State.STOPPING || this.state == State.STOPPED) {
                LOGGER.info("StreamProcessor state is: {}. Ignoring the stop.", this.state);
            } else {
                this.state = State.STOPPING;
                try {
                    LOGGER.info("Shutting down the container: {} of stream processor: {}.", this.container, this.processorId);
                    if (!stopSamzaContainer()) {
                        LOGGER.info("Interrupting the container: {} thread to die.", this.container);
                        this.containerExecutorService.shutdownNow();
                    }
                } catch (Throwable th) {
                    LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", this.container, this.processorId), th);
                }
                LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", this.processorId);
                this.jobCoordinator.stop();
            }
        }
    }

    @VisibleForTesting
    JobCoordinator getCurrentJobCoordinator() {
        return this.jobCoordinator;
    }

    @VisibleForTesting
    SamzaContainer getContainer() {
        return this.container;
    }

    @VisibleForTesting
    SamzaContainer createSamzaContainer(String str, JobModel jobModel) {
        Optional<DiagnosticsManager> buildDiagnosticsManager = DiagnosticsUtil.buildDiagnosticsManager(new JobConfig(this.config).getName().get(), new JobConfig(this.config).getJobId(), jobModel, str, Optional.empty(), Optional.empty(), this.config);
        StartpointManager startpointManager = null;
        if (this.metadataStore != null && new JobConfig(this.config).getStartpointEnabled()) {
            startpointManager = new StartpointManager(this.metadataStore);
        } else if (new JobConfig(this.config).getStartpointEnabled()) {
            LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
        } else {
            LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled");
        }
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        DrainMonitor drainMonitor = null;
        JobConfig jobConfig = new JobConfig(this.config);
        if (this.metadataStore != null && jobConfig.getDrainMonitorEnabled()) {
            drainMonitor = new DrainMonitor(this.metadataStore, this.config);
        }
        return SamzaContainer.apply(str, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter), metricsRegistryMap, this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, jobModel), Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)), Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)), Option.apply(this.externalContextOptional.orElse(null)), null, startpointManager, Option.apply(buildDiagnosticsManager.orElse(null)), drainMonitor);
    }

    private static JobCoordinator createJobCoordinator(Config config, String str, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
        return ((JobCoordinatorFactory) ReflectionUtil.getObj(new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(), JobCoordinatorFactory.class)).getJobCoordinator(str, config, metricsRegistry, metadataStore);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean stopSamzaContainer() {
        boolean z = true;
        if (this.container != null) {
            try {
                this.container.shutdown();
                LOGGER.info("Waiting {} ms for the container: {} to shutdown.", Long.valueOf(this.taskShutdownMs), this.container);
                z = this.containerShutdownLatch.await(this.taskShutdownMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("Exception occurred when shutting down the container: {}.", this.container, e);
                z = false;
                if (this.containerException != null) {
                    this.containerException = e;
                }
            }
            LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", this.container, this.processorId, Boolean.valueOf(z)));
        }
        if (!z) {
            ThreadUtil.logThreadDump("Thread dump at failure for stopping container in stream processor");
            if (this.containerException == null) {
                this.containerException = new TimeoutException("Container shutdown timed out after " + this.taskShutdownMs + " ms.");
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean interruptContainerAndShutdownExecutorService() {
        try {
            this.containerExecutorService.shutdownNow();
            this.containerShutdownLatch.await(this.taskShutdownMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback decremented the shutdown latch. ");
        }
        return this.containerShutdownLatch.getCount() == 0;
    }

    private JobCoordinatorListener createJobCoordinatorListener() {
        return new JobCoordinatorListener() { // from class: org.apache.samza.processor.StreamProcessor.1
            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onJobModelExpired() {
                synchronized (StreamProcessor.this.lock) {
                    if (StreamProcessor.this.state == State.STARTED || StreamProcessor.this.state == State.RUNNING) {
                        StreamProcessor.this.state = State.IN_REBALANCE;
                        StreamProcessor.LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", StreamProcessor.this.container, StreamProcessor.this.processorId);
                        if (StreamProcessor.this.stopSamzaContainer()) {
                            StreamProcessor.LOGGER.info("Container: {} shutdown completed for stream processor: {}.", StreamProcessor.this.container, StreamProcessor.this.processorId);
                        } else {
                            StreamProcessor.LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", StreamProcessor.this.container, StreamProcessor.this.processorId);
                            StreamProcessor.this.state = State.STOPPING;
                            StreamProcessor.this.jobCoordinator.stop();
                        }
                    } else if (StreamProcessor.this.state != State.IN_REBALANCE) {
                        StreamProcessor.LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", StreamProcessor.this.state, ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE));
                    } else if (StreamProcessor.this.container == null) {
                        StreamProcessor.LOGGER.info("Ignoring Job model expired since a rebalance is already in progress");
                    } else if (StreamProcessor.this.interruptContainerAndShutdownExecutorService()) {
                        StreamProcessor.this.containerExecutorService = StreamProcessor.this.createExecutorService();
                    } else {
                        StreamProcessor.LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. Stopping the stream processor: {}", StreamProcessor.this.container, StreamProcessor.this.processorId);
                        StreamProcessor.this.state = State.STOPPING;
                        StreamProcessor.this.jobCoordinator.stop();
                    }
                }
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onNewJobModel(String str, JobModel jobModel) {
                synchronized (StreamProcessor.this.lock) {
                    if (StreamProcessor.this.state == State.IN_REBALANCE) {
                        StreamProcessor.this.containerShutdownLatch = new CountDownLatch(1);
                        StreamProcessor.this.container = StreamProcessor.this.createSamzaContainer(str, jobModel);
                        StreamProcessor.this.container.setContainerListener(new ContainerListener());
                        StreamProcessor.LOGGER.info("Starting the container: {} for the stream processor: {}.", StreamProcessor.this.container, str);
                        StreamProcessor.this.containerExecutorService.submit(StreamProcessor.this.container);
                    } else {
                        StreamProcessor.LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", StreamProcessor.this.state, State.IN_REBALANCE);
                    }
                }
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onCoordinatorStop() {
                synchronized (StreamProcessor.this.lock) {
                    StreamProcessor.LOGGER.info("Shutting down the executor service of the stream processor: {}.", StreamProcessor.this.processorId);
                    if (!StreamProcessor.this.stopSamzaContainer()) {
                        StreamProcessor.this.containerExecutorService.shutdownNow();
                    }
                    StreamProcessor.this.state = State.STOPPED;
                }
                if (StreamProcessor.this.containerException != null) {
                    StreamProcessor.this.processorListener.afterFailure(StreamProcessor.this.containerException);
                } else {
                    StreamProcessor.this.processorListener.afterStop();
                }
            }

            @Override // org.apache.samza.coordinator.JobCoordinatorListener
            public void onCoordinatorFailure(Throwable th) {
                synchronized (StreamProcessor.this.lock) {
                    StreamProcessor.LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", StreamProcessor.this.jobCoordinator, StreamProcessor.this.processorId), th);
                    if (!StreamProcessor.this.stopSamzaContainer()) {
                        StreamProcessor.this.containerExecutorService.shutdownNow();
                    }
                    StreamProcessor.this.state = State.STOPPED;
                }
                StreamProcessor.this.processorListener.afterFailure(th);
            }
        };
    }
}
