package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTasklet.class */
/**
 * {@link Tasklet} that drives a single {@link Processor} through a small state
 * machine: it drains the inbound edge streams into an inbox, hands the inbox
 * to the processor, completes edges as they finish, saves snapshots and
 * forwards snapshot barriers, and finally emits the done item.
 *
 * <p>Inbound streams are grouped by {@code priority()} and the groups are
 * consumed one after another in ascending priority-value order. A stream
 * whose priority is {@link Integer#MIN_VALUE} is treated as the snapshot
 * restore stream: its items go to {@code restoreFromSnapshot} instead of
 * {@code process}.
 */
public class ProcessorTasklet implements Tasklet {
    private static final int OUTBOX_BATCH_SIZE = 2048;

    // progTracker must be declared before inbox: inbox's initializer reads it.
    private final ProgressTracker progTracker = new ProgressTracker();
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(this.progTracker);

    /** Outbound edge streams, sorted by ordinal. */
    private final OutboundEdgeStream[] outstreams;
    private final OutboxImpl outbox;
    private final Contexts.ProcCtx context;
    private final Processor processor;
    private final SnapshotContext ssContext;
    /** Bit {@code i} is set once the pending snapshot's barrier arrived on inbound ordinal {@code i}. */
    private final BitSet receivedBarriers;
    /** Remaining inbound stream groups, lowest priority value first. */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    /** Number of inbound streams that have not reported "done" yet. */
    private int numActiveOrdinals;
    /** Cursor over the group currently being drained; {@code null} once all groups are exhausted. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    /** The stream most recently selected for draining; {@code null} if there are no inbound streams. */
    private InboundEdgeStream currInstream;
    private ProcessorState state;
    /** Id of the snapshot whose barrier is expected next. */
    private long pendingSnapshotId;

