package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkPolicies;
import com.hazelcast.jet.datamodel.Tag;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBiPredicate;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.function.DistributedTriFunction;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterTransform;
import com.hazelcast.jet.impl.pipeline.transform.FilterUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.HashJoinTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapUsingContextTransform;
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.util.Preconditions;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/pipeline/ComputeStageImplBase.class */
/**
 * Common base of the pipeline's compute stages.
 * <p>
 * A stage wraps the {@code transform} held by {@link AbstractStage} and a
 * {@link FunctionAdapter}. Every {@code attach*} helper adapts the
 * user-supplied functions through {@link #fnAdapter}, wraps them in the
 * matching transform and hands that transform to the subclass-defined
 * {@link #attach(AbstractTransform, FunctionAdapter)}.
 * <p>
 * A stage is treated as "having timestamps" when its adapter is
 * {@link #ADAPT_TO_JET_EVENT}.
 * <p>
 * NOTE: this source was recovered by a decompiler. The
 * {@code $deserializeLambda$(SerializedLambda)} method it emitted is
 * deliberately not declared here: it is synthesized by the compiler for the
 * serializable lambdas in this class, and the decompiled rendering was not
 * valid Java (a {@code boolean} selector assigned {@code -1} and {@code 2},
 * a duplicated {@code case true} label, lambdas returned as plain
 * {@code Object}). The decompiler also reported widening several members
 * from package-private to {@code public}; that visibility is kept as found.
 *
 * @param <T> type of the items this stage emits
 */
public abstract class ComputeStageImplBase<T> extends AbstractStage {

    /** Plain {@link FunctionAdapter} instance. */
    static final FunctionAdapter DONT_ADAPT = new FunctionAdapter();

    /**
     * Adapter installed by {@link #addTimestamps(DistributedToLongFunction, long)}.
     * Identity with this instance is what {@link #drainTo(Sink)} and
     * {@link #ensureJetEvents(ComputeStageImplBase, String)} test for.
     */
    static final JetEventFunctionAdapter ADAPT_TO_JET_EVENT = new JetEventFunctionAdapter();

    /**
     * Placeholder emission policy handed to the watermark generation params.
     * It throws as soon as it is invoked; per its message it is expected to
     * have been replaced before that happens.
     */
    private static final WatermarkEmissionPolicy THROWING_EMIT_POLICY = (unusedFirst, unusedSecond) -> {
        throw new IllegalStateException("emit policy should have been replaced");
    };

    /**
     * Adapter applied to every user function attached through this stage.
     * Not final: {@link #addTimestamps(DistributedToLongFunction, long)}
     * replaces it in place when the upstream transform is a
     * {@link StreamSourceTransform}.
     */
    @Nonnull
    public FunctionAdapter fnAdapter;

    /**
     * Creates the stage.
     *
     * @param transform       transform backing this stage, forwarded to {@link AbstractStage}
     * @param functionAdapter adapter stored in {@link #fnAdapter}
     * @param pipelineImpl    owning pipeline, forwarded to {@link AbstractStage}
     * @param z               flag forwarded unchanged as the second argument of the
     *                        {@link AbstractStage} constructor (its meaning is defined there)
     */
    public ComputeStageImplBase(
            @Nonnull Transform transform,
            @Nonnull FunctionAdapter functionAdapter,
            @Nonnull PipelineImpl pipelineImpl,
            boolean z
    ) {
        super(transform, z, pipelineImpl);
        this.fnAdapter = functionAdapter;
    }

    /**
     * Adds timestamps taken from {@link System#currentTimeMillis()} at the
     * moment the timestamp function is applied, with a lag argument of
     * {@code 0}.
     *
     * @return the timestamped stage, see
     *         {@link #addTimestamps(DistributedToLongFunction, long)}
     * @throws IllegalArgumentException or another exception raised by
     *         {@code Preconditions.checkFalse} if this stage already has timestamps
     */
    @Nonnull
    public StreamStage<T> addTimestamps() {
        return addTimestamps(item -> System.currentTimeMillis(), 0L);
    }

    /**
     * Adds timestamps computed by the given function.
     * <p>
     * When the backing transform is a {@link StreamSourceTransform} the
     * watermark parameters are set directly on it, this stage switches its
     * adapter to {@link #ADAPT_TO_JET_EVENT} and returns itself. Otherwise a
     * new {@link TimestampTransform} is connected downstream of the current
     * transform and a new {@code StreamStageImpl} wrapping it is returned.
     *
     * @param distributedToLongFunction extracts the timestamp from an item
     * @param j                         lag passed to {@code WatermarkPolicies.limitingLag}
     * @return the timestamped stage (either {@code this} or a new stage)
     */
    // Raw project types whose generic signatures are not visible from this file are kept raw.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Nonnull
    public StreamStage<T> addTimestamps(DistributedToLongFunction<? super T> distributedToLongFunction, long j) {
        Preconditions.checkFalse(hasJetEvents(), "This stage already has timestamps assigned to it.");

        WatermarkGenerationParams<T> wmGenParams = WatermarkGenerationParams.wmGenParams(
                distributedToLongFunction,
                JetEvent::jetEvent,
                WatermarkPolicies.limitingLag(j),
                THROWING_EMIT_POLICY,
                WatermarkGenerationParams.DEFAULT_IDLE_TIMEOUT);

        if (this.transform instanceof StreamSourceTransform) {
            // The source itself takes the watermark parameters; no extra transform is added.
            ((StreamSourceTransform) this.transform).setWmGenerationParams(wmGenParams);
            this.fnAdapter = ADAPT_TO_JET_EVENT;
            return (StreamStage<T>) this;
        }

        TimestampTransform timestampTransform = new TimestampTransform(this.transform, wmGenParams);
        this.pipelineImpl.connect(this.transform, timestampTransform);
        return new StreamStageImpl(timestampTransform, ADAPT_TO_JET_EVENT, this.pipelineImpl);
    }

