package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.LongProbeFunction;
import com.hazelcast.internal.metrics.ProbeBuilder;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.processor.ProcessorWrapper;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.function.Predicate;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTasklet.class */
/**
 * A {@link Tasklet} that drives a single {@link Processor}.
 *
 * <p>Each {@link #call()} performs one step of an internal state machine
 * (see {@code stateMachineStep()}): it drains the inbound edge streams into
 * an inbox, hands the inbox to the processor, forwards coalesced watermarks,
 * and coordinates snapshot barriers. Inbound streams are grouped by priority
 * and the groups are consumed one after another, in ascending priority order.
 *
 * <p>The mutable state is held in plain (non-volatile) fields; only the
 * metric counters are atomic and they are updated with lazy writes.
 * NOTE(review): this suggests the tasklet is driven by a single thread while
 * metrics are read from another one - confirm against the execution service.
 */
public class ProcessorTasklet implements Tasklet {
    /** Batch-size argument handed to {@link OutboxImpl}; see that class for its exact meaning. */
    private static final int OUTBOX_BATCH_SIZE = 2048;

    private final ProgressTracker progTracker = new ProgressTracker();
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(progTracker);
    /** Sink passed to {@code InboundEdgeStream.drainTo}: appends each drained item to the inbox queue. */
    private final Predicate<Object> addToInboxFunction = inbox.queue()::add;
    /** Metric: summed {@code sizes()} of the streams in the current priority group. */
    private final AtomicLong queuesSize = new AtomicLong();
    /** Metric: summed {@code capacities()} of the streams in the current priority group. */
    private final AtomicLong queuesCapacity = new AtomicLong();

    /** Outbound edge streams, sorted by ordinal. */
    private final OutboundEdgeStream[] outstreams;
    private final OutboxImpl outbox;
    private final Processor.Context context;
    private final Processor processor;
    private final SnapshotContext ssContext;
    /** Bit {@code i} is set once the pending snapshot barrier was observed on input ordinal {@code i}. */
    private final BitSet receivedBarriers;
    /** Remaining groups of inbound streams (one group per priority), in ascending priority order. */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final WatermarkCoalescer watermarkCoalescer;
    private final ILogger logger;
    private final SerializationService serializationService;
    /** Metric: number of items received per input ordinal. */
    private final AtomicLongArray receivedCounts;
    /** Metric: number of non-empty inbox fills per input ordinal. */
    private final AtomicLongArray receivedBatches;
    /** Metric: emitted-item counters, one per output ordinal plus a final slot reported as "snapshot". */
    private final AtomicLongArray emittedCounts;

    /** Number of inbound streams that are not done yet. */
    private int numActiveOrdinals;
    /** Cursor over the priority group currently being drained; {@code null} once all groups are exhausted. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    /** The stream the current inbox content was drained from. */
    private InboundEdgeStream currInstream;
    private ProcessorState state;
    /** ID of the next snapshot this tasklet expects to take part in. */
    private long pendingSnapshotId;
    private SnapshotBarrier currentBarrier;
    /** Watermark waiting to be processed by the processor (or forwarded, if it is the idle message). */
    private Watermark pendingWatermark;
    private boolean processorClosed;
    /**
     * When {@code true}, an input ordinal that already delivered the pending
     * barrier is not drained until the barrier arrived on all active ordinals.
     */
    private boolean waitForAllBarriers;