    /**
     * Creates the tasklet.
     *
     * @param procCtx           processor context passed to {@link Processor#init}
     * @param processor         the processor to drive; must not be {@code null}
     * @param list              inbound edge streams; grouped by priority
     * @param list2             outbound edge streams; sorted by ordinal
     * @param snapshotContext   snapshot context; its last snapshot id plus one becomes the
     *                          first pending snapshot id
     * @param outboundCollector collector for snapshot output, appended after the edge collectors
     */
    public ProcessorTasklet(@Nonnull Contexts.ProcCtx procCtx, @Nonnull Processor processor, @Nonnull List<? extends InboundEdgeStream> list, @Nonnull List<? extends OutboundEdgeStream> list2, @Nonnull SnapshotContext snapshotContext, @Nonnull OutboundCollector outboundCollector) {
        Preconditions.checkNotNull(processor, "processor");
        this.context = procCtx;
        this.processor = processor;
        this.numActiveOrdinals = list.size();

        // TreeMap keeps the groups sorted by priority value; values() iterates in that order.
        TreeMap<Integer, ArrayList<InboundEdgeStream>> groupsByPriority = list.stream()
                .collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new,
                        Collectors.toCollection(ArrayList<InboundEdgeStream>::new)));
        this.instreamGroupQueue = new ArrayDeque<>(groupsByPriority.values());

        this.outstreams = list2.stream()
                .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                .toArray(OutboundEdgeStream[]::new);
        this.ssContext = snapshotContext;

        this.instreamCursor = popInstreamGroup();
        this.currInstream = this.instreamCursor != null ? this.instreamCursor.value() : null;
        this.outbox = createOutbox(outboundCollector);
        this.receivedBarriers = new BitSet(list.size());
        this.state = initialProcessingState();
        this.pendingSnapshotId = snapshotContext.lastSnapshotId() + 1;
    }

    /**
     * Builds the outbox over the collectors of all outbound edges, followed by
     * the snapshot collector if one is given. A {@code null} collector is
     * tolerated here and results in an outbox without a snapshot slot.
     */
    private OutboxImpl createOutbox(OutboundCollector outboundCollector) {
        boolean hasSnapshotCollector = outboundCollector != null;
        int edgeCount = outstreams.length;
        OutboundCollector[] collectors = new OutboundCollector[edgeCount + (hasSnapshotCollector ? 1 : 0)];
        for (int i = 0; i < edgeCount; i++) {
            collectors[i] = outstreams[i].getCollector();
        }
        if (hasSnapshotCollector) {
            // The snapshot collector always occupies the last slot.
            collectors[edgeCount] = outboundCollector;
        }
        return new OutboxImpl(collectors, hasSnapshotCollector, progTracker,
                context.getSerializationService(), OUTBOX_BATCH_SIZE);
    }

    /**
     * Initializes the processor with this tasklet's outbox and context.
     * The supplied future is not used by this implementation.
     */
    @Override
    public void init(CompletableFuture<Void> completableFuture) {
        processor.init(outbox, context);
    }

    /**
     * Performs one step of the state machine and reports the progress made
     * during that step.
     *
     * @return the progress state accumulated by the progress tracker in this call
     */
    @Override
    @Nonnull
    public ProgressState call() {
        progTracker.reset();
        outbox.resetBatch();
        stateMachineStep();
        return progTracker.toProgressState();
    }

    /**
     * Executes the action belonging to the current {@link ProcessorState} and
     * moves to the next state when that action has finished.
     *
     * @throws JetException if the current state has no associated action
     */
    private void stateMachineStep() {
        switch (state) {
            case PROCESS_INBOX:
                progTracker.notDone();
                // Refill only when the previous batch is fully consumed. For a regular
                // stream, tryProcess() must also return true before more input is taken.
                if (inbox.isEmpty() && (isSnapshotInbox() || processor.tryProcess())) {
                    fillInbox();
                }
                if (!inbox.isEmpty()) {
                    if (isSnapshotInbox()) {
                        processor.restoreFromSnapshot(inbox);
                    } else {
                        processor.process(currInstream.ordinal(), inbox);
                    }
                }
                if (inbox.isEmpty()) {
                    if (currInstream != null && currInstream.isDone()) {
                        state = ProcessorState.COMPLETE_EDGE;
                        progTracker.madeProgress();
                    } else if (context.snapshottingEnabled()
                            && numActiveOrdinals > 0
                            && receivedBarriers.cardinality() == numActiveOrdinals) {
                        // Inbox is empty and every active ordinal delivered the pending barrier.
                        state = ProcessorState.SAVE_SNAPSHOT;
                    } else if (numActiveOrdinals == 0) {
                        progTracker.madeProgress();
                        state = ProcessorState.COMPLETE;
                    }
                }
                return;

            case COMPLETE_EDGE:
                progTracker.notDone();
                boolean edgeCompleted = isSnapshotInbox()
                        ? processor.finishSnapshotRestore()
                        : processor.completeEdge(currInstream.ordinal());
                if (edgeCompleted) {
                    progTracker.madeProgress();
                    state = initialProcessingState();
                }
                return;

            case SAVE_SNAPSHOT:
                assert context.snapshottingEnabled() : "Snapshotting is not enabled";
                progTracker.notDone();
                if (processor.saveToSnapshot()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_BARRIER;
                }
                return;

            case EMIT_BARRIER:
                assert context.snapshottingEnabled() : "Snapshotting is not enabled";
                progTracker.notDone();
                if (outbox.offerToEdgesAndSnapshot(new SnapshotBarrier(pendingSnapshotId))) {
                    // Barrier forwarded: start waiting for the next snapshot.
                    receivedBarriers.clear();
                    pendingSnapshotId++;
                    state = initialProcessingState();
                }
                return;

            case COMPLETE:
                progTracker.notDone();
                if (context.snapshottingEnabled()) {
                    long lastSnapshotId = ssContext.lastSnapshotId();
                    assert lastSnapshotId <= pendingSnapshotId
                            : "Unexpected new snapshot id " + lastSnapshotId + ", current was " + pendingSnapshotId;
                    if (lastSnapshotId == pendingSnapshotId) {
                        // The pending snapshot was started: save it before continuing with complete().
                        state = ProcessorState.SAVE_SNAPSHOT;
                        progTracker.madeProgress();
                        return;
                    }
                }
                if (processor.complete()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_DONE_ITEM;
                }
                return;

            case EMIT_DONE_ITEM:
                if (outbox.offerToEdgesAndSnapshot(DoneItem.DONE_ITEM)) {
                    state = ProcessorState.END;
                } else {
                    progTracker.notDone();
                }
                return;

            default:
                throw new JetException("Unexpected state: " + state);
        }
    }

    /**
     * Drains inbound streams of the current group into the inbox, visiting
     * them round-robin and stopping at the first stream that made progress or
     * after one full cycle. Does nothing when no group is left.
     *
     * <p>With the exactly-once guarantee, a stream that already delivered the
     * pending snapshot barrier is skipped without being drained. A trailing
     * {@link SnapshotBarrier} is removed from the inbox and recorded. A stream
     * that reports "done" is removed from the cursor; when the cursor cannot
     * advance any more, the next group is taken from the queue.
     */
    private void fillInbox() {
        if (instreamCursor == null) {
            return;
        }
        final InboundEdgeStream first = instreamCursor.value();
        do {
            currInstream = instreamCursor.value();
            ProgressState result = ProgressState.NO_PROGRESS;
            boolean barrierAlreadyReceived = ssContext != null
                    && ssContext.processingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE
                    && receivedBarriers.get(currInstream.ordinal());
            if (barrierAlreadyReceived) {
                instreamCursor.advance();
            } else {
                result = currInstream.drainTo(inbox::add);
                progTracker.madeProgress(result.isMadeProgress());
                if (result.isDone()) {
                    receivedBarriers.clear(currInstream.ordinal());
                    instreamCursor.remove();
                    numActiveOrdinals--;
                }
                if (inbox.peekLast() instanceof SnapshotBarrier) {
                    SnapshotBarrier barrier = (SnapshotBarrier) inbox.removeLast();
                    observeSnapshot(currInstream.ordinal(), barrier.snapshotId());
                }
                if (!instreamCursor.advance()) {
                    instreamCursor = popInstreamGroup();
                    return;
                }
            }
            if (result.isMadeProgress()) {
                return;
            }
        } while (instreamCursor.value() != first);
    }

    /**
     * Takes the next group of inbound streams off the queue.
     *
     * @return a cursor over the next group, or {@code null} if the queue is empty
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        ArrayList<InboundEdgeStream> nextGroup = instreamGroupQueue.poll();
        return nextGroup == null ? null : new CircularListCursor<>(nextGroup);
    }

    @Override
    public String toString() {
        return "ProcessorTasklet{vertex=" + context.vertexName() + ", processor=" + processor + '}';
    }

    /**
     * Records that the barrier of the pending snapshot arrived on the given ordinal.
     *
     * @param i inbound ordinal the barrier was received on
     * @param j snapshot id carried by the barrier
     * @throws JetException if {@code j} is not the pending snapshot id
     */
    private void observeSnapshot(int i, long j) {
        if (j != pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier " + j + " from ordinal " + i + " expected " + pendingSnapshotId);
        }
        receivedBarriers.set(i);
    }

    /**
     * Returns the state to (re)start processing in: {@code COMPLETE} when no
     * inbound group is left, {@code PROCESS_INBOX} otherwise.
     */
    private ProcessorState initialProcessingState() {
        if (instreamCursor == null) {
            return ProcessorState.COMPLETE;
        }
        return ProcessorState.PROCESS_INBOX;
    }

    /** Tells whether the current inbound stream has priority {@link Integer#MIN_VALUE}, i.e. is the snapshot restore stream. */
    private boolean isSnapshotInbox() {
        return currInstream != null && currInstream.priority() == Integer.MIN_VALUE;
    }

    /** Delegates to {@link Processor#isCooperative()}. */
    @Override
    public boolean isCooperative() {
        return processor.isCooperative();
    }
}