    /**
     * Attaches a {@link MapTransform} built from the adapted mapping function.
     *
     * @param distributedFunction mapping function
     * @param <R>   mapping result type
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <R, RET> RET attachMap(@Nonnull DistributedFunction<? super T, ? extends R> distributedFunction) {
        return (RET) attach(
                new MapTransform(this.transform, this.fnAdapter.adaptMapFn(distributedFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link MapUsingContextTransform} built from the context
     * factory and the adapted mapping function.
     *
     * @param contextFactory        factory of the context object
     * @param distributedBiFunction mapping function taking the context and the item
     * @param <C>   context type
     * @param <R>   mapping result type
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <C, R, RET> RET attachMapUsingContext(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiFunction<? super C, ? super T, ? extends R> distributedBiFunction
    ) {
        return (RET) attach(
                new MapUsingContextTransform(
                        this.transform,
                        contextFactory,
                        this.fnAdapter.adaptMapUsingContextFn(distributedBiFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link FilterTransform} built from the adapted predicate.
     *
     * @param distributedPredicate filtering predicate
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <RET> RET attachFilter(@Nonnull DistributedPredicate<T> distributedPredicate) {
        return (RET) attach(
                new FilterTransform(this.transform, this.fnAdapter.adaptFilterFn(distributedPredicate)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link FilterUsingContextTransform} built from the context
     * factory and the adapted predicate.
     *
     * @param contextFactory         factory of the context object
     * @param distributedBiPredicate predicate taking the context and the item
     * @param <C>   context type
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <C, RET> RET attachFilterUsingContext(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiPredicate<? super C, ? super T> distributedBiPredicate
    ) {
        return (RET) attach(
                new FilterUsingContextTransform(
                        this.transform,
                        contextFactory,
                        this.fnAdapter.adaptFilterUsingContextFn(distributedBiPredicate)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link FlatMapTransform} built from the adapted flat-mapping
     * function.
     *
     * @param distributedFunction function returning a {@link Traverser} per item
     * @param <R>   type of the items produced by the traverser
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <R, RET> RET attachFlatMap(
            @Nonnull DistributedFunction<? super T, ? extends Traverser<? extends R>> distributedFunction
    ) {
        return (RET) attach(
                new FlatMapTransform(this.transform, this.fnAdapter.adaptFlatMapFn(distributedFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link FlatMapUsingContextTransform} built from the context
     * factory and the adapted flat-mapping function.
     *
     * @param contextFactory        factory of the context object
     * @param distributedBiFunction function taking the context and the item and
     *                              returning a {@link Traverser}
     * @param <C>   context type
     * @param <R>   type of the items produced by the traverser
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <C, R, RET> RET attachFlatMapUsingContext(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiFunction<? super C, ? super T, ? extends Traverser<? extends R>> distributedBiFunction
    ) {
        return (RET) attach(
                new FlatMapUsingContextTransform(
                        this.transform,
                        contextFactory,
                        this.fnAdapter.adaptFlatMapUsingContextFn(distributedBiFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with one batch
     * stage. The transform receives this stage's transform followed by the
     * joined stage's transform, the single adapted join clause, an empty tag
     * list and the adapted output function.
     *
     * @param batchStage            stage to join with
     * @param joinClause            clause describing the join with {@code batchStage}
     * @param distributedBiFunction combines an item of this stage with its joined value
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <K1, T1_IN, T1, R, RET> RET attachHashJoin(
            @Nonnull BatchStage<T1_IN> batchStage,
            @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause,
            @Nonnull DistributedBiFunction<T, T1, R> distributedBiFunction
    ) {
        return (RET) attach(
                new HashJoinTransform(
                        (List<Transform>) Arrays.asList(this.transform, transformOf(batchStage)),
                        Collections.singletonList(this.fnAdapter.adaptJoinClause(joinClause)),
                        (List<Tag>) Collections.emptyList(),
                        this.fnAdapter.adaptHashJoinOutputFn(distributedBiFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link HashJoinTransform} joining this stage with two batch
     * stages. Upstream transforms and adapted join clauses are passed in the
     * same order as the arguments; the tag list is empty.
     *
     * @param batchStage             first stage to join with
     * @param joinClause             clause describing the join with {@code batchStage}
     * @param batchStage2            second stage to join with
     * @param joinClause2            clause describing the join with {@code batchStage2}
     * @param distributedTriFunction combines an item of this stage with both joined values
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <K1, T1_IN, T1, K2, T2_IN, T2, R, RET> RET attachHashJoin2(
            @Nonnull BatchStage<T1_IN> batchStage,
            @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause,
            @Nonnull BatchStage<T2_IN> batchStage2,
            @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2,
            @Nonnull DistributedTriFunction<T, T1, T2, R> distributedTriFunction
    ) {
        return (RET) attach(
                new HashJoinTransform(
                        (List<Transform>) Arrays.asList(
                                this.transform, transformOf(batchStage), transformOf(batchStage2)),
                        Arrays.asList(
                                this.fnAdapter.adaptJoinClause(joinClause),
                                this.fnAdapter.adaptJoinClause(joinClause2)),
                        (List<Tag>) Collections.emptyList(),
                        this.fnAdapter.adaptHashJoinOutputFn(distributedTriFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link PeekTransform} built from the adapted predicate and
     * the adapted to-string function.
     *
     * @param distributedPredicate predicate, adapted with {@code adaptFilterFn}
     * @param distributedFunction  item-to-text function, adapted with {@code adaptToStringFn}
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <RET> RET attachPeek(
            @Nonnull DistributedPredicate<? super T> distributedPredicate,
            @Nonnull DistributedFunction<? super T, ? extends CharSequence> distributedFunction
    ) {
        return (RET) attach(
                new PeekTransform(
                        this.transform,
                        this.fnAdapter.adaptFilterFn(distributedPredicate),
                        this.fnAdapter.adaptToStringFn(distributedFunction)),
                this.fnAdapter);
    }

    /**
     * Attaches a {@link ProcessorTransform}. Unlike the other helpers, the
     * arguments are passed to the transform as given, without going through
     * {@link #fnAdapter}.
     *
     * @param str                 string passed as the second argument of {@link ProcessorTransform}
     * @param distributedSupplier supplier of the {@link Processor}
     * @param <RET> stage type returned by {@link #attach}
     * @return whatever {@link #attach} returns for the new transform
     */
    @Nonnull
    public <RET> RET attachCustomTransform(
            @Nonnull String str,
            @Nonnull DistributedSupplier<Processor> distributedSupplier
    ) {
        return (RET) attach(new ProcessorTransform(this.transform, str, distributedSupplier), this.fnAdapter);
    }

