package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Inbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.impl.util.ArrayDequeOutbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.DoneItem;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTasklet.class */
/**
 * Tasklet that drives a single {@link Processor}.
 *
 * <p>Each invocation of {@link #call()} performs one step: it tries to move
 * items from an inbound edge stream into the inbox, lets the processor work
 * on the inbox (or complete an exhausted edge, or complete itself), and then
 * tries to flush the processor's outbox into the outbound edge collectors.
 */
public class ProcessorTasklet implements Tasklet {
    /** Items drained from {@link #currInstream} and not yet consumed by the processor. */
    private final ArrayDequeInbox inbox = new ArrayDequeInbox();
    /** Accumulates the "made progress" / "done" flags of one {@link #call()}; reset on every call. */
    private final ProgressTracker progTracker = new ProgressTracker();
    private final Processor processor;
    /**
     * Inbound streams grouped by {@code priority()}. Groups are queued in the
     * ascending key order of the {@link TreeMap} they were collected into and
     * are polled one group at a time.
     */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final String vertexName;
    private final Processor.Context context;
    /** Cursor over the group currently being drained; {@code null} once all groups were polled. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    private final ArrayDequeOutbox outbox;
    /** Outbound streams sorted by {@code ordinal()}; indexed by outbox bucket in {@link #tryFlushOutbox()}. */
    private final OutboundEdgeStream[] outstreams;
    /** Stream the inbox content was last drained from; {@code null} after its edge was completed. */
    private InboundEdgeStream currInstream;
    /** Whether the last {@code drainTo} on {@link #currInstream} reported "done". */
    private boolean currInstreamExhausted;
    /** Set once {@code processor.complete()} returned {@code true} and the done items were enqueued. */
    private boolean processorCompleted;

    /**
     * {@link Inbox} implementation backed by an {@link ArrayDeque}. The
     * constructor is private, so instances are created only by the
     * enclosing tasklet.
     */
    public static final class ArrayDequeInbox extends ArrayDeque<Object> implements Inbox {
        private ArrayDequeInbox() {
        }
    }

