package org.apache.samza.container;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.scheduler.EpochTimeScheduler;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.CoordinatorRequests;
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.apache.samza.task.TaskCallbackImpl;
import org.apache.samza.task.TaskCallbackListener;
import org.apache.samza.task.TaskCallbackManager;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/container/RunLoop.class */
public class RunLoop implements Runnable, Throttleable {
    private static final Logger log = LoggerFactory.getLogger(RunLoop.class);
    private final List<AsyncTaskWorker> taskWorkers;
    private final SystemConsumers consumerMultiplexer;
    private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
    private final ExecutorService threadPool;
    private final CoordinatorRequests coordinatorRequests;
    private final Object latch;
    private final int maxConcurrency;
    private final long windowMs;
    private final long commitMs;
    private final long callbackTimeoutMs;
    private final long drainCallbackTimeoutMs;
    private final long maxIdleMs;
    private final SamzaContainerMetrics containerMetrics;
    private final ScheduledExecutorService workerTimer;
    private final ScheduledExecutorService callbackTimer;
    private final ThrottlingScheduler callbackExecutor;
    private volatile boolean shutdownNow;
    private volatile Throwable throwable;
    private final HighResolutionClock clock;
    private boolean isAsyncCommitEnabled;
    private volatile boolean runLoopResumedSinceLastChecked;
    private final int elasticityFactor;
    private final String runId;
    private final boolean isHighLevelApiJob;
    private boolean isDraining;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.samza.container.RunLoop$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/container/RunLoop$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp = new int[WorkerOp.values().length];

