package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ArrayDequeOutbox;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.util.Preconditions;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/CooperativeProcessorTasklet.class */
public class CooperativeProcessorTasklet extends ProcessorTaskletBase {
    private final ArrayDequeOutbox outbox;
    private boolean processorCompleted;

    public CooperativeProcessorTasklet(Contexts.ProcCtx procCtx, Processor processor, List<InboundEdgeStream> list, List<OutboundEdgeStream> list2) {
        super(procCtx, processor, list, list2);
        Preconditions.checkTrue(processor.isCooperative(), "Processor is non-cooperative");
        this.outbox = new ArrayDequeOutbox(Stream.of((Object[]) this.outstreams).mapToInt((v0) -> {
            return v0.getOutboxCapacity();
        }).toArray(), this.progTracker);
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public final boolean isCooperative() {
        return true;
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public void init(CompletableFuture<Void> completableFuture) {
        initProcessor(this.outbox, completableFuture);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.hazelcast.jet.impl.execution.Tasklet, java.util.concurrent.Callable
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        if (!inbox().isEmpty()) {
            this.progTracker.notDone();
        } else {
            if (!this.processor.tryProcess()) {
                tryFlushOutbox();
                this.progTracker.notDone();
                return this.progTracker.toProgressState();
            }
            tryFillInbox();
        }
        if (this.progTracker.isDone()) {
            completeIfNeeded();
        } else if (!inbox().isEmpty()) {
            this.processor.process(this.currInstream.ordinal(), inbox());
        }
        tryFlushOutbox();
        return this.progTracker.toProgressState();
    }

    private void completeIfNeeded() {
        if (this.processorCompleted) {
            return;
        }
        this.processorCompleted = this.processor.complete();
        if (this.processorCompleted) {
            this.outbox.addIgnoringCapacity(DoneItem.DONE_ITEM);
        } else {
            this.progTracker.notDone();
        }
    }

    private void tryFlushOutbox() {
        for (int i = 0; i < this.outbox.bucketCount(); i++) {
            Queue<Object> queueWithOrdinal = this.outbox.queueWithOrdinal(i);
            while (true) {
                Object peek = queueWithOrdinal.peek();
                if (peek != null) {
                    OutboundCollector collector = this.outstreams[i].getCollector();
                    ProgressState offerBroadcast = ((peek instanceof Watermark) || (peek instanceof DoneItem)) ? collector.offerBroadcast(peek) : collector.offer(peek);
                    this.progTracker.madeProgress(offerBroadcast.isMadeProgress());
                    if (!offerBroadcast.isDone()) {
                        this.progTracker.notDone();
                        break;
                    }
                    queueWithOrdinal.remove();
                }
            }
        }
    }

    @Override // com.hazelcast.jet.impl.execution.ProcessorTaskletBase
    public /* bridge */ /* synthetic */ String toString() {
        return super.toString();
    }
}
