/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.impl.processor.ProcessorSupplierWithContext;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Processor that applies an asynchronous function to every input item and
 * emits the results in the same order in which the inputs were received.
 * <p>
 * Each accepted item produces a {@link CompletableFuture} that is appended to
 * a FIFO queue. Watermarks are appended to the same queue. The queue is only
 * ever drained from its head, and draining stops at the first future that is
 * not yet done, so results and watermarks leave the processor in arrival
 * order. The number of queued entries is capped by
 * {@link ContextFactory#maxPendingCallsPerProcessor()}.
 *
 * @param <C> type of the context object passed to the async function
 * @param <T> type of the input items
 * @param <R> type of the items produced by the resulting traversers
 */
public final class AsyncTransformUsingContextOrderedP<C, T, R>
extends AbstractProcessor {
    private final ContextFactory<C> contextFactory;
    private final BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn;
    private C contextObject;
    // Each element is either a Tuple2(originalItem, future) or a Watermark.
    private ArrayDeque<Object> queue;
    // Traverser whose items are currently being emitted to the outbox.
    private Traverser<?> currentTraverser = Traversers.empty();
    private int maxAsyncOps;
    // Reused for every watermark that reaches the head of the queue.
    private final ResettableSingletonTraverser<Watermark> watermarkTraverser = new ResettableSingletonTraverser<>();
    // Result of the most recent nullary tryProcess() call.
    private boolean tryProcessSucceeded;
    // Published as the current queue size, which also counts queued watermarks.
    @Probe(name="numInFlightOps")
    private final AtomicInteger asyncOpsCounterMetric = new AtomicInteger();

    /**
     * @param contextFactory factory describing how the context object is
     *                       created, shared and destroyed
     * @param contextObject  the shared context object; must be non-null exactly
     *                       when {@code contextFactory.hasLocalSharing()} is true
     * @param callAsyncFn    function that starts the async operation for an item;
     *                       it may return {@code null}, in which case nothing is
     *                       queued for that item
     */
    private AsyncTransformUsingContextOrderedP(@Nonnull ContextFactory<C> contextFactory, @Nullable C contextObject, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn) {
        this.contextFactory = contextFactory;
        this.callAsyncFn = callAsyncFn;
        this.contextObject = contextObject;
        assert (contextObject == null ^ contextFactory.hasLocalSharing()) : "if contextObject is shared, it must be non-null, or vice versa";
    }

    /**
     * Delegates to the context factory's cooperativeness setting.
     */
    @Override
    public boolean isCooperative() {
        return this.contextFactory.isCooperative();
    }

    /**
     * Creates a private context object when the factory does not use local
     * sharing, and allocates the queue sized to the configured maximum number
     * of pending calls.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        if (!this.contextFactory.hasLocalSharing()) {
            assert (this.contextObject == null) : "contextObject is not null: " + this.contextObject;
            this.contextObject = this.contextFactory.createFn().apply(context.jetInstance());
        }
        this.maxAsyncOps = this.contextFactory.maxPendingCallsPerProcessor();
        this.queue = new ArrayDeque<>(this.maxAsyncOps);
    }

    /**
     * Starts the async operation for {@code item} and queues its future.
     *
     * @return {@code false} if the queue was full on entry (a flush is
     *         attempted, but the item is not accepted in this call);
     *         {@code true} once the async function has been invoked
     */
    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        if (this.queue.size() == this.maxAsyncOps) {
            // Try to make room, but reject the item in this call regardless of
            // whether the flush freed a slot.
            this.tryFlushQueue();
            return false;
        }
        // The processor is declared without type arguments, so the item arrives
        // as Object; the producer of this processor guarantees it is a T.
        @SuppressWarnings("unchecked")
        T castedItem = (T) item;
        CompletableFuture<Traverser<R>> future = this.callAsyncFn.apply(this.contextObject, castedItem);
        if (future != null) {
            this.queue.add(Tuple2.tuple2(castedItem, future));
        }
        return true;
    }

    /**
     * Queues the watermark behind all pending async results.
     *
     * @return {@code true} if the watermark was queued; {@code false} if the
     *         queue is still full after a flush attempt or the outbox holds an
     *         unfinished item
     */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        this.tryFlushQueue();
        return this.queue.size() < this.maxAsyncOps && !this.getOutbox().hasUnfinishedItem() && this.queue.add(watermark);
    }

    /**
     * Makes emission progress and refreshes the in-flight metric.
     * <p>
     * If the previous call succeeded, a full flush of the queue is attempted;
     * otherwise only the current traverser is resumed.
     *
     * @return {@code true} if the outbox has no unfinished item afterwards
     */
    @Override
    public boolean tryProcess() {
        if (this.tryProcessSucceeded) {
            this.tryFlushQueue();
        } else {
            this.emitFromTraverser(this.currentTraverser);
        }
        this.asyncOpsCounterMetric.lazySet(this.queue.size());
        this.tryProcessSucceeded = !this.getOutbox().hasUnfinishedItem();
        return this.tryProcessSucceeded;
    }

    /**
     * @return {@code true} once the queue is empty and everything was emitted
     */
    @Override
    public boolean complete() {
        return this.tryFlushQueue();
    }

    /**
     * Nothing is written to the snapshot here; the method only reports
     * {@code true} once the queue has been fully flushed.
     */
    @Override
    public boolean saveToSnapshot() {
        return this.tryFlushQueue();
    }

    /**
     * Destroys the context object if this processor created it (that is, the
     * factory does not use local sharing) and drops the reference.
     */
    @Override
    public void close() {
        if (this.contextObject != null && !this.contextFactory.hasLocalSharing()) {
            this.contextFactory.destroyFn().accept(this.contextObject);
        }
        this.contextObject = null;
    }

    /**
     * Emits as much as possible from the head of the queue, in order.
     *
     * @return {@code true} if the current traverser was fully emitted and the
     *         queue is empty; {@code false} if the outbox refused an item or
     *         the future at the head of the queue is not done yet
     * @throws JetException if the future at the head of the queue completed
     *                      exceptionally
     */
    private boolean tryFlushQueue() {
        // Finish the current traverser before looking at the next queue entry.
        while (this.emitFromTraverser(this.currentTraverser)) {
            Object head = this.queue.peek();
            if (head == null) {
                return true;
            }
            if (head instanceof Watermark) {
                this.watermarkTraverser.accept((Watermark) head);
                this.currentTraverser = this.watermarkTraverser;
            } else {
                // Non-watermark entries are always the tuples added in tryProcess(int, Object).
                @SuppressWarnings("unchecked")
                Tuple2<T, CompletableFuture<Traverser<R>>> entry = (Tuple2<T, CompletableFuture<Traverser<R>>>) head;
                CompletableFuture<Traverser<R>> future = entry.f1();
                if (!future.isDone()) {
                    // Preserve ordering: later, already-completed futures must wait.
                    return false;
                }
                try {
                    this.currentTraverser = future.get();
                    if (this.currentTraverser == null) {
                        // A null result is treated as "no output for this item".
                        this.currentTraverser = Traversers.empty();
                    }
                }
                catch (Throwable e) {
                    throw new JetException("Async operation completed exceptionally: " + e, e);
                }
            }
            // The entry is removed only after its traverser has been installed.
            this.queue.remove();
        }
        return false;
    }

    /**
     * Returns a processor supplier that creates instances of this processor.
     *
     * @param contextFactory factory of the context object
     * @param callAsyncFn    function that starts the async operation for an item
     * @param <C>            type of the context object
     * @param <T>            type of the input items
     * @param <R>            type of the output items
     */
    public static <C, T, R> ProcessorSupplier supplier(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn) {
        return ProcessorSupplierWithContext.supplierWithContext(contextFactory, (ctxF, ctxO) -> new AsyncTransformUsingContextOrderedP<>(ctxF, ctxO, callAsyncFn));
    }
}

