/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.concurrent.ManyToOneConcurrentArrayQueue;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.impl.processor.ProcessorSupplierWithContext;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.util.ArrayDeque;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class AsyncTransformUsingContextUnorderedP<C, T, K, R>
extends AbstractProcessor {
    private final ContextFactory<C> contextFactory;
    private final BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn;
    private final Function<? super T, ? extends K> extractKeyFn;
    private C contextObject;
    private ManyToOneConcurrentArrayQueue<Tuple3<T, Long, Object>> resultQueue;
    private final SortedMap<Long, Long> watermarkCounts = new TreeMap<Long, Long>();
    private final Map<T, Integer> inFlightItems = new IdentityHashMap<T, Integer>();
    private Traverser<Object> currentTraverser = Traversers.empty();
    private Traverser<Map.Entry> snapshotTraverser;
    private boolean tryProcessSucceeded;
    private Long lastReceivedWm = Long.MIN_VALUE;
    private long lastEmittedWm = Long.MIN_VALUE;
    private long minRestoredWm = Long.MAX_VALUE;
    private int maxAsyncOps;
    private int asyncOpsCounter;
    private ArrayDeque<T> restoredObjects = new ArrayDeque();
    @Probe(name="numInFlightOps")
    private final AtomicInteger asyncOpsCounterMetric = new AtomicInteger();

    private AsyncTransformUsingContextUnorderedP(@Nonnull ContextFactory<C> contextFactory, @Nullable C contextObject, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn, @Nonnull Function<? super T, ? extends K> extractKeyFn) {
        assert (contextObject == null ^ contextFactory.hasLocalSharing()) : "if contextObject is shared, it must be non-null, or vice versa";
        this.contextFactory = contextFactory;
        this.callAsyncFn = callAsyncFn;
        this.contextObject = contextObject;
        this.extractKeyFn = extractKeyFn;
    }

    @Override
    public boolean isCooperative() {
        return this.contextFactory.isCooperative();
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        if (!this.contextFactory.hasLocalSharing()) {
            assert (this.contextObject == null) : "contextObject is not null: " + this.contextObject;
            this.contextObject = this.contextFactory.createFn().apply(context.jetInstance());
        }
        this.maxAsyncOps = this.contextFactory.maxPendingCallsPerProcessor();
        this.resultQueue = new ManyToOneConcurrentArrayQueue(this.maxAsyncOps);
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        if (this.getOutbox().hasUnfinishedItem() && !this.emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        this.asyncOpsCounterMetric.lazySet(this.asyncOpsCounter);
        Object castedItem = item;
        if (!this.processItem(castedItem)) {
            this.tryFlushQueue();
            return false;
        }
        return true;
    }

    @CheckReturnValue
    private boolean processItem(@Nonnull T item) {
        if (this.asyncOpsCounter == this.maxAsyncOps) {
            return false;
        }
        CompletableFuture<Traverser<R>> future = this.callAsyncFn.apply(this.contextObject, item);
        if (future == null) {
            return true;
        }
        ++this.asyncOpsCounter;
        this.watermarkCounts.merge(this.lastReceivedWm, 1L, Long::sum);
        Long lastWatermarkAtReceiveTime = this.lastReceivedWm;
        future.whenComplete(ExceptionUtil.withTryCatch(this.getLogger(), (r, e) -> this.resultQueue.add(Tuple3.tuple3(item, lastWatermarkAtReceiveTime, r != null ? r : e))));
        this.inFlightItems.merge(item, 1, Integer::sum);
        return true;
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        if (!this.emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        assert (this.lastEmittedWm <= this.lastReceivedWm) : "lastEmittedWm=" + this.lastEmittedWm + ", lastReceivedWm=" + this.lastReceivedWm;
        if (watermark.timestamp() <= this.lastReceivedWm) {
            return true;
        }
        if (this.watermarkCounts.isEmpty()) {
            if (!this.tryEmit(watermark)) {
                return false;
            }
            this.lastEmittedWm = watermark.timestamp();
        }
        this.lastReceivedWm = watermark.timestamp();
        return true;
    }

    @Override
    public boolean tryProcess() {
        if (this.tryProcessSucceeded) {
            this.tryFlushQueue();
        } else {
            this.emitFromTraverser(this.currentTraverser);
        }
        this.tryProcessSucceeded = !this.getOutbox().hasUnfinishedItem();
        return this.tryProcessSucceeded;
    }

    @Override
    public boolean complete() {
        return this.tryFlushQueue();
    }

    @Override
    public boolean saveToSnapshot() {
        assert (this.restoredObjects.isEmpty()) : "restoredObjects not empty";
        if (!this.emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            LoggingUtil.logFinest(this.getLogger(), "Saving to snapshot: %s, lastReceivedWm=%d", this.inFlightItems, this.lastReceivedWm);
            this.snapshotTraverser = Traversers.traverseIterable(this.inFlightItems.entrySet()).map(en -> Util.entry(this.extractKeyFn.apply(en.getKey()), Tuple2.tuple2(en.getKey(), en.getValue()))).append(Util.entry(BroadcastKey.broadcastKey(Keys.LAST_EMITTED_WM), this.lastReceivedWm)).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            assert (((BroadcastKey)key).key().equals((Object)Keys.LAST_EMITTED_WM)) : "Unexpected key: " + key;
            this.minRestoredWm = Math.min(this.minRestoredWm, (Long)value);
            return;
        }
        Tuple2 value1 = (Tuple2)value;
        for (int i = 0; i < (Integer)value1.f1(); ++i) {
            this.restoredObjects.add(value1.f0());
            LoggingUtil.logFinest(this.getLogger(), "Restored: %s", value1.f0());
        }
    }

    @Override
    public boolean finishSnapshotRestore() {
        T t;
        while ((t = this.restoredObjects.peek()) != null && this.processItem(t)) {
            this.restoredObjects.remove();
        }
        if (this.restoredObjects.isEmpty()) {
            if (!this.emitFromTraverser(this.currentTraverser)) {
                return false;
            }
            this.restoredObjects = new ArrayDeque(0);
            this.lastReceivedWm = this.minRestoredWm;
            LoggingUtil.logFine(this.getLogger(), "restored lastReceivedWm=%s", this.minRestoredWm);
            return true;
        }
        this.tryFlushQueue();
        return false;
    }

    @Override
    public void close() {
        if (this.contextObject != null && !this.contextFactory.hasLocalSharing()) {
            this.contextFactory.destroyFn().accept(this.contextObject);
        }
        this.contextObject = null;
    }

    private boolean tryFlushQueue() {
        while (this.emitFromTraverser(this.currentTraverser)) {
            Tuple3<T, Long, Object> tuple = this.resultQueue.poll();
            if (tuple == null) {
                return this.watermarkCounts.isEmpty();
            }
            --this.asyncOpsCounter;
            Integer inFlightItemsCount = this.inFlightItems.merge(tuple.f0(), -1, (o, n) -> o == 1 ? null : Integer.valueOf(o + n));
            assert (inFlightItemsCount == null || inFlightItemsCount > 0) : "inFlightItemsCount=" + inFlightItemsCount;
            Long count = this.watermarkCounts.merge(tuple.f1(), -1L, Long::sum);
            assert (count >= 0L) : "count=" + count;
            if (tuple.f2() instanceof Throwable) {
                throw new JetException("Async operation completed exceptionally: " + tuple.f2(), (Throwable)tuple.f2());
            }
            this.currentTraverser = (Traverser)tuple.f2();
            if (this.currentTraverser == null) {
                this.currentTraverser = Traversers.empty();
            }
            if (count > 0L) continue;
            long wmToEmit = Long.MIN_VALUE;
            Iterator<Map.Entry<Long, Long>> it = this.watermarkCounts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> entry = it.next();
                if (entry.getValue() != 0L) {
                    wmToEmit = entry.getKey();
                    break;
                }
                it.remove();
            }
            if (this.watermarkCounts.isEmpty() && this.lastReceivedWm > this.lastEmittedWm) {
                wmToEmit = this.lastReceivedWm;
            }
            if (wmToEmit <= Long.MIN_VALUE || wmToEmit <= this.lastEmittedWm) continue;
            this.lastEmittedWm = wmToEmit;
            this.currentTraverser = this.currentTraverser.append(new Watermark(wmToEmit));
        }
        return false;
    }

    public static <C, T, K, R> ProcessorSupplier supplier(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn, @Nonnull FunctionEx<? super T, ? extends K> extractKeyFn) {
        return ProcessorSupplierWithContext.supplierWithContext(contextFactory, (ctxF, ctxO) -> new AsyncTransformUsingContextUnorderedP((ContextFactory<Object>)ctxF, ctxO, callAsyncFn, extractKeyFn));
    }

    private static enum Keys {
        LAST_EMITTED_WM;

    }
}

