/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/**
 * {@link Outbox} implementation that forwards offered items to an array of
 * {@link OutboundCollector}s, addressed by ordinal.
 *
 * <p>When the outbox is created with {@code hasSnapshot == true}, the last
 * collector in the array is reserved for snapshot entries: it is excluded from
 * {@link #bucketCount()} and from the "all edges" broadcast, and is reachable
 * only through {@link #offerToSnapshot} and {@code offerToEdgesAndSnapshot}.
 *
 * <p>Every offer consumes one unit of a batch allowance that {@link #reset()}
 * restores to {@code batchSize}. Once the allowance is used up the offer is
 * refused (returns {@code false}) without touching any collector.
 *
 * <p>When an offer returns {@code false}, the collectors that already accepted
 * the item are remembered, so a repeated offer of the same item to the same
 * ordinals is delivered only to the remaining ones.
 */
public class OutboxImpl implements Outbox {
    private final OutboundCollector[] outstreams;
    private final ProgressTracker progTracker;
    private final SerializationService serializationService;
    private final int batchSize;
    private final AtomicLongArray counters;

    /** Reusable one-element array for single-ordinal offers; avoids allocating per call. */
    private final int[] singleEdge = new int[]{0};
    /** Ordinals of all collectors except the snapshot one. */
    private final int[] allEdges;
    /** Ordinals of all collectors, including the snapshot one if present. */
    private final int[] allEdgesAndSnapshot;
    /** One-element array holding the snapshot collector's ordinal, or {@code null} if there is none. */
    private final int[] snapshotEdge;
    /** Bit {@code i} is set when the collector at position {@code i} of the offered ordinals accepted the current item. */
    private final BitSet broadcastTracker;

    /** Serialized form of a snapshot entry that has not been fully accepted yet. */
    private Map.Entry<Data, Data> pendingSnapshotEntry;
    /** Offers still allowed before the next {@link #reset()}; {@code -1} after an offer returned {@code false}. */
    private int numRemainingInBatch;

    // State kept only to let assertions detect a caller that retries with different arguments.
    private Object unfinishedItem;
    private int[] unfinishedItemOrdinals;
    private Object unfinishedSnapshotKey;
    private Object unfinishedSnapshotValue;

    /**
     * @param outstreams           collectors indexed by ordinal; if {@code hasSnapshot} is set,
     *                             the last one receives snapshot entries
     * @param hasSnapshot          whether the last collector is the snapshot collector
     * @param progTracker          notified whenever an offer makes progress
     * @param serializationService used to serialize snapshot keys and values
     * @param batchSize            batch allowance restored by each {@link #reset()}; must be positive
     * @param counters             per-ordinal counters bumped for each accepted non-broadcast item
     */
    public OutboxImpl(OutboundCollector[] outstreams, boolean hasSnapshot, ProgressTracker progTracker,
                      SerializationService serializationService, int batchSize, AtomicLongArray counters) {
        Preconditions.checkPositive(batchSize, "batchSize must be positive");
        this.outstreams = outstreams;
        this.progTracker = progTracker;
        this.serializationService = serializationService;
        this.batchSize = batchSize;
        this.counters = counters;

        int edgeCount = hasSnapshot ? outstreams.length - 1 : outstreams.length;
        this.allEdges = IntStream.range(0, edgeCount).toArray();
        this.allEdgesAndSnapshot = IntStream.range(0, outstreams.length).toArray();
        this.snapshotEdge = hasSnapshot ? new int[]{outstreams.length - 1} : null;
        this.broadcastTracker = new BitSet(outstreams.length);
    }

    /**
     * Returns the number of collectors addressable through the {@code offer}
     * methods, i.e. all collectors except the snapshot one.
     */
    @Override
    public final int bucketCount() {
        return allEdges.length;
    }

    /**
     * Offers the item to the collector with the given ordinal, or to all
     * non-snapshot collectors if {@code ordinal == -1}.
     *
     * @return {@code true} if the item was accepted by every targeted collector
     * @throws IllegalArgumentException if {@code ordinal} is neither {@code -1} nor in the
     *                                  range {@code [0, bucketCount())}
     */
    @Override
    public final boolean offer(int ordinal, @Nonnull Object item) {
        if (ordinal == -1) {
            return offerInternal(allEdges, item);
        }
        // Reject every invalid ordinal up front. This covers the snapshot ordinal
        // (== bucketCount()), which must not be reachable through this method, and also
        // ordinals that would otherwise fail with an ArrayIndexOutOfBoundsException only
        // after the batch allowance had already been decremented.
        if (ordinal < 0 || ordinal >= bucketCount()) {
            throw new IllegalArgumentException("Illegal edge ordinal: " + ordinal);
        }
        singleEdge[0] = ordinal;
        return offerInternal(singleEdge, item);
    }

    /**
     * Offers the item to the collectors with the given ordinals.
     *
     * @return {@code true} if the item was accepted by every targeted collector
     */
    @Override
    public final boolean offer(@Nonnull int[] ordinals, @Nonnull Object item) {
        assert snapshotEdge == null || com.hazelcast.jet.impl.util.Util.arrayIndexOf(snapshotEdge[0], ordinals) < 0
                : "Ordinal " + snapshotEdge[0] + " is out of range";
        return offerInternal(ordinals, item);
    }