    /**
     * Connects this stage to the given sink.
     *
     * @param sink the sink; it is cast to {@code SinkImpl}, so any other
     *             implementation fails with {@link ClassCastException}
     * @return the newly created sink stage
     */
    @Nonnull
    public SinkStage drainTo(@Nonnull Sink<? super T> sink) {
        SinkImpl sinkImpl = (SinkImpl) sink;
        // The third argument tells the transform whether this stage uses the JetEvent adapter.
        boolean usesJetEventAdapter = this.fnAdapter == ADAPT_TO_JET_EVENT;
        SinkTransform sinkTransform = new SinkTransform(sinkImpl, this.transform, usesJetEventAdapter);
        // Statement order is kept exactly: stage creation, sink notification, then graph connection.
        SinkStageImpl sinkStageImpl = new SinkStageImpl(sinkTransform, this.pipelineImpl);
        sinkImpl.onAssignToStage();
        this.pipelineImpl.connect(this.transform, sinkTransform);
        return sinkStageImpl;
    }

    /**
     * Registers the given transform downstream of this stage and produces the
     * resulting stage. Implemented by subclasses.
     *
     * @param abstractTransform transform to attach
     * @param functionAdapter   adapter to associate with the resulting stage
     * @param <RET> stage type chosen by the caller
     * @return the stage representing {@code abstractTransform}
     */
    @Nonnull
    public abstract <RET> RET attach(@Nonnull AbstractTransform abstractTransform, @Nonnull FunctionAdapter functionAdapter);

    /** Tells whether this stage's adapter equals {@link #ADAPT_TO_JET_EVENT}. */
    private boolean hasJetEvents() {
        return this.fnAdapter.equals(ADAPT_TO_JET_EVENT);
    }

    /**
     * Verifies that the given stage uses the {@link #ADAPT_TO_JET_EVENT}
     * adapter (compared by identity).
     *
     * @param computeStageImplBase stage to check
     * @param str                  text placed at the start of the error message
     * @throws IllegalStateException if the stage uses any other adapter
     */
    public static void ensureJetEvents(@Nonnull ComputeStageImplBase<?> computeStageImplBase, @Nonnull String str) {
        if (computeStageImplBase.fnAdapter != ADAPT_TO_JET_EVENT) {
            throw new IllegalStateException(str + " is missing a timestamp definition. Call one of the .addTimestamps() methods on it before performing the aggregation.");
        }
    }
}
