package org.apache.samza.task;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/samza/task/AsyncRunLoop.class */
public class AsyncRunLoop implements Runnable, Throttleable {
    private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class);
    private final List<AsyncTaskWorker> taskWorkers;
    private final SystemConsumers consumerMultiplexer;
    private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
    private final ExecutorService threadPool;
    private final CoordinatorRequests coordinatorRequests;
    private final Object latch;
    private final int maxConcurrency;
    private final long windowMs;
    private final long commitMs;
    private final long callbackTimeoutMs;
    private final SamzaContainerMetrics containerMetrics;
    private final ScheduledExecutorService workerTimer;
    private final ScheduledExecutorService callbackTimer;
    private final ThrottlingScheduler callbackExecutor;
    private volatile boolean shutdownNow = false;
    private volatile Throwable throwable = null;
    private final HighResolutionClock clock;
    private final boolean isAsyncCommitEnabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.samza.task.AsyncRunLoop$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/task/AsyncRunLoop$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp = new int[WorkerOp.values().length];

        static {
            try {
                $SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp[WorkerOp.PROCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp[WorkerOp.WINDOW.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp[WorkerOp.COMMIT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp[WorkerOp.END_OF_STREAM.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/task/AsyncRunLoop$AsyncTaskState.class */
    public final class AsyncTaskState {
        private volatile boolean needWindow = false;
        private volatile boolean needCommit = false;
        private volatile boolean complete = false;
        private volatile boolean endOfStream = false;
        private volatile boolean windowInFlight = false;
        private volatile boolean commitInFlight = false;
        private final AtomicInteger messagesInFlight = new AtomicInteger(0);
        private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue = new ArrayDeque<>();
        private final Set<SystemStreamPartition> processingSspSet;
        private final TaskName taskName;
        private final TaskInstanceMetrics taskMetrics;

        AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskInstanceMetrics, Set<SystemStreamPartition> set) {
            this.taskName = taskName;
            this.taskMetrics = taskInstanceMetrics;
            this.processingSspSet = set;
        }

        private boolean checkEndOfStream() {
            if (this.pendingEnvelopeQueue.size() == 1) {
                IncomingMessageEnvelope incomingMessageEnvelope = this.pendingEnvelopeQueue.peek().envelope;
                if (incomingMessageEnvelope.isEndOfStream()) {
                    this.processingSspSet.remove(incomingMessageEnvelope.getSystemStreamPartition());
                    this.pendingEnvelopeQueue.remove();
                }
            }
            return this.processingSspSet.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isReady() {
            if (checkEndOfStream()) {
                this.endOfStream = true;
            }
            if (AsyncRunLoop.this.coordinatorRequests.commitRequests().remove(this.taskName)) {
                this.needCommit = true;
            }
            boolean z = this.windowInFlight || this.commitInFlight;
            return this.needCommit ? (this.messagesInFlight.get() == 0 || AsyncRunLoop.this.isAsyncCommitEnabled) && !z : (this.needWindow || this.endOfStream) ? this.messagesInFlight.get() == 0 && !z : this.messagesInFlight.get() < AsyncRunLoop.this.maxConcurrency && !this.windowInFlight && (AsyncRunLoop.this.isAsyncCommitEnabled || !this.commitInFlight);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public WorkerOp nextOp() {
            if (this.complete) {
                return WorkerOp.NO_OP;
            }
            if (isReady()) {
                if (this.needCommit) {
                    return WorkerOp.COMMIT;
                }
                if (this.needWindow) {
                    return WorkerOp.WINDOW;
                }
                if (this.endOfStream) {
                    return WorkerOp.END_OF_STREAM;
                }
                if (!this.pendingEnvelopeQueue.isEmpty()) {
                    return WorkerOp.PROCESS;
                }
            }
            return WorkerOp.NO_OP;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void needWindow() {
            this.needWindow = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void needCommit() {
            this.needCommit = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startWindow() {
            this.needWindow = false;
            this.windowInFlight = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startCommit() {
            this.needCommit = false;
            this.commitInFlight = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startProcess() {
            this.taskMetrics.messagesInFlight().set(Integer.valueOf(this.messagesInFlight.incrementAndGet()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneCommit() {
            this.commitInFlight = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneWindow() {
            this.windowInFlight = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doneProcess() {
            this.taskMetrics.messagesInFlight().set(Integer.valueOf(this.messagesInFlight.decrementAndGet()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void insertEnvelope(PendingEnvelope pendingEnvelope) {
            this.pendingEnvelopeQueue.add(pendingEnvelope);
            int size = this.pendingEnvelopeQueue.size();
            this.taskMetrics.pendingMessages().set(Integer.valueOf(size));
            AsyncRunLoop.log.trace("Insert envelope to task {} queue.", this.taskName);
            AsyncRunLoop.log.debug("Task {} pending envelope count is {} after insertion.", this.taskName, Integer.valueOf(size));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public IncomingMessageEnvelope fetchEnvelope() {
            PendingEnvelope remove = this.pendingEnvelopeQueue.remove();
            int size = this.pendingEnvelopeQueue.size();
            this.taskMetrics.pendingMessages().set(Integer.valueOf(size));
            AsyncRunLoop.log.trace("fetch envelope ssp {} offset {} to process.", remove.envelope.getSystemStreamPartition(), remove.envelope.getOffset());
            AsyncRunLoop.log.debug("Task {} pending envelopes count is {} after fetching.", this.taskName, Integer.valueOf(size));
            if (remove.markProcessed()) {
                SystemStreamPartition systemStreamPartition = remove.envelope.getSystemStreamPartition();
                AsyncRunLoop.this.consumerMultiplexer.tryUpdate(systemStreamPartition);
                AsyncRunLoop.log.debug("Update chooser for {}", systemStreamPartition);
            }
            return remove.envelope;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/task/AsyncRunLoop$AsyncTaskWorker.class */
    public class AsyncTaskWorker implements TaskCallbackListener {
        private final TaskInstance task;
        private final TaskCallbackManager callbackManager;
        private volatile AsyncTaskState state;

        AsyncTaskWorker(TaskInstance taskInstance) {
            this.task = taskInstance;
            this.callbackManager = new TaskCallbackManager(this, AsyncRunLoop.this.callbackTimer, AsyncRunLoop.this.callbackTimeoutMs, AsyncRunLoop.this.maxConcurrency, AsyncRunLoop.this.clock);
            this.state = new AsyncTaskState(taskInstance.taskName(), taskInstance.metrics(), getWorkingSSPSet(taskInstance));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void init() {
            if (this.task.isWindowableTask() && AsyncRunLoop.this.windowMs > 0) {
                AsyncRunLoop.this.workerTimer.scheduleAtFixedRate(new Runnable() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.1
                    @Override // java.lang.Runnable
                    public void run() {
                        AsyncRunLoop.log.trace("Task {} need window", AsyncTaskWorker.this.task.taskName());
                        AsyncTaskWorker.this.state.needWindow();
                        AsyncRunLoop.this.resume();
                    }
                }, AsyncRunLoop.this.windowMs, AsyncRunLoop.this.windowMs, TimeUnit.MILLISECONDS);
            }
            if (AsyncRunLoop.this.commitMs > 0) {
                AsyncRunLoop.this.workerTimer.scheduleAtFixedRate(new Runnable() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.2
                    @Override // java.lang.Runnable
                    public void run() {
                        AsyncRunLoop.log.trace("Task {} need commit", AsyncTaskWorker.this.task.taskName());
                        AsyncTaskWorker.this.state.needCommit();
                        AsyncRunLoop.this.resume();
                    }
                }, AsyncRunLoop.this.commitMs, AsyncRunLoop.this.commitMs, TimeUnit.MILLISECONDS);
            }
        }

        private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance taskInstance) {
            return (Set) new HashSet((Collection) JavaConverters.setAsJavaSetConverter(taskInstance.systemStreamPartitions()).asJava()).stream().filter(systemStreamPartition -> {
                return !AsyncRunLoop.this.consumerMultiplexer.isEndOfStream(systemStreamPartition);
            }).collect(Collectors.toSet());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void run() {
            switch (AnonymousClass1.$SwitchMap$org$apache$samza$task$AsyncRunLoop$WorkerOp[this.state.nextOp().ordinal()]) {
                case 1:
                    process();
                    return;
                case CoordinatorStreamMessage.KEY_INDEX /* 2 */:
                    window();
                    return;
                case 3:
                    commit();
                    return;
                case 4:
                    endOfStream();
                    return;
                default:
                    return;
            }
        }

        private void endOfStream() {
            this.state.complete = true;
            try {
                ReadableCoordinator readableCoordinator = new ReadableCoordinator(this.task.taskName());
                this.task.endOfStream(readableCoordinator);
                readableCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
                AsyncRunLoop.this.coordinatorRequests.update(readableCoordinator);
                if (AsyncRunLoop.this.coordinatorRequests.commitRequests().remove(this.task.taskName())) {
                    this.task.commit();
                }
            } finally {
                AsyncRunLoop.this.resume();
            }
        }

        private void process() {
            final IncomingMessageEnvelope fetchEnvelope = this.state.fetchEnvelope();
            AsyncRunLoop.log.trace("Process ssp {} offset {}", fetchEnvelope.getSystemStreamPartition(), fetchEnvelope.getOffset());
            final ReadableCoordinator readableCoordinator = new ReadableCoordinator(this.task.taskName());
            this.task.process(fetchEnvelope, readableCoordinator, new TaskCallbackFactory() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.3
                @Override // org.apache.samza.task.TaskCallbackFactory
                public TaskCallback createCallback() {
                    AsyncTaskWorker.this.state.startProcess();
                    AsyncRunLoop.this.containerMetrics.processes().inc();
                    return AsyncTaskWorker.this.callbackManager.createCallback(AsyncTaskWorker.this.task.taskName(), fetchEnvelope, readableCoordinator);
                }
            });
        }

        private void window() {
            this.state.startWindow();
            Runnable runnable = new Runnable() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            AsyncRunLoop.this.containerMetrics.windows().inc();
                            ReadableCoordinator readableCoordinator = new ReadableCoordinator(AsyncTaskWorker.this.task.taskName());
                            long nanoTime = AsyncRunLoop.this.clock.nanoTime();
                            AsyncTaskWorker.this.task.window(readableCoordinator);
                            AsyncRunLoop.this.containerMetrics.windowNs().update(AsyncRunLoop.this.clock.nanoTime() - nanoTime);
                            AsyncRunLoop.this.coordinatorRequests.update(readableCoordinator);
                            AsyncTaskWorker.this.state.doneWindow();
                            AsyncRunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                            AsyncRunLoop.this.resume();
                        } catch (Throwable th) {
                            AsyncRunLoop.log.error("Task {} window failed", AsyncTaskWorker.this.task.taskName(), th);
                            AsyncRunLoop.this.abort(th);
                            AsyncRunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                            AsyncRunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        AsyncRunLoop.log.trace("Task {} window completed", AsyncTaskWorker.this.task.taskName());
                        AsyncRunLoop.this.resume();
                        throw th2;
                    }
                }
            };
            if (AsyncRunLoop.this.threadPool != null) {
                AsyncRunLoop.log.trace("Task {} window on the thread pool", this.task.taskName());
                AsyncRunLoop.this.threadPool.submit(runnable);
            } else {
                AsyncRunLoop.log.trace("Task {} window on the run loop thread", this.task.taskName());
                runnable.run();
            }
        }

        private void commit() {
            this.state.startCommit();
            Runnable runnable = new Runnable() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            AsyncRunLoop.this.containerMetrics.commits().inc();
                            long nanoTime = AsyncRunLoop.this.clock.nanoTime();
                            AsyncTaskWorker.this.task.commit();
                            AsyncRunLoop.this.containerMetrics.commitNs().update(AsyncRunLoop.this.clock.nanoTime() - nanoTime);
                            AsyncTaskWorker.this.state.doneCommit();
                            AsyncRunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                            AsyncRunLoop.this.resume();
                        } catch (Throwable th) {
                            AsyncRunLoop.log.error("Task {} commit failed", AsyncTaskWorker.this.task.taskName(), th);
                            AsyncRunLoop.this.abort(th);
                            AsyncRunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                            AsyncRunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        AsyncRunLoop.log.trace("Task {} commit completed", AsyncTaskWorker.this.task.taskName());
                        AsyncRunLoop.this.resume();
                        throw th2;
                    }
                }
            };
            if (AsyncRunLoop.this.threadPool != null) {
                AsyncRunLoop.log.trace("Task {} commits on the thread pool", this.task.taskName());
                AsyncRunLoop.this.threadPool.submit(runnable);
            } else {
                AsyncRunLoop.log.trace("Task {} commits on the run loop thread", this.task.taskName());
                runnable.run();
            }
        }

        @Override // org.apache.samza.task.TaskCallbackListener
        public void onComplete(final TaskCallback taskCallback) {
            AsyncRunLoop.this.callbackExecutor.schedule(new Runnable() { // from class: org.apache.samza.task.AsyncRunLoop.AsyncTaskWorker.6
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            AsyncTaskWorker.this.state.doneProcess();
                            TaskCallbackImpl taskCallbackImpl = (TaskCallbackImpl) taskCallback;
                            AsyncRunLoop.this.containerMetrics.processNs().update(AsyncRunLoop.this.clock.nanoTime() - taskCallbackImpl.timeCreatedNs);
                            AsyncRunLoop.log.trace("Got callback complete for task {}, ssp {}", taskCallbackImpl.taskName, taskCallbackImpl.envelope.getSystemStreamPartition());
                            for (TaskCallbackImpl taskCallbackImpl2 : AsyncTaskWorker.this.callbackManager.updateCallback(taskCallbackImpl)) {
                                IncomingMessageEnvelope incomingMessageEnvelope = taskCallbackImpl2.envelope;
                                AsyncRunLoop.log.trace("Update offset for ssp {}, offset {}", incomingMessageEnvelope.getSystemStreamPartition(), incomingMessageEnvelope.getOffset());
                                AsyncTaskWorker.this.task.offsetManager().update(AsyncTaskWorker.this.task.taskName(), incomingMessageEnvelope.getSystemStreamPartition(), incomingMessageEnvelope.getOffset());
                                AsyncRunLoop.this.coordinatorRequests.update(taskCallbackImpl2.coordinator);
                            }
                            AsyncRunLoop.this.resume();
                        } catch (Throwable th) {
                            AsyncRunLoop.log.error(th.getMessage(), th);
                            AsyncRunLoop.this.abort(th);
                            AsyncRunLoop.this.resume();
                        }
                    } catch (Throwable th2) {
                        AsyncRunLoop.this.resume();
                        throw th2;
                    }
                }
            }, AsyncRunLoop.this.clock.nanoTime() - ((TaskCallbackImpl) taskCallback).timeCreatedNs);
        }

        @Override // org.apache.samza.task.TaskCallbackListener
        public void onFailure(TaskCallback taskCallback, Throwable th) {
            try {
                try {
                    this.state.doneProcess();
                    AsyncRunLoop.this.abort(th);
                    AsyncRunLoop.log.error("Got callback failure for task {}", ((TaskCallbackImpl) taskCallback).taskName);
                    AsyncRunLoop.this.resume();
                } catch (Throwable th2) {
                    AsyncRunLoop.log.error(th2.getMessage(), th2);
                    AsyncRunLoop.this.resume();
                }
            } catch (Throwable th3) {
                AsyncRunLoop.this.resume();
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/task/AsyncRunLoop$PendingEnvelope.class */
    public static final class PendingEnvelope {
        private final IncomingMessageEnvelope envelope;
        private boolean processed = false;

        PendingEnvelope(IncomingMessageEnvelope incomingMessageEnvelope) {
            this.envelope = incomingMessageEnvelope;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean markProcessed() {
            boolean z = this.processed;
            this.processed = true;
            return !z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/task/AsyncRunLoop$WorkerOp.class */
    public enum WorkerOp {
        WINDOW,
        COMMIT,
        PROCESS,
        END_OF_STREAM,
        NO_OP
    }

    public AsyncRunLoop(Map<TaskName, TaskInstance> map, ExecutorService executorService, SystemConsumers systemConsumers, int i, long j, long j2, long j3, long j4, SamzaContainerMetrics samzaContainerMetrics, HighResolutionClock highResolutionClock, boolean z) {
        this.threadPool = executorService;
        this.consumerMultiplexer = systemConsumers;
        this.containerMetrics = samzaContainerMetrics;
        this.windowMs = j;
        this.commitMs = j2;
        this.maxConcurrency = i;
        this.callbackTimeoutMs = j3;
        this.callbackTimer = j3 > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
        this.callbackExecutor = new ThrottlingScheduler(j4);
        this.coordinatorRequests = new CoordinatorRequests(map.keySet());
        this.latch = new Object();
        this.workerTimer = Executors.newSingleThreadScheduledExecutor();
        this.clock = highResolutionClock;
        HashMap hashMap = new HashMap();
        for (TaskInstance taskInstance : map.values()) {
            hashMap.put(taskInstance.taskName(), new AsyncTaskWorker(taskInstance));
        }
        this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(map, hashMap));
        this.taskWorkers = Collections.unmodifiableList(new ArrayList(hashMap.values()));
        this.isAsyncCommitEnabled = z;
    }

    private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap(Map<TaskName, TaskInstance> map, Map<TaskName, AsyncTaskWorker> map2) {
        HashMap hashMap = new HashMap();
        for (TaskInstance taskInstance : map.values()) {
            for (SystemStreamPartition systemStreamPartition : (Set) JavaConverters.setAsJavaSetConverter(taskInstance.systemStreamPartitions()).asJava()) {
                hashMap.putIfAbsent(systemStreamPartition, new ArrayList());
                ((List) hashMap.get(systemStreamPartition)).add(map2.get(taskInstance.taskName()));
            }
        }
        return hashMap;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            Iterator<AsyncTaskWorker> it = this.taskWorkers.iterator();
            while (it.hasNext()) {
                it.next().init();
            }
            long nanoTime = this.clock.nanoTime();
            while (!this.shutdownNow) {
                if (this.throwable != null) {
                    log.error("Caught throwable and stopping run loop", this.throwable);
                    throw new SamzaException(this.throwable);
                }
                long nanoTime2 = this.clock.nanoTime();
                IncomingMessageEnvelope chooseEnvelope = chooseEnvelope();
                long nanoTime3 = this.clock.nanoTime();
                this.containerMetrics.chooseNs().update(nanoTime3 - nanoTime2);
                runTasks(chooseEnvelope);
                long nanoTime4 = this.clock.nanoTime();
                blockIfBusy(chooseEnvelope);
                long nanoTime5 = this.clock.nanoTime();
                long j = nanoTime4 - nanoTime3;
                long j2 = nanoTime5 - nanoTime;
                nanoTime = nanoTime5;
                this.containerMetrics.blockNs().update(nanoTime5 - nanoTime4);
                if (j2 != 0) {
                    this.containerMetrics.utilization().set(Double.valueOf(j / j2));
                }
            }
        } finally {
            this.workerTimer.shutdown();
            this.callbackExecutor.shutdown();
            if (this.callbackTimer != null) {
                this.callbackTimer.shutdown();
            }
        }
    }

    @Override // org.apache.samza.util.Throttleable
    public void setWorkFactor(double d) {
        this.callbackExecutor.setWorkFactor(d);
    }

    @Override // org.apache.samza.util.Throttleable
    public double getWorkFactor() {
        return this.callbackExecutor.getWorkFactor();
    }

    public void shutdown() {
        this.shutdownNow = true;
    }

    private IncomingMessageEnvelope chooseEnvelope() {
        IncomingMessageEnvelope choose = this.consumerMultiplexer.choose(false);
        if (choose != null) {
            log.trace("Choose envelope ssp {} offset {} for processing", choose.getSystemStreamPartition(), choose.getOffset());
            this.containerMetrics.envelopes().inc();
        } else {
            log.trace("No envelope is available");
            this.containerMetrics.nullEnvelopes().inc();
        }
        return choose;
    }

    private void runTasks(IncomingMessageEnvelope incomingMessageEnvelope) {
        if (incomingMessageEnvelope != null) {
            PendingEnvelope pendingEnvelope = new PendingEnvelope(incomingMessageEnvelope);
            Iterator<AsyncTaskWorker> it = this.sspToTaskWorkerMapping.get(incomingMessageEnvelope.getSystemStreamPartition()).iterator();
            while (it.hasNext()) {
                it.next().state.insertEnvelope(pendingEnvelope);
            }
        }
        Iterator<AsyncTaskWorker> it2 = this.taskWorkers.iterator();
        while (it2.hasNext()) {
            it2.next().run();
        }
    }

    private void blockIfBusy(IncomingMessageEnvelope incomingMessageEnvelope) {
        synchronized (this.latch) {
            while (!this.shutdownNow && this.throwable == null) {
                Iterator<AsyncTaskWorker> it = this.taskWorkers.iterator();
                while (it.hasNext()) {
                    if (it.next().state.isReady()) {
                        return;
                    }
                }
                try {
                    log.trace("Block loop thread");
                    this.latch.wait();
                } catch (InterruptedException e) {
                    throw new SamzaException("Run loop is interrupted", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resume() {
        log.trace("Resume loop thread");
        if (this.coordinatorRequests.shouldShutdownNow() && this.coordinatorRequests.commitRequests().isEmpty()) {
            this.shutdownNow = true;
        }
        synchronized (this.latch) {
            this.latch.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void abort(Throwable th) {
        this.throwable = th;
    }
}
