package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/processor/AsyncTransformUsingContextOrderedP.class */
/**
 * Processor that applies an asynchronous transformation to each input item
 * using a context object obtained through a {@link ContextFactory}, and
 * emits the results in the same order in which the input items (and
 * watermarks) were received.
 * <p>
 * Every accepted item produces a {@link CompletableFuture} which is appended
 * to an internal FIFO queue; watermarks are appended to the same queue. The
 * queue is drained strictly from its head, so a future that has not
 * completed yet holds back everything queued behind it. The queue holds at
 * most {@code contextFactory.maxPendingCallsPerProcessor()} entries; when it
 * is full, new input is refused until the head can be emitted.
 *
 * @param <C> type of the context object
 * @param <T> type of the input items
 * @param <R> type of the items produced by the returned traversers
 */
public final class AsyncTransformUsingContextOrderedP<C, T, R> extends AbstractProcessor {

    private final ContextFactory<C> contextFactory;
    private final BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn;

    /** Reusable single-item traverser used to emit a watermark taken from the queue. */
    private final ResettableSingletonTraverser<Watermark> watermarkTraverser = new ResettableSingletonTraverser<>();

    /** Mirrors the queue size; refreshed on each call to {@link #tryProcess()}. */
    @Probe(name = "numInFlightOps")
    private final AtomicInteger asyncOpsCounterMetric = new AtomicInteger();

    private C contextObject;

    /** Holds {@code Tuple2<item, future>} entries and {@link Watermark}s in arrival order. */
    private ArrayDeque<Object> queue;
    private int maxAsyncOps;
    private boolean tryProcessSucceeded;

    /** Traverser currently being emitted to the outbox; never {@code null}. */
    private Traverser<?> currentTraverser = Traversers.empty();

    /**
     * @param contextFactory factory describing how the context is created, shared and destroyed
     * @param contextObject  the shared context object; must be non-null exactly when
     *                       {@code contextFactory.hasLocalSharing()} is true
     * @param callAsyncFn    function that starts the asynchronous call for an item
     */
    private AsyncTransformUsingContextOrderedP(
            @Nonnull ContextFactory<C> contextFactory,
            @Nullable C contextObject,
            @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> callAsyncFn
    ) {
        this.contextFactory = contextFactory;
        this.callAsyncFn = callAsyncFn;
        this.contextObject = contextObject;

        assert (contextObject == null) ^ contextFactory.hasLocalSharing()
                : "if contextObject is shared, it must be non-null, or vice versa";
    }

    /** Delegates to the cooperativeness declared by the context factory. */
    @Override
    public boolean isCooperative() {
        return contextFactory.isCooperative();
    }

