package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Outbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.util.Preconditions;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/BlockingProcessorTasklet.class */
public class BlockingProcessorTasklet extends ProcessorTaskletBase {
    private final BlockingOutbox outbox;
    private CompletableFuture<?> jobFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/BlockingProcessorTasklet$BlockingOutbox.class */
    public class BlockingOutbox implements Outbox {
        static final /* synthetic */ boolean $assertionsDisabled;

        private BlockingOutbox() {
        }

        @Override // com.hazelcast.jet.Outbox
        public int bucketCount() {
            return BlockingProcessorTasklet.this.outstreams.length;
        }

        @Override // com.hazelcast.jet.Outbox
        public boolean offer(int i, @Nonnull Object obj) {
            BlockingProcessorTasklet.this.progTracker.madeProgress();
            if (i != -1) {
                submit(BlockingProcessorTasklet.this.outstreams[i], obj);
                return true;
            }
            for (OutboundEdgeStream outboundEdgeStream : BlockingProcessorTasklet.this.outstreams) {
                submit(outboundEdgeStream, obj);
            }
            return true;
        }

        @Override // com.hazelcast.jet.Outbox
        public boolean offer(int[] iArr, @Nonnull Object obj) {
            BlockingProcessorTasklet.this.progTracker.madeProgress();
            for (int i : iArr) {
                submit(BlockingProcessorTasklet.this.outstreams[i], obj);
            }
            return true;
        }

        void add(@Nonnull Object obj) {
            boolean offer = BlockingProcessorTasklet.this.outbox.offer(obj);
            if (!$assertionsDisabled && !offer) {
                throw new AssertionError("Blocking outbox refused an item: " + obj);
            }
        }

        /* JADX WARN: Type inference failed for: r0v18, types: [com.hazelcast.util.concurrent.IdleStrategy] */
        private void submit(OutboundEdgeStream outboundEdgeStream, @Nonnull Object obj) {
            OutboundCollector collector = outboundEdgeStream.getCollector();
            long j = 0;
            while (true) {
                ProgressState offerBroadcast = ((obj instanceof Watermark) || (obj instanceof DoneItem)) ? collector.offerBroadcast(obj) : collector.offer(obj);
                if (offerBroadcast.isDone()) {
                    return;
                }
                if (BlockingProcessorTasklet.this.jobFuture.isDone()) {
                    throw new JobFutureCompleted();
                }
                if (offerBroadcast.isMadeProgress()) {
                    j = 0;
                } else {
                    ?? r0 = ExecutionService.IDLER;
                    long j2 = j + 1;
                    j = r0;
                    r0.idle(j2);
                }
            }
        }

        static {
            $assertionsDisabled = !BlockingProcessorTasklet.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/BlockingProcessorTasklet$JobFutureCompleted.class */
    /**
     * Internal signal thrown from {@code BlockingOutbox.submit} when the job
     * future is found to be done while an item is still waiting to be accepted.
     * It is caught in {@link BlockingProcessorTasklet#call()}, which then reports
     * {@code ProgressState.DONE}. Carries no message and no cause.
     */
    public static class JobFutureCompleted extends RuntimeException {
        private JobFutureCompleted() {
            super();
        }
    }

    public BlockingProcessorTasklet(Contexts.ProcCtx procCtx, Processor processor, List<InboundEdgeStream> list, List<OutboundEdgeStream> list2) {
        super(procCtx, processor, list, list2);
        Preconditions.checkFalse(processor.isCooperative(), "Processor is cooperative");
        this.outbox = new BlockingOutbox();
    }

    /**
     * Reports this tasklet as non-cooperative.
     *
     * @return always {@code false}
     */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public final boolean isCooperative() {
        final boolean cooperative = false;
        return cooperative;
    }

    /**
     * Initializes the tasklet: delegates to the superclass, remembers the given
     * future as the job future (polled by the blocking outbox while it waits),
     * and finally initializes the processor with the blocking outbox.
     *
     * @param completableFuture the future stored as this tasklet's job future
     */
    @Override // com.hazelcast.jet.impl.execution.Tasklet
    public void init(CompletableFuture<Void> completableFuture) {
        super.init(completableFuture);
        jobFuture = completableFuture;
        initProcessor(outbox, jobFuture);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Runs one iteration of the tasklet.
     * <p>
     * If the inbox still holds items from a previous iteration the tasklet is
     * marked not-done; otherwise the processor's nullary {@code tryProcess()} is
     * invoked and an attempt is made to refill the inbox. Afterwards, when the
     * progress tracker reports done, the processor is completed; if it is not
     * done and the inbox has items, they are handed to the processor together
     * with the ordinal of the current input stream.
     *
     * @return the progress tracker's state for this iteration, or
     *         {@code ProgressState.DONE} if the job future completed while the
     *         outbox was blocked
     */
    @Override // com.hazelcast.jet.impl.execution.Tasklet, java.util.concurrent.Callable
    @Nonnull
    public ProgressState call() {
        try {
            progTracker.reset();
            if (!inbox().isEmpty()) {
                // Leftover input from the previous call: not done yet.
                progTracker.notDone();
            } else {
                callNullaryProcess();
                tryFillInbox();
            }
            if (!progTracker.isDone()) {
                if (!inbox().isEmpty()) {
                    processor.process(currInstream.ordinal(), inbox());
                }
            } else {
                complete();
            }
            return progTracker.toProgressState();
        } catch (JobFutureCompleted ignored) {
            // Deliberately swallowed: it only signals that the job future
            // completed while the outbox was waiting.
            return ProgressState.DONE;
        }
    }

    /**
     * Invokes the processor's no-argument {@code tryProcess()} and requires it to
     * return {@code true}.
     *
     * @throws JetException if {@code tryProcess()} returns {@code false}
     */
    private void callNullaryProcess() {
        final boolean processed = processor.tryProcess();
        if (processed) {
            return;
        }
        throw new JetException("Non-cooperative processor's tryProcess() returned false: " + processor);
    }

    /**
     * Asks the processor to complete. When it reports completion the done item
     * is added to the outbox; otherwise the tasklet is marked not-done.
     */
    private void complete() {
        final boolean finished = processor.complete();
        if (!finished) {
            progTracker.notDone();
            return;
        }
        outbox.add(DoneItem.DONE_ITEM);
    }

    /**
     * Returns the superclass's string representation unchanged.
     * NOTE(review): the decompiler marked this method as a compiler-generated
     * bridge; it is presumably absent from the hand-written source - confirm.
     */
    @Override // com.hazelcast.jet.impl.execution.ProcessorTaskletBase
    public String toString() {
        final String description = super.toString();
        return description;
    }
}