    /**
     * Common offer routine: consumes one unit of the batch allowance and, if any
     * was left, passes the item to each listed collector that has not accepted it yet.
     *
     * @return {@code true} if all listed collectors have accepted the item; {@code false} if the
     *         batch allowance was exhausted or at least one collector did not accept it
     */
    private boolean offerInternal(@Nonnull int[] ordinals, @Nonnull Object item) {
        assert unfinishedItem == null || item.equals(unfinishedItem)
                : "Different item offered after previous call returned false: expected=" + unfinishedItem
                + ", got=" + item;
        assert unfinishedItemOrdinals == null || Arrays.equals(unfinishedItemOrdinals, ordinals)
                : "Offered to different ordinals after previous call returned false: expected="
                + Arrays.toString(unfinishedItemOrdinals) + ", got=" + Arrays.toString(ordinals);
        assert numRemainingInBatch != -1
                : "Outbox.offer() called again after it returned false, without a call to reset(). "
                + "You probably didn't return from Processor method after Outbox.offer() "
                + "or AbstractProcessor.tryEmit() returned false";

        numRemainingInBatch--;
        boolean done = true;
        if (numRemainingInBatch == -1) {
            // Batch allowance exhausted: refuse without touching the collectors.
            done = false;
        } else {
            if (ordinals.length == 0) {
                // Nothing to deliver to, but the call itself is reported as progress.
                progTracker.madeProgress();
            }
            for (int i = 0; i < ordinals.length; i++) {
                if (broadcastTracker.get(i)) {
                    // This collector accepted the item during an earlier, incomplete attempt.
                    continue;
                }
                ProgressState result = doOffer(outstreams[ordinals[i]], item);
                if (result.isMadeProgress()) {
                    progTracker.madeProgress();
                }
                if (result.isDone()) {
                    broadcastTracker.set(i);
                    if (!(item instanceof BroadcastItem)) {
                        com.hazelcast.jet.impl.util.Util.lazyIncrement(counters, ordinals[i]);
                    }
                } else {
                    done = false;
                }
            }
        }

        if (done) {
            broadcastTracker.clear();
            unfinishedItem = null;
            unfinishedItemOrdinals = null;
        } else {
            numRemainingInBatch = -1;
            unfinishedItem = item;
            // Intentional side effect inside assert: the defensive copy is needed only by the
            // assertion at the top of this method, so it is made only when assertions are enabled.
            assert (unfinishedItemOrdinals = Arrays.copyOf(ordinals, ordinals.length)) != null;
        }
        return done;
    }

    /**
     * Offers the item to all non-snapshot collectors.
     *
     * @return {@code true} if the item was accepted by every targeted collector
     */
    @Override
    public final boolean offer(@Nonnull Object item) {
        return offerInternal(allEdges, item);
    }

    /**
     * Serializes the key and value and offers them as one entry to the snapshot
     * collector. The serialized entry is retained across unsuccessful attempts,
     * so a retry does not serialize again.
     *
     * @return {@code true} if the entry was accepted
     * @throws IllegalStateException if this outbox was created without a snapshot collector
     */
    @Override
    public final boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (snapshotEdge == null) {
            throw new IllegalStateException("Outbox does not have snapshot queue");
        }
        assert unfinishedSnapshotKey == null || unfinishedSnapshotKey.equals(key)
                : "Different key offered after previous call returned false: expected=" + unfinishedSnapshotKey
                + ", got=" + key;
        assert unfinishedSnapshotValue == null || unfinishedSnapshotValue.equals(value)
                : "Different value offered after previous call returned false: expected=" + unfinishedSnapshotValue
                + ", got=" + value;

        if (pendingSnapshotEntry == null) {
            // Typed as Data so that the created entry matches the Map.Entry<Data, Data> field.
            Data serializedKey = serializationService.toData(key);
            Data serializedValue = serializationService.toData(value);
            pendingSnapshotEntry = Util.entry(serializedKey, serializedValue);
        }

        boolean success = offerInternal(snapshotEdge, pendingSnapshotEntry);
        if (success) {
            pendingSnapshotEntry = null;
            unfinishedSnapshotKey = null;
            unfinishedSnapshotValue = null;
        } else {
            unfinishedSnapshotKey = key;
            unfinishedSnapshotValue = value;
        }
        return success;
    }

    /**
     * Restores the batch allowance to {@code batchSize}, permitting offers again
     * after one has returned {@code false}.
     */
    public void reset() {
        numRemainingInBatch = batchSize;
    }

    /**
     * Hands the item to the collector, using {@code offerBroadcast} for
     * {@link BroadcastItem}s and the plain {@code offer} for everything else.
     */
    private ProgressState doOffer(OutboundCollector collector, Object item) {
        return item instanceof BroadcastItem
                ? collector.offerBroadcast((BroadcastItem) item)
                : collector.offer(item);
    }

    /**
     * Offers the item to every collector, including the snapshot collector if present.
     *
     * @return {@code true} if the item was accepted by every collector
     */
    final boolean offerToEdgesAndSnapshot(Object item) {
        return offerInternal(allEdgesAndSnapshot, item);
    }
}