    /**
     * Creates a processor-private context object when the factory does not use
     * local sharing, and allocates the queue with the configured capacity.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        if (!contextFactory.hasLocalSharing()) {
            assert contextObject == null : "contextObject is not null: " + contextObject;
            contextObject = contextFactory.createFn().apply(context.jetInstance());
        }
        maxAsyncOps = contextFactory.maxPendingCallsPerProcessor();
        queue = new ArrayDeque<>(maxAsyncOps);
    }

    /**
     * Starts the asynchronous call for {@code item} and enqueues its future.
     *
     * @return {@code false} if the queue is full (after attempting to flush it),
     *         so that the item is offered again later; {@code true} otherwise
     */
    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        if (isQueueFull()) {
            // Try to make room; the item is still refused on this call.
            tryFlushQueue();
            return false;
        }
        // The processor is only ever wired to an input of type T.
        @SuppressWarnings("unchecked")
        T typedItem = (T) item;
        CompletableFuture<Traverser<R>> future = callAsyncFn.apply(contextObject, typedItem);
        if (future != null) {
            queue.add(Tuple2.tuple2(typedItem, future));
        }
        // A null future is treated as "no output for this item".
        return true;
    }

    /**
     * Enqueues the watermark behind all pending items so that it is emitted in order.
     *
     * @return {@code true} if the watermark was enqueued; {@code false} if the queue
     *         is full or the outbox still has an unfinished item
     */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        tryFlushQueue();
        if (isQueueFull() || getOutbox().hasUnfinishedItem()) {
            return false;
        }
        return queue.add(watermark);
    }

    /**
     * Makes progress on emitting queued results and refreshes the in-flight metric.
     * If the previous call left an unfinished item in the outbox, only the current
     * traverser is resumed; otherwise the whole queue is flushed.
     *
     * @return {@code true} if the outbox has no unfinished item after this call
     */
    @Override
    public boolean tryProcess() {
        if (tryProcessSucceeded) {
            tryFlushQueue();
        } else {
            emitFromTraverser(currentTraverser);
        }
        asyncOpsCounterMetric.lazySet(queue.size());
        tryProcessSucceeded = !getOutbox().hasUnfinishedItem();
        return tryProcessSucceeded;
    }

    /** @return {@code true} once every queued entry has been emitted */
    @Override
    public boolean complete() {
        return tryFlushQueue();
    }

    /** @return {@code true} once every queued entry has been emitted */
    @Override
    public boolean saveToSnapshot() {
        return tryFlushQueue();
    }

    /**
     * Destroys the context object if this processor owns it (i.e. the factory
     * does not use local sharing) and drops the reference to it.
     */
    @Override
    public void close() {
        try {
            if (contextObject != null && !contextFactory.hasLocalSharing()) {
                contextFactory.destroyFn().accept(contextObject);
            }
        } finally {
            // Drop the reference even if destroyFn fails, so it is never destroyed twice.
            contextObject = null;
        }
    }

    /** @return {@code true} if no further entry may be added to the queue */
    private boolean isQueueFull() {
        return queue.size() >= maxAsyncOps;
    }

    /**
     * Emits as many queued results as possible, strictly in queue order.
     *
     * @return {@code true} if the current traverser was fully emitted and the
     *         queue is empty; {@code false} if the outbox refused an item or
     *         the future at the head of the queue is not done yet
     * @throws JetException if the future at the head of the queue completed exceptionally
     */
    private boolean tryFlushQueue() {
        while (emitFromTraverser(currentTraverser)) {
            Object head = queue.peek();
            if (head == null) {
                return true;
            }
            if (head instanceof Watermark) {
                watermarkTraverser.accept((Watermark) head);
                currentTraverser = watermarkTraverser;
            } else {
                @SuppressWarnings("unchecked")
                CompletableFuture<Traverser<R>> future =
                        ((Tuple2<T, CompletableFuture<Traverser<R>>>) head).f1();
                if (!future.isDone()) {
                    // Head-of-line blocking is what preserves the output order.
                    return false;
                }
                Traverser<R> result;
                try {
                    result = future.get();
                } catch (Throwable e) {
                    throw new JetException("Async operation completed exceptionally: " + e, e);
                }
                if (result == null) {
                    currentTraverser = Traversers.empty();
                } else {
                    currentTraverser = result;
                }
            }
            // The head is removed only after its traverser was installed; the next
            // loop iteration starts emitting it.
            queue.remove();
        }
        return false;
    }

    /**
     * Returns a supplier of this processor.
     *
     * @param contextFactory factory of the context object
     * @param biFunctionEx   function that, given the context object and an input item,
     *                       returns a future of the traverser over the output items
     * @param <C> type of the context object
     * @param <T> type of the input items
     * @param <R> type of the output items
     */
    public static <C, T, R> ProcessorSupplier supplier(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> biFunctionEx
    ) {
        // The compiler generates the $deserializeLambda$ hook for this serializable
        // lambda; it must not be declared by hand.
        return ProcessorSupplierWithContext.supplierWithContext(contextFactory,
                (ctxFactory, ctxObject) -> new AsyncTransformUsingContextOrderedP<>(ctxFactory, ctxObject, biFunctionEx));
    }
}
