package org.apache.samza.task;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/task/StreamOperatorTask.class */
public class StreamOperatorTask implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask {
    private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
    private final OperatorSpecGraph specGraph;
    private final Clock clock;
    private ExecutorService taskThreadPool;
    private OperatorImplGraph operatorImplGraph;

    /* renamed from: org.apache.samza.task.StreamOperatorTask$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/task/StreamOperatorTask$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$samza$system$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$org$apache$samza$system$MessageType[MessageType.USER_MESSAGE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$samza$system$MessageType[MessageType.END_OF_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$samza$system$MessageType[MessageType.DRAIN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$samza$system$MessageType[MessageType.WATERMARK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public StreamOperatorTask(OperatorSpecGraph operatorSpecGraph, Clock clock) {
        this.specGraph = operatorSpecGraph.m132clone();
        this.clock = clock;
    }

    public StreamOperatorTask(OperatorSpecGraph operatorSpecGraph) {
        this(operatorSpecGraph, SystemClock.instance());
    }

    public final void init(Context context) throws Exception {
        this.operatorImplGraph = new OperatorImplGraph(this.specGraph, context, this.clock);
    }

    public final void processAsync(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator, TaskCallback taskCallback) {
        Runnable runnable = () -> {
            CompletionStage<Void> failedFuture;
            try {
                SystemStream systemStream = incomingMessageEnvelope.getSystemStreamPartition().getSystemStream();
                InputOperatorImpl inputOperator = this.operatorImplGraph.getInputOperator(systemStream);
                if (inputOperator != null) {
                    MessageType of = MessageType.of(incomingMessageEnvelope.getMessage());
                    switch (AnonymousClass1.$SwitchMap$org$apache$samza$system$MessageType[of.ordinal()]) {
                        case 1:
                            failedFuture = inputOperator.onMessageAsync(incomingMessageEnvelope, messageCollector, taskCoordinator);
                            break;
                        case CoordinatorStreamMessage.KEY_INDEX /* 2 */:
                            failedFuture = inputOperator.aggregateEndOfStream((EndOfStreamMessage) incomingMessageEnvelope.getMessage(), incomingMessageEnvelope.getSystemStreamPartition(), messageCollector, taskCoordinator);
                            break;
                        case 3:
                            failedFuture = inputOperator.aggregateDrainMessages((DrainMessage) incomingMessageEnvelope.getMessage(), incomingMessageEnvelope.getSystemStreamPartition(), messageCollector, taskCoordinator);
                            break;
                        case 4:
                            failedFuture = inputOperator.aggregateWatermark((WatermarkMessage) incomingMessageEnvelope.getMessage(), incomingMessageEnvelope.getSystemStreamPartition(), messageCollector, taskCoordinator);
                            break;
                        default:
                            failedFuture = failedFuture(new SamzaException("Unknown message type " + of + " encountered."));
                            break;
                    }
                    failedFuture.whenComplete((r4, th) -> {
                        if (th != null) {
                            taskCallback.failure(th);
                        } else {
                            taskCallback.complete();
                        }
                    });
                } else {
                    String format = String.format("InputOperator not found in OperatorGraph for %s. The available input operators are: %s. Please check SystemStream configuration for the `SystemConsumer` and/or task.inputs task configuration.", systemStream, this.operatorImplGraph.getAllInputOperators());
                    LOG.error(format);
                    taskCallback.failure(new SamzaException(format));
                }
            } catch (Exception e) {
                LOG.error("Failed to process the incoming message due to ", e);
                taskCallback.failure(e);
            }
        };
        if (this.taskThreadPool != null) {
            LOG.debug("Processing message using thread pool.");
            this.taskThreadPool.submit(runnable);
        } else {
            LOG.debug("Processing message on the run loop thread.");
            runnable.run();
        }
    }

    public final void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        CompletableFuture.allOf((CompletableFuture[]) this.operatorImplGraph.getAllInputOperators().stream().map(inputOperatorImpl -> {
            return inputOperatorImpl.onTimer(messageCollector, taskCoordinator);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
    }

    public void close() throws Exception {
        if (this.operatorImplGraph != null) {
            this.operatorImplGraph.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setTaskThreadPool(ExecutorService executorService) {
        this.taskThreadPool = executorService;
    }

    @VisibleForTesting
    void setOperatorImplGraph(OperatorImplGraph operatorImplGraph) {
        this.operatorImplGraph = operatorImplGraph;
    }

    OperatorImplGraph getOperatorImplGraph() {
        return this.operatorImplGraph;
    }

    private static CompletableFuture<Void> failedFuture(Throwable th) {
        Preconditions.checkNotNull(th);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(th);
        return completableFuture;
    }
}