    /**
     * Creates the tasklet.
     *
     * @param context              processor context; also used for logging and naming
     * @param serializationService passed to the outbox and used to obtain the managed context in {@link #init()}
     * @param processor            the processor to drive, must not be {@code null}
     * @param list                 inbound edge streams; grouped by {@code priority()}
     * @param list2                outbound edge streams; sorted by {@code ordinal()}
     * @param snapshotContext      source of the active snapshot ID and the processing guarantee
     * @param outboundCollector    additional collector appended after the edge collectors
     *                             (its emitted-count metric is tagged "snapshot")
     * @param probeBuilder         metrics registry entry point, or {@code null} to skip metrics registration
     */
    public ProcessorTasklet(@Nonnull Processor.Context context, @Nonnull SerializationService serializationService, @Nonnull Processor processor, @Nonnull List<? extends InboundEdgeStream> list, @Nonnull List<? extends OutboundEdgeStream> list2, @Nonnull SnapshotContext snapshotContext, @Nonnull OutboundCollector outboundCollector, @Nullable ProbeBuilder probeBuilder) {
        Preconditions.checkNotNull(processor, "processor");
        this.context = context;
        this.serializationService = serializationService;
        this.processor = processor;
        this.numActiveOrdinals = list.size();
        // TreeMap keeps the groups sorted by priority; the queue then yields them lowest value first.
        this.instreamGroupQueue = new ArrayDeque<>(list.stream()
                .collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new,
                        Collectors.toCollection(ArrayList<InboundEdgeStream>::new)))
                .values());
        this.outstreams = list2.stream()
                .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                .toArray(OutboundEdgeStream[]::new);
        this.ssContext = snapshotContext;
        this.logger = getLogger(context);
        this.instreamCursor = popInstreamGroup();
        this.receivedCounts = new AtomicLongArray(list.size());
        this.receivedBatches = new AtomicLongArray(list.size());
        // one extra slot for the additional (snapshot) collector
        this.emittedCounts = new AtomicLongArray(list2.size() + 1);
        this.outbox = createOutbox(outboundCollector);
        this.receivedBarriers = new BitSet(list.size());
        this.state = initialProcessingState();
        this.pendingSnapshotId = snapshotContext.activeSnapshotId() + 1;
        this.waitForAllBarriers = snapshotContext.processingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE;
        this.watermarkCoalescer = WatermarkCoalescer.create(list.size());
        if (probeBuilder != null) {
            registerMetrics(list, probeBuilder);
        }
    }

    /**
     * Returns a logger named after this class and this tasklet's {@link #toString()}
     * when a Jet instance is available, otherwise a plain class logger.
     */
    @SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"}, justification = "jetInstance() can be null in TestProcessorContext")
    private ILogger getLogger(@Nonnull Processor.Context context) {
        if (context.jetInstance() == null) {
            return Logger.getLogger(getClass());
        }
        return context.jetInstance().getHazelcastInstance().getLoggingService().getLogger(getClass() + "." + toString());
    }

    /**
     * Registers this tasklet's probes: per-input-ordinal counters and watermarks,
     * per-output-ordinal emitted counts, and tasklet-wide watermark/queue gauges.
     *
     * <p>The probe source is this tasklet itself, so the probe functions are typed
     * as {@code LongProbeFunction<ProcessorTasklet>}.
     *
     * @param list         inbound edge streams, indexed by position
     * @param probeBuilder builder to register the probes with
     */
    private void registerMetrics(List<? extends InboundEdgeStream> list, ProbeBuilder probeBuilder) {
        for (int i = 0; i < list.size(); i++) {
            final int ordinal = i;
            final InboundEdgeStream instream = list.get(ordinal);
            ProbeBuilder ordinalBuilder = probeBuilder.withTag("ordinal", String.valueOf(i));
            ordinalBuilder.register(this, "receivedCount", ProbeLevel.INFO, ProbeUnit.COUNT,
                    (LongProbeFunction<ProcessorTasklet>) t -> t.receivedCounts.get(ordinal));
            ordinalBuilder.register(this, "receivedBatches", ProbeLevel.INFO, ProbeUnit.COUNT,
                    (LongProbeFunction<ProcessorTasklet>) t -> t.receivedBatches.get(ordinal));
            ordinalBuilder.register(this, "topObservedWm", ProbeLevel.INFO, ProbeUnit.MS,
                    (LongProbeFunction<ProcessorTasklet>) t -> instream.topObservedWm());
            ordinalBuilder.register(this, "coalescedWm", ProbeLevel.INFO, ProbeUnit.MS,
                    (LongProbeFunction<ProcessorTasklet>) t -> instream.coalescedWm());
        }

        // The last slot of emittedCounts belongs to the snapshot collector; it is
        // only reported when snapshotting is enabled.
        for (int i = 0; i < emittedCounts.length() - (context.snapshottingEnabled() ? 0 : 1); i++) {
            final int ordinal = i;
            probeBuilder
                    .withTag("ordinal", i == emittedCounts.length() - 1 ? "snapshot" : String.valueOf(i))
                    .register(this, "emittedCount", ProbeLevel.INFO, ProbeUnit.COUNT,
                            (LongProbeFunction<ProcessorTasklet>) t -> t.emittedCounts.get(ordinal));
        }

        probeBuilder.register(this, "topObservedWm", ProbeLevel.INFO, ProbeUnit.MS,
                (LongProbeFunction<ProcessorTasklet>) t -> t.watermarkCoalescer.topObservedWm());
        probeBuilder.register(this, "coalescedWm", ProbeLevel.INFO, ProbeUnit.MS,
                (LongProbeFunction<ProcessorTasklet>) t -> t.watermarkCoalescer.coalescedWm());
        probeBuilder.register(this, "lastForwardedWm", ProbeLevel.INFO, ProbeUnit.MS,
                (LongProbeFunction<ProcessorTasklet>) t -> t.outbox.lastForwardedWm());
        probeBuilder.register(this, "lastForwardedWmLatency", ProbeLevel.INFO, ProbeUnit.MS,
                (LongProbeFunction<ProcessorTasklet>) t -> t.lastForwardedWmLatency());
        probeBuilder.register(this, "queuesSize", ProbeLevel.INFO, ProbeUnit.COUNT,
                (LongProbeFunction<ProcessorTasklet>) t -> t.queuesSize.get());
        probeBuilder.register(this, "queuesCapacity", ProbeLevel.INFO, ProbeUnit.COUNT,
                (LongProbeFunction<ProcessorTasklet>) t -> t.queuesCapacity.get());
    }

    /**
     * Builds the outbox over the collectors of all outbound edges (in ordinal
     * order) followed by {@code outboundCollector} in the last position.
     */
    private OutboxImpl createOutbox(@Nonnull OutboundCollector outboundCollector) {
        OutboundCollector[] collectors = new OutboundCollector[outstreams.length + 1];
        for (int i = 0; i < outstreams.length; i++) {
            collectors[i] = outstreams[i].getCollector();
        }
        collectors[outstreams.length] = outboundCollector;
        return new OutboxImpl(collectors, true, progTracker, serializationService, OUTBOX_BATCH_SIZE, emittedCounts);
    }

    /**
     * Lets the managed context (if any) initialize the processor - unwrapping a
     * {@link ProcessorWrapper} first - and then calls {@code Processor.init()}.
     * Any exception from {@code Processor.init()} is rethrown as-is.
     */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public void init() {
        if (serializationService.getManagedContext() != null) {
            Processor toInit = processor instanceof ProcessorWrapper
                    ? ((ProcessorWrapper) processor).getWrapped()
                    : processor;
            // The call must stay outside the assert: it has to run even with assertions disabled.
            Object initialized = serializationService.getManagedContext().initialize(toInit);
            assert initialized == toInit : "different object returned";
        }
        try {
            processor.init(outbox, context);
        } catch (Exception e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Performs one step of the state machine and reports the progress made.
     * When the resulting state is "done", the processor is closed.
     */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    @Nonnull
    public ProgressState call() {
        assert !processorClosed : "processor closed";
        progTracker.reset();
        outbox.reset();
        stateMachineStep();
        ProgressState result = progTracker.toProgressState();
        if (result.isDone()) {
            closeProcessor();
        }
        return result;
    }

    /**
     * Calls {@code Processor.close()}, logging (not propagating) any
     * {@link Exception}, and then marks the processor as closed.
     */
    private void closeProcessor() {
        assert !processorClosed : "processor already closed";
        try {
            processor.close();
        } catch (Exception e) {
            logger.severe(Util.jobNameAndExecutionId(context.jobConfig().getName(), context.executionId()) + " encountered an exception in Processor.close(), ignoring it", e);
        }
        processorClosed = true;
    }

    /**
     * Executes the action of the current {@link ProcessorState} and, when that
     * action completes, moves to the next state. Every state except a finished
     * {@code EMIT_DONE_ITEM} marks the progress tracker as not done.
     *
     * @throws JetException if the current state has no handler here
     */
    private void stateMachineStep() {
        switch (state) {
            case PROCESS_WATERMARK:
                progTracker.notDone();
                if (pendingWatermark == null) {
                    long wm = watermarkCoalescer.checkWmHistory();
                    // Long.MIN_VALUE: presumably the coalescer's "no new watermark" sentinel - TODO confirm
                    if (wm == Long.MIN_VALUE) {
                        state = ProcessorState.PROCESS_INBOX;
                        stateMachineStep();
                        return;
                    }
                    pendingWatermark = new Watermark(wm);
                }
                // The idle message is forwarded through the outbox, real watermarks go to the processor.
                boolean wmHandled = pendingWatermark.equals(WatermarkCoalescer.IDLE_MESSAGE)
                        ? outbox.offer(WatermarkCoalescer.IDLE_MESSAGE)
                        : processor.tryProcessWatermark(pendingWatermark);
                if (!wmHandled) {
                    // keep pendingWatermark and retry on the next call
                    return;
                }
                state = ProcessorState.PROCESS_INBOX;
                pendingWatermark = null;
                stateMachineStep();
                return;

            case PROCESS_INBOX:
                progTracker.notDone();
                if (inbox.isEmpty()) {
                    // tryProcess() is not called while the snapshot-restore input is being consumed
                    if (!isSnapshotInbox() && !processor.tryProcess()) {
                        return;
                    }
                    assert !outbox.hasUnfinishedItem()
                            : (isSnapshotInbox() ? "Unfinished item before fillInbox call" : "Processor.tryProcess() returned true, but there's unfinished item in the outbox");
                    fillInbox();
                }
                if (!inbox.isEmpty()) {
                    if (isSnapshotInbox()) {
                        processor.restoreFromSnapshot(inbox);
                    } else {
                        processor.process(currInstream.ordinal(), inbox);
                    }
                }
                // The state only changes once the processor has consumed the whole inbox.
                if (inbox.isEmpty()) {
                    if (currInstream != null && currInstream.isDone()) {
                        state = ProcessorState.COMPLETE_EDGE;
                        progTracker.madeProgress();
                    } else if (numActiveOrdinals > 0 && receivedBarriers.cardinality() == numActiveOrdinals) {
                        // the barrier arrived on every active input
                        state = ProcessorState.SAVE_SNAPSHOT;
                    } else if (numActiveOrdinals != 0) {
                        state = ProcessorState.PROCESS_WATERMARK;
                    } else {
                        progTracker.madeProgress();
                        state = ProcessorState.COMPLETE;
                    }
                }
                return;

            case COMPLETE_EDGE:
                progTracker.notDone();
                boolean edgeCompleted = isSnapshotInbox()
                        ? processor.finishSnapshotRestore()
                        : processor.completeEdge(currInstream.ordinal());
                if (!edgeCompleted) {
                    return;
                }
                assert !outbox.hasUnfinishedItem()
                        : "outbox has unfinished item after successful completeEdge() or finishSnapshotRestore()";
                progTracker.madeProgress();
                state = initialProcessingState();
                return;

            case SAVE_SNAPSHOT:
                progTracker.notDone();
                if (processor.saveToSnapshot()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_BARRIER;
                }
                return;

            case EMIT_BARRIER:
                assert currentBarrier != null : "currentBarrier == null";
                if (outbox.offerToEdgesAndSnapshot(currentBarrier)) {
                    progTracker.madeProgress();
                    if (currentBarrier.isTerminal()) {
                        state = ProcessorState.EMIT_DONE_ITEM;
                    } else {
                        // get ready for the next snapshot
                        currentBarrier = null;
                        receivedBarriers.clear();
                        pendingSnapshotId++;
                        state = initialProcessingState();
                    }
                }
                progTracker.notDone();
                return;

            case COMPLETE:
                progTracker.notDone();
                long activeSnapshotId = ssContext.activeSnapshotId();
                assert activeSnapshotId <= pendingSnapshotId
                        : "Unexpected new snapshot id " + activeSnapshotId + ", current was" + pendingSnapshotId;
                if (activeSnapshotId == pendingSnapshotId) {
                    // A snapshot was requested while completing: take it as soon as the
                    // outbox has no unfinished item; until then the outbox is blocked.
                    if (!outbox.hasUnfinishedItem()) {
                        outbox.unblock();
                        state = ProcessorState.SAVE_SNAPSHOT;
                        currentBarrier = new SnapshotBarrier(activeSnapshotId, ssContext.isTerminalSnapshot());
                        progTracker.madeProgress();
                        return;
                    }
                    outbox.block();
                }
                if (processor.complete()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_DONE_ITEM;
                }
                return;

            case EMIT_DONE_ITEM:
                if (outbox.offerToEdgesAndSnapshot(DoneItem.DONE_ITEM)) {
                    state = ProcessorState.END;
                } else {
                    progTracker.notDone();
                }
                return;

            default:
                throw new JetException("Unexpected state: " + state);
        }
    }

    /**
     * Drains inbound streams of the current priority group into the (empty)
     * inbox, round-robin, until one of them made progress or the whole group
     * was visited once.
     *
     * <p>A watermark or snapshot barrier found at the tail of the drained items
     * is removed from the inbox and handed to the watermark coalescer or
     * {@link #observeBarrier} respectively. Streams that report "done" are
     * removed from the group; when the group becomes empty the next priority
     * group is popped. Finally the receive and queue metrics are updated.
     */
    private void fillInbox() {
        assert inbox.isEmpty() : "inbox is not empty";
        assert pendingWatermark == null : "null wm expected, but was " + pendingWatermark;

        if (instreamCursor == null) {
            return;
        }
        final InboundEdgeStream first = instreamCursor.value();
        ProgressState result;
        do {
            currInstream = instreamCursor.value();
            result = ProgressState.NO_PROGRESS;

            // skip ordinals on which the pending barrier was already received
            if (waitForAllBarriers && receivedBarriers.get(currInstream.ordinal())) {
                instreamCursor.advance();
                continue;
            }

            result = currInstream.drainTo(addToInboxFunction);
            progTracker.madeProgress(result.isMadeProgress());

            // check whether the last drained item is a special one
            Object lastItem = inbox.queue().peekLast();
            if (lastItem instanceof Watermark) {
                long observedWm = ((Watermark) inbox.queue().removeLast()).timestamp();
                long coalescedWm = watermarkCoalescer.observeWm(currInstream.ordinal(), observedWm);
                if (coalescedWm != Long.MIN_VALUE) {
                    pendingWatermark = new Watermark(coalescedWm);
                }
            } else if (lastItem instanceof SnapshotBarrier) {
                observeBarrier(currInstream.ordinal(), (SnapshotBarrier) inbox.queue().removeLast());
            } else if (lastItem != null && !(lastItem instanceof BroadcastItem)) {
                watermarkCoalescer.observeEvent(currInstream.ordinal());
            }

            if (result.isDone()) {
                receivedBarriers.clear(currInstream.ordinal());
                long coalescedWm = watermarkCoalescer.queueDone(currInstream.ordinal());
                // This may overwrite a watermark set just above; the assertion
                // checks that the replacement is strictly newer.
                if (coalescedWm != Long.MIN_VALUE) {
                    assert pendingWatermark == null || pendingWatermark.timestamp() < coalescedWm
                            : "trying to assign lower WM. Old=" + pendingWatermark.timestamp() + ", new=" + coalescedWm;
                    pendingWatermark = new Watermark(coalescedWm);
                }
                instreamCursor.remove();
                numActiveOrdinals--;
            }

            // the current priority group is exhausted: move on to the next one
            if (!instreamCursor.advance()) {
                instreamCursor = popInstreamGroup();
                break;
            }
        } while (!result.isMadeProgress() && instreamCursor.value() != first);

        Util.lazyAdd(receivedCounts, currInstream.ordinal(), inbox.size());
        if (!inbox.isEmpty()) {
            Util.lazyIncrement(receivedBatches, currInstream.ordinal());
        }
        queuesCapacity.lazySet(instreamCursor == null ? 0L : Util.sum(instreamCursor.getList(), InboundEdgeStream::capacities));
        queuesSize.lazySet(instreamCursor == null ? 0L : Util.sum(instreamCursor.getList(), InboundEdgeStream::sizes));
    }

    /**
     * Removes the next priority group from the queue and wraps it in a cursor.
     *
     * @return the cursor, or {@code null} when no group is left
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        return Optional.ofNullable(instreamGroupQueue.poll())
                .map(CircularListCursor::new)
                .orElse(null);
    }

    /** Returns {@code ProcessorTasklet{[jobName/]vertexName#globalProcessorIndex}}. */
    @Override
    public String toString() {
        String jobName = context.jobConfig().getName();
        String jobPrefix = jobName == null ? "" : jobName + "/";
        return "ProcessorTasklet{" + jobPrefix + context.vertexName() + '#' + context.globalProcessorIndex() + '}';
    }

    /**
     * Records that {@code snapshotBarrier} arrived on input ordinal {@code i}.
     * A terminal barrier additionally switches the tasklet to waiting for the
     * barrier on all inputs.
     *
     * @throws JetException if the barrier's snapshot ID is not the pending one
     */
    private void observeBarrier(int i, SnapshotBarrier snapshotBarrier) {
        if (snapshotBarrier.snapshotId() != pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier ID " + snapshotBarrier.snapshotId() + " from ordinal " + i + " expected " + pendingSnapshotId);
        }
        currentBarrier = snapshotBarrier;
        if (snapshotBarrier.isTerminal()) {
            waitForAllBarriers = true;
        }
        receivedBarriers.set(i);
    }

    /**
     * Chooses the state to (re)start processing in: a pending watermark is
     * handled first; without any inbound stream left the processor goes
     * straight to completion; otherwise the inbox is processed.
     */
    private ProcessorState initialProcessingState() {
        if (pendingWatermark != null) {
            return ProcessorState.PROCESS_WATERMARK;
        }
        return instreamCursor == null ? ProcessorState.COMPLETE : ProcessorState.PROCESS_INBOX;
    }

    /**
     * Tells whether the current input stream has priority {@code Integer.MIN_VALUE}.
     * Items from such a stream are passed to {@code Processor.restoreFromSnapshot()}.
     */
    private boolean isSnapshotInbox() {
        return currInstream != null && currInstream.priority() == Integer.MIN_VALUE;
    }

    /**
     * Computes the "lastForwardedWmLatency" metric.
     *
     * @return {@code Long.MIN_VALUE} if the last forwarded watermark is the idle
     *         message, {@code Long.MAX_VALUE} if {@code outbox.lastForwardedWm()}
     *         is {@code Long.MIN_VALUE} (presumably: nothing forwarded yet),
     *         otherwise the current wall-clock time minus the watermark
     */
    private long lastForwardedWmLatency() {
        long wm = outbox.lastForwardedWm();
        if (wm == WatermarkCoalescer.IDLE_MESSAGE.timestamp()) {
            return Long.MIN_VALUE;
        }
        if (wm == Long.MIN_VALUE) {
            return Long.MAX_VALUE;
        }
        return System.currentTimeMillis() - wm;
    }

    /** Delegates to {@code Processor.isCooperative()}. */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public boolean isCooperative() {
        return processor.isCooperative();
    }

    /** Closes the processor unless it has been closed already. */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public void close() {
        if (!processorClosed) {
            closeProcessor();
        }
    }
}