        static {
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.PROCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.WINDOW.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.SCHEDULER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.COMMIT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.END_OF_STREAM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[WorkerOp.DRAIN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/container/RunLoop$AsyncTaskState.class */
    public final class AsyncTaskState {
        private volatile boolean needWindow = false;
        private volatile boolean needCommit = false;
        private volatile boolean needScheduler = false;
        private volatile boolean complete = false;
        private volatile boolean endOfStream = false;
        private volatile boolean shouldDrain = false;
        private volatile boolean windowInFlight = false;
        private volatile boolean commitInFlight = false;
        private volatile boolean schedulerInFlight = false;
        private final AtomicInteger messagesInFlight = new AtomicInteger(0);
        private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue = new ArrayDeque<>();
        private final Set<SystemStreamPartition> processingSspSet;
        private final Set<SystemStreamPartition> processingSspSetToDrain;
        private final TaskName taskName;
        private final TaskInstanceMetrics taskMetrics;
        private final boolean hasIntermediateStreams;
        private final boolean isHighLevelApiJob;
        private final String runId;

        AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskInstanceMetrics, Set<SystemStreamPartition> set, boolean z, boolean z2, String str) {
            this.taskName = taskName;
            this.taskMetrics = taskInstanceMetrics;
            this.processingSspSet = set;
            this.processingSspSetToDrain = new HashSet(set);
            this.hasIntermediateStreams = z;
            this.isHighLevelApiJob = z2;
            this.runId = str;
        }

        private boolean checkEndOfStream() {
            if (this.pendingEnvelopeQueue.size() == 1) {
                IncomingMessageEnvelope incomingMessageEnvelope = this.pendingEnvelopeQueue.peek().envelope;
                if (incomingMessageEnvelope.isEndOfStream()) {
                    this.processingSspSet.remove(incomingMessageEnvelope.getSystemStreamPartition(RunLoop.this.elasticityFactor));
                    if (!this.hasIntermediateStreams) {
                        this.pendingEnvelopeQueue.remove();
                    }
                }
            }
            return this.processingSspSet.isEmpty();
        }

        private boolean shouldDrain() {
            if (this.endOfStream) {
                return false;
            }
            if (this.pendingEnvelopeQueue.size() > 0) {
                IncomingMessageEnvelope incomingMessageEnvelope = this.pendingEnvelopeQueue.peek().envelope;
                if (incomingMessageEnvelope.isDrain()) {
                    if (((DrainMessage) incomingMessageEnvelope.getMessage()).getRunId().equals(this.runId)) {
                        if (!RunLoop.this.isDraining) {
                            RunLoop.this.drain();
                        }
                        if (!this.isHighLevelApiJob) {
                            this.processingSspSetToDrain.remove(incomingMessageEnvelope.getSystemStreamPartition());
                            this.pendingEnvelopeQueue.remove();
                        }
                    } else {
                        RunLoop.this.consumerMultiplexer.tryUpdate(this.pendingEnvelopeQueue.remove().envelope.getSystemStreamPartition());
                    }
                }
            }
            return this.processingSspSetToDrain.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isReady() {
            if (checkEndOfStream()) {
                this.endOfStream = true;
            }
            if (shouldDrain()) {
                this.shouldDrain = true;
            }
            if (RunLoop.this.coordinatorRequests.commitRequests().remove(this.taskName)) {
                this.needCommit = true;
            }
            boolean z = this.windowInFlight || this.commitInFlight || this.schedulerInFlight;
            return this.needCommit ? (this.messagesInFlight.get() == 0 || RunLoop.this.isAsyncCommitEnabled) && !z : (this.needWindow || this.needScheduler || this.endOfStream || this.shouldDrain) ? this.messagesInFlight.get() == 0 && !z : this.messagesInFlight.get() < RunLoop.this.maxConcurrency && !this.windowInFlight && !this.schedulerInFlight && (RunLoop.this.isAsyncCommitEnabled || !this.commitInFlight);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public WorkerOp nextOp() {
            if (this.complete) {
                return WorkerOp.NO_OP;
            }
            if (isReady()) {
                if (this.needCommit) {
                    return WorkerOp.COMMIT;
                }
                if (this.needWindow) {
                    return WorkerOp.WINDOW;
                }
                if (this.needScheduler) {
                    return WorkerOp.SCHEDULER;
                }
                if (this.endOfStream && this.pendingEnvelopeQueue.isEmpty()) {
                    return WorkerOp.END_OF_STREAM;
                }
                if (this.shouldDrain && this.pendingEnvelopeQueue.isEmpty()) {
                    return WorkerOp.DRAIN;
                }
                if (!this.pendingEnvelopeQueue.isEmpty()) {
                    return WorkerOp.PROCESS;
                }
            }
            return WorkerOp.NO_OP;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void needWindow() {
            this.needWindow = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void needCommit() {
            this.needCommit = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void needScheduler() {
            this.needScheduler = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startWindow() {
            this.needWindow = false;
            this.windowInFlight = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startDrain() {
            this.shouldDrain = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startCommit() {
            this.needCommit = false;
            this.commitInFlight = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startProcess() {
            this.taskMetrics.messagesInFlight().set(Integer.valueOf(this.messagesInFlight.incrementAndGet()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startScheduler() {
            this.needScheduler = false;
            this.schedulerInFlight = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneCommit() {
            this.commitInFlight = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneWindow() {
            this.windowInFlight = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneProcess() {
            this.taskMetrics.messagesInFlight().set(Integer.valueOf(this.messagesInFlight.decrementAndGet()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneScheduler() {
            this.schedulerInFlight = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void insertEnvelope(PendingEnvelope pendingEnvelope) {
            this.pendingEnvelopeQueue.add(pendingEnvelope);
            int size = this.pendingEnvelopeQueue.size();
            this.taskMetrics.pendingMessages().set(Integer.valueOf(size));
            RunLoop.log.trace("Insert envelope to task {} queue.", this.taskName);
            RunLoop.log.trace("Insert envelope to task {} queue.", this.taskName);
            RunLoop.log.debug("Task {} pending envelope count is {} after insertion.", this.taskName, Integer.valueOf(size));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public IncomingMessageEnvelope fetchEnvelope() {
            PendingEnvelope remove = this.pendingEnvelopeQueue.remove();
            int size = this.pendingEnvelopeQueue.size();
            this.taskMetrics.pendingMessages().set(Integer.valueOf(size));
            RunLoop.log.trace("fetch envelope ssp {} offset {} to process.", remove.envelope.getSystemStreamPartition(RunLoop.this.elasticityFactor), remove.envelope.getOffset());
            RunLoop.log.debug("Task {} pending envelopes count is {} after fetching.", this.taskName, Integer.valueOf(size));
            if (remove.markProcessed()) {
                SystemStreamPartition systemStreamPartition = remove.envelope.getSystemStreamPartition(RunLoop.this.elasticityFactor);
                RunLoop.this.consumerMultiplexer.tryUpdate(systemStreamPartition);
                RunLoop.log.debug("Update chooser for {}", systemStreamPartition);
            }
            return remove.envelope;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/container/RunLoop$AsyncTaskWorker.class */
    public class AsyncTaskWorker implements TaskCallbackListener {
        private final RunLoopTask task;
        private final TaskCallbackManager callbackManager;
        private volatile AsyncTaskState state;

        AsyncTaskWorker(RunLoopTask runLoopTask) {
            this.task = runLoopTask;
            this.callbackManager = new TaskCallbackManager(this, RunLoop.this.callbackTimer, RunLoop.this.callbackTimeoutMs, RunLoop.this.maxConcurrency, RunLoop.this.clock);
            this.state = new AsyncTaskState(runLoopTask.taskName(), runLoopTask.metrics(), getWorkingSSPSet(runLoopTask), !runLoopTask.intermediateStreams().isEmpty(), RunLoop.this.isHighLevelApiJob, RunLoop.this.runId);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void init() {
            if (this.task.isWindowableTask() && RunLoop.this.windowMs > 0) {
                RunLoop.this.workerTimer.scheduleAtFixedRate(new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.1
                    @Override // java.lang.Runnable
                    public void run() {
                        RunLoop.log.trace("Task {} need window", AsyncTaskWorker.this.task.taskName());
                        AsyncTaskWorker.this.state.needWindow();
                        RunLoop.this.resume();
                    }
                }, RunLoop.this.windowMs, RunLoop.this.windowMs, TimeUnit.MILLISECONDS);
            }
            if (RunLoop.this.commitMs > 0) {
                RunLoop.this.workerTimer.scheduleAtFixedRate(new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.2
                    @Override // java.lang.Runnable
                    public void run() {
                        RunLoop.log.trace("Task {} need commit", AsyncTaskWorker.this.task.taskName());
                        AsyncTaskWorker.this.state.needCommit();
                        RunLoop.this.resume();
                    }
                }, RunLoop.this.commitMs, RunLoop.this.commitMs, TimeUnit.MILLISECONDS);
            }
            EpochTimeScheduler epochTimeScheduler = this.task.epochTimeScheduler();
            if (epochTimeScheduler != null) {
                epochTimeScheduler.registerListener(() -> {
                    this.state.needScheduler();
                });
            }
        }

        private Set<SystemStreamPartition> getWorkingSSPSet(RunLoopTask runLoopTask) {
            return (Set) runLoopTask.systemStreamPartitions().stream().filter(systemStreamPartition -> {
                return !RunLoop.this.consumerMultiplexer.isEndOfStream(systemStreamPartition);
            }).collect(Collectors.toSet());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void run() {
            switch (AnonymousClass1.$SwitchMap$org$apache$samza$container$RunLoop$WorkerOp[this.state.nextOp().ordinal()]) {
                case 1:
                    process();
                    return;
                case CoordinatorStreamMessage.KEY_INDEX /* 2 */:
                    window();
                    return;
                case 3:
                    scheduler();
                    return;
                case 4:
                    commit();
                    return;
                case 5:
                    endOfStream();
                    return;
                case 6:
                    drain();
                    return;
                default:
                    return;
            }
        }

        private void drain() {
            this.state.complete = true;
            this.state.startDrain();
            try {
                ReadableCoordinator readableCoordinator = new ReadableCoordinator(this.task.taskName());
                this.task.drain(readableCoordinator);
                readableCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
                RunLoop.this.coordinatorRequests.update(readableCoordinator);
                commit();
            } finally {
                RunLoop.this.resumeAfterDrain();
            }
        }

        private void endOfStream() {
            this.state.complete = true;
            try {
                ReadableCoordinator readableCoordinator = new ReadableCoordinator(this.task.taskName());
                this.task.endOfStream(readableCoordinator);
                readableCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
                RunLoop.this.coordinatorRequests.update(readableCoordinator);
                if (RunLoop.this.coordinatorRequests.commitRequests().remove(this.task.taskName())) {
                    this.task.commit();
                }
            } finally {
                RunLoop.this.resume();
            }
        }

        private void process() {
            final IncomingMessageEnvelope fetchEnvelope = this.state.fetchEnvelope();
            RunLoop.log.trace("Process ssp {} offset {}", fetchEnvelope.getSystemStreamPartition(RunLoop.this.elasticityFactor), fetchEnvelope.getOffset());
            final ReadableCoordinator readableCoordinator = new ReadableCoordinator(this.task.taskName());
            this.task.process(fetchEnvelope, readableCoordinator, new TaskCallbackFactory() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.3
                @Override // org.apache.samza.task.TaskCallbackFactory
                public TaskCallback createCallback() {
                    AsyncTaskWorker.this.state.startProcess();
                    RunLoop.this.containerMetrics.processes().inc();
                    return (RunLoop.this.isDraining && (fetchEnvelope.isDrain() || fetchEnvelope.isWatermark())) ? AsyncTaskWorker.this.callbackManager.createCallbackForDrain(AsyncTaskWorker.this.task.taskName(), fetchEnvelope, readableCoordinator, RunLoop.this.drainCallbackTimeoutMs) : AsyncTaskWorker.this.callbackManager.createCallback(AsyncTaskWorker.this.task.taskName(), fetchEnvelope, readableCoordinator);
                }
            });
        }

        private void window() {
            this.state.startWindow();
            Runnable runnable = new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            RunLoop.this.containerMetrics.windows().inc();
                            ReadableCoordinator readableCoordinator = new ReadableCoordinator(AsyncTaskWorker.this.task.taskName());
                            long nanoTime = RunLoop.this.clock.nanoTime();
                            AsyncTaskWorker.this.task.window(readableCoordinator);
                            RunLoop.this.containerMetrics.windowNs().update(RunLoop.this.clock.nanoTime() - nanoTime);
                            long millis = TimeUnit.NANOSECONDS.toMillis((long) RunLoop.this.containerMetrics.windowNs().getSnapshot().getAverage());
                            if (millis >= RunLoop.this.windowMs) {
                                RunLoop.log.warn("Average window call duration {} is greater than the configured task.window.ms {}. This can starve process calls, so consider setting task.window.ms >> {} ms.", new Object[]{Long.valueOf(millis), Long.valueOf(RunLoop.this.windowMs), Long.valueOf(millis)});
                            }
                            RunLoop.this.coordinatorRequests.update(readableCoordinator);
                            AsyncTaskWorker.this.state.doneWindow();
                            RunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        } catch (Throwable th) {
                            RunLoop.log.error("Task {} window failed", AsyncTaskWorker.this.task.taskName(), th);
                            RunLoop.this.abort(th);
                            RunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        RunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                        RunLoop.this.resume();
                        throw th2;
                    }
                }
            };
            if (RunLoop.this.threadPool != null) {
                RunLoop.log.trace("Task {} window on the thread pool", this.task.taskName());
                RunLoop.this.threadPool.submit(runnable);
            } else {
                RunLoop.log.trace("Task {} window on the run loop thread", this.task.taskName());
                runnable.run();
            }
        }

        private void commit() {
            this.state.startCommit();
            Runnable runnable = new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            RunLoop.this.containerMetrics.commits().inc();
                            long nanoTime = RunLoop.this.clock.nanoTime();
                            AsyncTaskWorker.this.task.commit();
                            RunLoop.this.containerMetrics.commitNs().update(RunLoop.this.clock.nanoTime() - nanoTime);
                            AsyncTaskWorker.this.state.doneCommit();
                            RunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        } catch (Throwable th) {
                            RunLoop.log.error("Task {} commit failed", AsyncTaskWorker.this.task.taskName(), th);
                            RunLoop.this.abort(th);
                            RunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        RunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                        RunLoop.this.resume();
                        throw th2;
                    }
                }
            };
            if (RunLoop.this.threadPool != null && !RunLoop.this.isDraining) {
                RunLoop.log.trace("Task {} commits on the thread pool", this.task.taskName());
                RunLoop.this.threadPool.submit(runnable);
            } else if (RunLoop.this.isDraining) {
                RunLoop.log.trace("Task {} commits on the run loop thread as task is draining", this.task.taskName());
                runnable.run();
            } else {
                RunLoop.log.trace("Task {} commits on the run loop thread", this.task.taskName());
                runnable.run();
            }
        }

        private void scheduler() {
            this.state.startScheduler();
            Runnable runnable = new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.6
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            ReadableCoordinator readableCoordinator = new ReadableCoordinator(AsyncTaskWorker.this.task.taskName());
                            long nanoTime = RunLoop.this.clock.nanoTime();
                            AsyncTaskWorker.this.task.scheduler(readableCoordinator);
                            RunLoop.this.containerMetrics.timerNs().update(RunLoop.this.clock.nanoTime() - nanoTime);
                            RunLoop.this.coordinatorRequests.update(readableCoordinator);
                            AsyncTaskWorker.this.state.doneScheduler();
                            RunLoop.log.trace("Task {} scheduler completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        } catch (Throwable th) {
                            RunLoop.log.error("Task {} scheduler failed", AsyncTaskWorker.this.task.taskName(), th);
                            RunLoop.this.abort(th);
                            RunLoop.log.trace("Task {} scheduler completed", AsyncTaskWorker.this.task.taskName());
                            RunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        RunLoop.log.trace("Task {} scheduler completed", AsyncTaskWorker.this.task.taskName());
                        RunLoop.this.resume();
                        throw th2;
                    }
                }
            };
            if (RunLoop.this.threadPool != null) {
                RunLoop.log.trace("Task {} scheduler runs on the thread pool", this.task.taskName());
                RunLoop.this.threadPool.submit(runnable);
            } else {
                RunLoop.log.trace("Task {} scheduler runs on the run loop thread", this.task.taskName());
                runnable.run();
            }
        }

        @Override // org.apache.samza.task.TaskCallbackListener
        public void onComplete(final TaskCallback taskCallback) {
            RunLoop.this.callbackExecutor.schedule(new Runnable() { // from class: org.apache.samza.container.RunLoop.AsyncTaskWorker.7
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            AsyncTaskWorker.this.state.doneProcess();
                            AsyncTaskWorker.this.state.taskMetrics.asyncCallbackCompleted().inc();
                            TaskCallbackImpl taskCallbackImpl = (TaskCallbackImpl) taskCallback;
                            RunLoop.this.containerMetrics.processNs().update(RunLoop.this.clock.nanoTime() - taskCallbackImpl.getTimeCreatedNs());
                            RunLoop.log.trace("Got callback complete for task {}, ssp {}", taskCallbackImpl.getTaskName(), taskCallbackImpl.getSystemStreamPartition());
                            for (TaskCallbackImpl taskCallbackImpl2 : AsyncTaskWorker.this.callbackManager.updateCallback(taskCallbackImpl)) {
                                RunLoop.log.trace("Update offset for ssp {}, offset {}", taskCallbackImpl2.getSystemStreamPartition(), taskCallbackImpl2.getOffset());
                                if (AsyncTaskWorker.this.task.offsetManager() != null) {
                                    AsyncTaskWorker.this.task.offsetManager().update(AsyncTaskWorker.this.task.taskName(), taskCallbackImpl2.getSystemStreamPartition(), taskCallbackImpl2.getOffset());
                                }
                                RunLoop.this.coordinatorRequests.update(taskCallbackImpl2.getCoordinator());
                            }
                        } catch (Throwable th) {
                            RunLoop.log.error("Error marking process as complete.", th);
                            RunLoop.this.abort(th);
                            RunLoop.this.resume();
                        }
                    } finally {
                        RunLoop.this.resume();
                    }
                }
            }, RunLoop.this.clock.nanoTime() - ((TaskCallbackImpl) taskCallback).getTimeCreatedNs());
        }

        @Override // org.apache.samza.task.TaskCallbackListener
        public void onFailure(TaskCallback taskCallback, Throwable th) {
            try {
                try {
                    RunLoop.this.abort(th);
                    this.state.doneProcess();
                    RunLoop.log.error("Got callback failure for task {}", ((TaskCallbackImpl) taskCallback).getTaskName(), th);
                    RunLoop.this.resume();
                } catch (Throwable th2) {
                    RunLoop.log.error("Error marking process as failed.", th2);
                    RunLoop.this.resume();
                }
            } catch (Throwable th3) {
                RunLoop.this.resume();
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/container/RunLoop$PendingEnvelope.class */
    public static final class PendingEnvelope {
        private final IncomingMessageEnvelope envelope;
        private boolean processed = false;

        PendingEnvelope(IncomingMessageEnvelope incomingMessageEnvelope) {
            this.envelope = incomingMessageEnvelope;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean markProcessed() {
            boolean z = this.processed;
            this.processed = true;
            return !z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/container/RunLoop$WorkerOp.class */
    public enum WorkerOp {
        WINDOW,
        COMMIT,
        PROCESS,
        END_OF_STREAM,
        DRAIN,
        SCHEDULER,
        NO_OP
    }

    public RunLoop(Map<TaskName, RunLoopTask> map, ExecutorService executorService, SystemConsumers systemConsumers, int i, long j, long j2, long j3, long j4, long j5, long j6, SamzaContainerMetrics samzaContainerMetrics, HighResolutionClock highResolutionClock, boolean z) {
        this(map, executorService, systemConsumers, i, j, j2, j3, j4, j5, j6, samzaContainerMetrics, highResolutionClock, z, 1, null, false);
    }

    public RunLoop(Map<TaskName, RunLoopTask> map, ExecutorService executorService, SystemConsumers systemConsumers, int i, long j, long j2, long j3, long j4, long j5, long j6, SamzaContainerMetrics samzaContainerMetrics, HighResolutionClock highResolutionClock, boolean z, int i2, String str, boolean z2) {
        this.shutdownNow = false;
        this.throwable = null;
        this.isDraining = false;
        this.threadPool = executorService;
        this.consumerMultiplexer = systemConsumers;
        this.containerMetrics = samzaContainerMetrics;
        this.windowMs = j;
        this.commitMs = j2;
        this.maxConcurrency = i;
        this.callbackTimeoutMs = j3;
        this.drainCallbackTimeoutMs = j4;
        this.maxIdleMs = j6;
        this.callbackTimer = j3 > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
        this.callbackExecutor = new ThrottlingScheduler(j5);
        this.coordinatorRequests = new CoordinatorRequests(map.keySet());
        this.latch = new Object();
        this.workerTimer = Executors.newSingleThreadScheduledExecutor();
        this.clock = highResolutionClock;
        this.runId = str;
        this.isHighLevelApiJob = z2;
        this.isAsyncCommitEnabled = z;
        this.elasticityFactor = i2;
        HashMap hashMap = new HashMap();
        for (RunLoopTask runLoopTask : map.values()) {
            hashMap.put(runLoopTask.taskName(), new AsyncTaskWorker(runLoopTask));
        }
        this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(map, hashMap));
        this.taskWorkers = Collections.unmodifiableList(new ArrayList(hashMap.values()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drain() {
        log.info("Setting the RunLoop to drain mode.");
        this.isDraining = true;
        log.debug("Disabling async commit when the pipeline is draining.");
        this.isAsyncCommitEnabled = false;
    }

    private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap(Map<TaskName, RunLoopTask> map, Map<TaskName, AsyncTaskWorker> map2) {
        HashMap hashMap = new HashMap();
        for (RunLoopTask runLoopTask : map.values()) {
            for (SystemStreamPartition systemStreamPartition : runLoopTask.systemStreamPartitions()) {
                hashMap.putIfAbsent(systemStreamPartition, new ArrayList());
                ((List) hashMap.get(systemStreamPartition)).add(map2.get(runLoopTask.taskName()));
            }
        }
        return hashMap;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            Iterator<AsyncTaskWorker> it = this.taskWorkers.iterator();
            while (it.hasNext()) {
                it.next().init();
            }
            long nanoTime = this.clock.nanoTime();
            while (!this.shutdownNow && this.throwable == null) {
                long nanoTime2 = this.clock.nanoTime();
                IncomingMessageEnvelope chooseEnvelope = chooseEnvelope();
                long nanoTime3 = this.clock.nanoTime();
                this.containerMetrics.chooseNs().update(nanoTime3 - nanoTime2);
                blockIfBusyOrNoNewWork(chooseEnvelope);
                long nanoTime4 = this.clock.nanoTime();
                this.containerMetrics.blockNs().update(nanoTime4 - nanoTime3);
                runTasks(chooseEnvelope);
                long nanoTime5 = this.clock.nanoTime();
                long j = nanoTime5 - nanoTime4;
                long j2 = nanoTime5 - nanoTime;
                nanoTime = nanoTime5;
                if (j2 != 0) {
                    this.containerMetrics.utilization().set(Double.valueOf(j / j2));
                }
            }
            if (this.throwable != null) {
                log.error("Caught throwable and stopping run loop", this.throwable);
                throw new SamzaException(this.throwable);
            }
        } finally {
            this.workerTimer.shutdown();
            this.callbackExecutor.shutdown();
            if (this.callbackTimer != null) {
                this.callbackTimer.shutdown();
            }
        }
    }

    @Override // org.apache.samza.util.Throttleable
    public void setWorkFactor(double d) {
        this.callbackExecutor.setWorkFactor(d);
    }

    @Override // org.apache.samza.util.Throttleable
    public double getWorkFactor() {
        return this.callbackExecutor.getWorkFactor();
    }

    public void shutdown() {
        this.shutdownNow = true;
        resume();
    }

    private IncomingMessageEnvelope chooseEnvelope() {
        IncomingMessageEnvelope choose = this.consumerMultiplexer.choose(false);
        if (choose != null) {
            log.trace("Choose envelope ssp {} offset {} for processing", choose.getSystemStreamPartition(this.elasticityFactor), choose.getOffset());
            this.containerMetrics.envelopes().inc();
        } else {
            log.trace("No envelope is available");
            this.containerMetrics.nullEnvelopes().inc();
        }
        return choose;
    }

    private void runTasks(IncomingMessageEnvelope incomingMessageEnvelope) {
        if (this.shutdownNow) {
            return;
        }
        if (incomingMessageEnvelope != null) {
            PendingEnvelope pendingEnvelope = new PendingEnvelope(incomingMessageEnvelope);
            List<AsyncTaskWorker> workersForEnvelope = getWorkersForEnvelope(incomingMessageEnvelope);
            if (workersForEnvelope != null) {
                Iterator<AsyncTaskWorker> it = workersForEnvelope.iterator();
                while (it.hasNext()) {
                    it.next().state.insertEnvelope(pendingEnvelope);
                }
            } else if (this.elasticityFactor > 1) {
                this.consumerMultiplexer.tryUpdate(incomingMessageEnvelope.getSystemStreamPartition(this.elasticityFactor));
                log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop", incomingMessageEnvelope.getSystemStreamPartition(this.elasticityFactor));
                this.containerMetrics.envelopes().dec();
                this.containerMetrics.skippedEnvelopes().inc();
            }
        }
        Iterator<AsyncTaskWorker> it2 = this.taskWorkers.iterator();
        while (it2.hasNext()) {
            it2.next().run();
        }
    }

    private List<AsyncTaskWorker> getWorkersForEnvelope(IncomingMessageEnvelope incomingMessageEnvelope) {
        if (this.elasticityFactor <= 1) {
            return this.sspToTaskWorkerMapping.get(incomingMessageEnvelope.getSystemStreamPartition());
        }
        SystemStreamPartition systemStreamPartition = incomingMessageEnvelope.getSystemStreamPartition(this.elasticityFactor);
        MessageType of = MessageType.of(incomingMessageEnvelope.getMessage());
        return (incomingMessageEnvelope.isEndOfStream() || incomingMessageEnvelope.isDrain() || of == MessageType.END_OF_STREAM || of == MessageType.DRAIN || of == MessageType.WATERMARK) ? (List) this.sspToTaskWorkerMapping.entrySet().stream().filter(entry -> {
            return ((SystemStreamPartition) entry.getKey()).getSystemStream().equals(systemStreamPartition.getSystemStream()) && ((SystemStreamPartition) entry.getKey()).getPartition().equals(systemStreamPartition.getPartition());
        }).map(entry2 -> {
            return (List) entry2.getValue();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList()) : this.sspToTaskWorkerMapping.get(systemStreamPartition);
    }

    private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope incomingMessageEnvelope) {
        synchronized (this.latch) {
            if (incomingMessageEnvelope == null) {
                if (!this.runLoopResumedSinceLastChecked) {
                    try {
                        log.trace("Start no work wait");
                        this.latch.wait(this.maxIdleMs);
                        log.trace("End no work wait");
                    } catch (InterruptedException e) {
                        throw new SamzaException("Run loop is interrupted", e);
                    }
                }
            }
            this.runLoopResumedSinceLastChecked = false;
            while (!this.shutdownNow && this.throwable == null) {
                Iterator<AsyncTaskWorker> it = this.taskWorkers.iterator();
                while (it.hasNext()) {
                    if (it.next().state.isReady()) {
                        return;
                    }
                }
                try {
                    log.trace("Block loop thread");
                    this.latch.wait();
                } catch (InterruptedException e2) {
                    throw new SamzaException("Run loop is interrupted", e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resume() {
        log.trace("Resume loop thread");
        if (this.coordinatorRequests.shouldShutdownNow() && this.coordinatorRequests.commitRequests().isEmpty()) {
            this.shutdownNow = true;
        }
        synchronized (this.latch) {
            this.latch.notifyAll();
            this.runLoopResumedSinceLastChecked = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resumeAfterDrain() {
        log.trace("Resume loop thread");
        if (this.coordinatorRequests.shouldShutdownNow()) {
            this.shutdownNow = true;
        }
        synchronized (this.latch) {
            this.latch.notifyAll();
            this.runLoopResumedSinceLastChecked = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void abort(Throwable th) {
        this.throwable = th;
    }
}