    /**
     * Creates the tasklet.
     *
     * @param str       name of the vertex, used only in {@link #toString()}
     * @param context   context handed to the processor in {@link #init()}
     * @param processor the processor to drive; must not be {@code null}
     * @param list      inbound edge streams; grouped by {@code priority()}
     * @param list2     outbound edge streams; sorted by {@code ordinal()}
     */
    public ProcessorTasklet(String str, Processor.Context context, Processor processor, List<InboundEdgeStream> list, List<OutboundEdgeStream> list2) {
        Preconditions.checkNotNull(processor, "processor");
        this.vertexName = str;
        this.processor = processor;
        // TreeMap.values() iterates in ascending key order, so the queue holds
        // the groups in the same order the sorted map yields them.
        this.instreamGroupQueue = list.stream()
                .collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new, Collectors.toCollection(ArrayList::new)))
                .values().stream()
                .collect(Collectors.toCollection(ArrayDeque::new));
        this.outstreams = list2.stream()
                .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                .toArray(OutboundEdgeStream[]::new);
        this.outbox = new ArrayDequeOutbox(
                list2.size(),
                Stream.of(this.outstreams).mapToInt(OutboundEdgeStream::getHighWaterMark).toArray());
        this.context = context;
        this.instreamCursor = popInstreamGroup();
    }

    /** Initializes the processor with this tasklet's outbox and context. */
    @Override
    public void init() {
        this.processor.init(this.outbox, this.context);
    }

    /** @return whatever the wrapped processor reports from {@code isCooperative()} */
    @Override
    public boolean isCooperative() {
        return this.processor.isCooperative();
    }

    /**
     * Performs one step of work.
     *
     * <ol>
     *   <li>Tries to fill the inbox from the inbound streams.</li>
     *   <li>If the tracker is still "done" after that (no pending input and no
     *       cursor left), completes the processor if not yet completed;
     *       otherwise processes a non-empty inbox, or, with an empty inbox and
     *       an exhausted current stream, asks the processor to complete that
     *       edge.</li>
     *   <li>Tries to flush the outbox.</li>
     * </ol>
     *
     * @return the progress state accumulated during this step
     */
    @Override
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        tryFillInbox();
        if (this.progTracker.isDone()) {
            completeIfNeeded();
        } else if (!this.inbox.isEmpty()) {
            tryProcessInbox();
        } else if (this.currInstreamExhausted) {
            this.progTracker.madeProgress(true);
            // Clearing currInstream lets tryFillInbox() move on to other streams.
            if (this.processor.completeEdge(this.currInstream.ordinal())) {
                this.currInstream = null;
            }
        }
        tryFlushOutbox();
        return this.progTracker.toProgressState();
    }

    /**
     * Polls the next group of inbound streams from the queue.
     *
     * @return a cursor over the polled group, or {@code null} if the queue is empty
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        ArrayList<InboundEdgeStream> nextGroup = this.instreamGroupQueue.poll();
        if (nextGroup == null) {
            return null;
        }
        return new CircularListCursor<>(nextGroup);
    }

    /**
     * Drains one inbound stream of the current group into the inbox.
     *
     * <p>Does nothing except marking the step "not done" while the inbox
     * still holds items or the current stream is exhausted but its edge has
     * not been completed yet. Leaves the tracker untouched when there is no
     * cursor left. Otherwise walks the cursor until a stream makes progress,
     * a stream is exhausted, the group becomes empty, or the cursor is back
     * at the stream it started from.
     */
    private void tryFillInbox() {
        if (!this.inbox.isEmpty() || (this.currInstream != null && this.currInstreamExhausted)) {
            this.progTracker.notDone();
            return;
        }
        if (this.instreamCursor == null) {
            return;
        }
        this.progTracker.notDone();
        final InboundEdgeStream first = this.instreamCursor.value();
        ProgressState result;
        do {
            this.currInstream = this.instreamCursor.value();
            result = this.currInstream.drainTo(this.inbox);
            this.progTracker.madeProgress(result.isMadeProgress());
            this.currInstreamExhausted = result.isDone();
            if (this.currInstreamExhausted) {
                this.instreamCursor.remove();
            }
            if (!this.instreamCursor.advance()) {
                // advance() reported it could not move on; switch to the next group (or null).
                this.instreamCursor = popInstreamGroup();
                return;
            }
            if (this.currInstreamExhausted) {
                // Stop here even if the stream reported "done" without progress:
                // continuing would overwrite currInstream/currInstreamExhausted and
                // call() would never invoke completeEdge() for this stream. It also
                // avoids relying on 'first' as the loop sentinel after it may have
                // been removed from the cursor.
                return;
            }
        } while (!result.isMadeProgress() && this.instreamCursor.value() != first);
    }

    /**
     * Hands the inbox to the processor unless the outbox is at its high-water
     * mark. Marks the step "not done" when processing was skipped or the
     * processor left items in the inbox.
     */
    private void tryProcessInbox() {
        if (this.outbox.isHighWater()) {
            this.progTracker.notDone();
            return;
        }
        this.progTracker.madeProgress(true);
        this.processor.process(this.currInstream.ordinal(), this.inbox);
        if (!this.inbox.isEmpty()) {
            this.progTracker.notDone();
        }
    }

    /**
     * Calls {@code processor.complete()} until it returns {@code true}, then
     * enqueues {@link DoneItem#DONE_ITEM} for every outbound stream. Skipped
     * (and the step marked "not done") while the outbox is at its high-water
     * mark. A no-op once the processor has completed.
     */
    private void completeIfNeeded() {
        if (this.processorCompleted) {
            return;
        }
        if (this.outbox.isHighWater()) {
            this.progTracker.notDone();
            return;
        }
        this.progTracker.madeProgress(true);
        if (!this.processor.complete()) {
            this.progTracker.notDone();
            return;
        }
        this.processorCompleted = true;
        for (OutboundEdgeStream outstream : this.outstreams) {
            this.outbox.add(outstream.ordinal(), DoneItem.DONE_ITEM);
        }
    }

    /**
     * Moves queued outbox items to the collectors of the outbound streams,
     * bucket by bucket. An item is removed from its queue only after the
     * collector reported "done" for it; otherwise the step is marked
     * "not done" and the remaining items of that bucket stay queued.
     * {@link DoneItem#DONE_ITEM} is translated into {@code close()} on the
     * collector instead of being offered.
     */
    private void tryFlushOutbox() {
        for (int i = 0; i < this.outbox.bucketCount(); i++) {
            Queue<Object> bucket = this.outbox.queueWithOrdinal(i);
            for (Object item; (item = bucket.peek()) != null; ) {
                ProgressState result = item != DoneItem.DONE_ITEM
                        ? this.outstreams[i].getCollector().offer(item)
                        : this.outstreams[i].getCollector().close();
                this.progTracker.madeProgress(result.isMadeProgress());
                if (!result.isDone()) {
                    this.progTracker.notDone();
                    break;
                }
                bucket.remove();
            }
        }
    }

    @Override
    public String toString() {
        return "ProcessorTasklet{vertex=" + this.vertexName + ", processor=" + this.processor + '}';
    }
}
