/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.processor.AbstractAsyncTransformUsingServiceP;
import com.hazelcast.jet.impl.processor.ProcessorSupplierWithService;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public class AsyncTransformUsingServiceOrderedP<C, S, T, R>
extends AbstractAsyncTransformUsingServiceP<C, S> {
    private final BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn;
    private ArrayDeque<Object> queue;
    private int queuedWmCount;
    private Traverser<?> currentTraverser = Traversers.empty();
    private ResettableSingletonTraverser<Watermark> watermarkTraverser = new ResettableSingletonTraverser();
    @Probe(name="numInFlightOps")
    private final Counter asyncOpsCounterMetric = SwCounter.newSwCounter();

    AsyncTransformUsingServiceOrderedP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull C serviceContext, int maxConcurrentOps, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn) {
        super(serviceFactory, serviceContext, maxConcurrentOps, true);
        this.callAsyncFn = callAsyncFn;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        this.queue = new ArrayDeque(this.maxConcurrentOps * 2);
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        if (this.isQueueFull() && !this.tryFlushQueue()) {
            return false;
        }
        Object castItem = item;
        CompletableFuture<Traverser<R>> future = this.callAsyncFn.apply(this.service, castItem);
        if (future != null) {
            this.queue.add(Tuple2.tuple2(castItem, future));
        }
        return true;
    }

    boolean isQueueFull() {
        return this.queue.size() - this.queuedWmCount == this.maxConcurrentOps;
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        this.tryFlushQueue();
        if (this.queue.peekLast() instanceof Watermark) {
            this.queue.removeLast();
            this.queue.add(watermark);
        } else {
            this.queue.add(watermark);
            ++this.queuedWmCount;
        }
        return true;
    }

    @Override
    public boolean tryProcess() {
        this.tryFlushQueue();
        this.asyncOpsCounterMetric.set(this.queue.size());
        return true;
    }

    @Override
    public boolean complete() {
        return this.tryFlushQueue();
    }

    @Override
    public boolean saveToSnapshot() {
        return this.tryFlushQueue();
    }

    boolean tryFlushQueue() {
        while (this.emitFromTraverser(this.currentTraverser)) {
            Object o = this.queue.peek();
            if (o == null) {
                return true;
            }
            if (o instanceof Watermark) {
                this.watermarkTraverser.accept((Watermark)o);
                this.currentTraverser = this.watermarkTraverser;
                --this.queuedWmCount;
            } else {
                CompletableFuture f = (CompletableFuture)((Tuple2)o).f1();
                if (!f.isDone()) {
                    return false;
                }
                try {
                    this.currentTraverser = (Traverser)f.get();
                    if (this.currentTraverser == null) {
                        this.currentTraverser = Traversers.empty();
                    }
                }
                catch (Throwable e) {
                    throw new JetException("Async operation completed exceptionally: " + e, e);
                }
            }
            this.queue.remove();
        }
        return false;
    }

    public static <C, S, T, R> ProcessorSupplier supplier(@Nonnull ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn) {
        return ProcessorSupplierWithService.supplierWithService(serviceFactory, (serviceFn, context) -> new AsyncTransformUsingServiceOrderedP(serviceFn, context, maxConcurrentOps, callAsyncFn));
    }
}

