/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.WindowDefinition;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.impl.processor.AggregateP;
import com.hazelcast.jet.impl.processor.CoGroupP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.util.WrappingProcessorMetaSupplier;
import com.hazelcast.jet.impl.util.WrappingProcessorSupplier;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;

public final class Processors {
    /**
     * Private constructor: this class exposes only static factory methods,
     * so it is not meant to be instantiated.
     */
    private Processors() {
        // intentionally empty
    }

    /**
     * Returns a supplier that builds a new {@link CoGroupP} on every
     * {@code get()} call, configured with the given key function and
     * aggregate operation.
     *
     * @param getKeyFn function passed to {@code CoGroupP} as the key extractor
     * @param aggrOp   aggregate operation passed to {@code CoGroupP} unchanged
     * @return a supplier of freshly constructed {@code CoGroupP} processors
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateByKeyP(
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull AggregateOperation1<? super T, A, R> aggrOp
    ) {
        return () -> {
            return new CoGroupP(getKeyFn, aggrOp);
        };
    }

    /**
     * Returns a supplier that builds a new {@link CoGroupP} on every
     * {@code get()} call. The aggregate operation is first given an identity
     * finish function via {@code withFinishFn}, so presumably the processor
     * emits the accumulator itself rather than a finished result.
     *
     * @param getKeyFn function passed to {@code CoGroupP} as the key extractor
     * @param aggrOp   aggregate operation whose finish function is replaced
     *                 with the identity function
     * @return a supplier of freshly constructed {@code CoGroupP} processors
     */
    @Nonnull
    public static <T, K, A> DistributedSupplier<Processor> accumulateByKeyP(
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull AggregateOperation1<? super T, A, ?> aggrOp
    ) {
        return () -> {
            // The identity-finish variant is derived anew on each get() call.
            return new CoGroupP(
                    getKeyFn,
                    aggrOp.withFinishFn(DistributedFunction.identity())
            );
        };
    }

    /**
     * Returns a supplier that builds a new {@link CoGroupP} on every
     * {@code get()} call from a list of key functions and a general
     * aggregate operation.
     *
     * @param getKeyFs list of key-extracting functions (presumably one per
     *                 input of the processor; confirm against {@code CoGroupP})
     * @param aggrOp   aggregate operation passed to {@code CoGroupP} unchanged
     * @return a supplier of freshly constructed {@code CoGroupP} processors
     */
    @Nonnull
    public static <K, A, R> DistributedSupplier<Processor> coAggregateByKeyP(
            @Nonnull List<DistributedFunction<?, ? extends K>> getKeyFs,
            @Nonnull AggregateOperation<A, R> aggrOp
    ) {
        return () -> {
            return new CoGroupP(getKeyFs, aggrOp);
        };
    }

    /**
     * Returns a supplier that builds a new {@link CoGroupP} on every
     * {@code get()} call from a list of key functions. The aggregate
     * operation is first given an identity finish function via
     * {@code withFinishFn}.
     *
     * @param getKeyFs list of key-extracting functions (presumably one per
     *                 input of the processor; confirm against {@code CoGroupP})
     * @param aggrOp   aggregate operation whose finish function is replaced
     *                 with the identity function
     * @return a supplier of freshly constructed {@code CoGroupP} processors
     */
    @Nonnull
    public static <K, A> DistributedSupplier<Processor> coAccumulateByKeyP(
            @Nonnull List<DistributedFunction<?, ? extends K>> getKeyFs,
            @Nonnull AggregateOperation<A, ?> aggrOp
    ) {
        return () -> {
            // The identity-finish variant is derived anew on each get() call.
            return new CoGroupP(
                    getKeyFs,
                    aggrOp.withFinishFn(DistributedFunction.identity())
            );
        };
    }

    /**
     * Returns a supplier that builds a new {@link CoGroupP} on every
     * {@code get()} call. The processor is keyed by {@code Map.Entry::getKey},
     * and the aggregate operation is adapted with
     * {@code withCombiningAccumulateFn(Map.Entry::getValue)}. Input items are
     * therefore expected to be {@code Map.Entry} instances (presumably
     * key/accumulator pairs from an upstream accumulating stage).
     *
     * @param aggrOp aggregate operation to adapt for combining
     * @return a supplier of freshly constructed {@code CoGroupP} processors
     */
    @Nonnull
    public static <A, R> DistributedSupplier<Processor> combineByKeyP(
            @Nonnull AggregateOperation<A, R> aggrOp
    ) {
        return () -> {
            return new CoGroupP(
                    Map.Entry::getKey,
                    aggrOp.withCombiningAccumulateFn(Map.Entry::getValue)
            );
        };
    }

    /**
     * Returns a supplier that builds a new {@link AggregateP} on every
     * {@code get()} call, configured with the given aggregate operation.
     *
     * @param aggrOp aggregate operation passed to {@code AggregateP} unchanged
     * @return a supplier of freshly constructed {@code AggregateP} processors
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> aggregateP(
            @Nonnull AggregateOperation1<T, A, R> aggrOp
    ) {
        return () -> {
            return new AggregateP(aggrOp);
        };
    }

    /**
     * Returns a supplier that builds a new {@link AggregateP} on every
     * {@code get()} call. The aggregate operation is first given an identity
     * finish function via {@code withFinishFn}.
     *
     * @param aggrOp aggregate operation whose finish function is replaced
     *               with the identity function
     * @return a supplier of freshly constructed {@code AggregateP} processors
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> accumulateP(
            @Nonnull AggregateOperation1<T, A, R> aggrOp
    ) {
        return () -> {
            // The identity-finish variant is derived anew on each get() call.
            return new AggregateP(
                    aggrOp.withFinishFn(DistributedFunction.identity())
            );
        };
    }

    /**
     * Returns a supplier that builds a new {@link AggregateP} on every
     * {@code get()} call. The aggregate operation is first adapted with
     * {@code withCombiningAccumulateFn} and an identity function, so
     * presumably each input item is itself an accumulator to be combined.
     *
     * @param aggrOp aggregate operation to adapt for combining
     * @return a supplier of freshly constructed {@code AggregateP} processors
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> combineP(
            @Nonnull AggregateOperation1<T, A, R> aggrOp
    ) {
        return () -> {
            // The combining variant is derived anew on each get() call.
            return new AggregateP(
                    aggrOp.withCombiningAccumulateFn(DistributedFunction.identity())
            );
        };
    }

    /**
     * Returns a supplier of windowing processors by delegating to
     * {@code aggregateByKeyAndWindowP} with the arguments unchanged and the
     * "last stage" flag set to {@code true}.
     *
     * @param getKeyFn       key-extracting function
     * @param getTimestampFn timestamp-extracting function
     * @param timestampKind  how the extracted timestamp is to be interpreted
     * @param windowDef      window definition
     * @param aggrOp         aggregate operation
     * @return the supplier produced by {@code aggregateByKeyAndWindowP}
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateToSlidingWindowP(
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull DistributedToLongFunction<? super T> getTimestampFn,
            @Nonnull TimestampKind timestampKind,
            @Nonnull WindowDefinition windowDef,
            @Nonnull AggregateOperation1<? super T, A, R> aggrOp
    ) {
        final boolean isLastStage = true;
        return aggregateByKeyAndWindowP(
                getKeyFn, getTimestampFn, timestampKind, windowDef, aggrOp, isLastStage);
    }

    /**
     * Returns a supplier of windowing processors by delegating to
     * {@code aggregateByKeyAndWindowP} with two adjustments: the window
     * definition is converted with {@code toTumblingByFrame()}, and the
     * aggregate operation gets an identity finish function. The "last stage"
     * flag is {@code false}. Both adjustments are computed eagerly, when this
     * method is called.
     *
     * @param getKeyFn       key-extracting function
     * @param getTimestampFn timestamp-extracting function
     * @param timestampKind  how the extracted timestamp is to be interpreted
     * @param windowDef      window definition to convert
     * @param aggrOp         aggregate operation whose finish function is
     *                       replaced with the identity function
     * @return the supplier produced by {@code aggregateByKeyAndWindowP}
     */
    @Nonnull
    public static <T, K, A> DistributedSupplier<Processor> accumulateByFrameP(
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull DistributedToLongFunction<? super T> getTimestampFn,
            @Nonnull TimestampKind timestampKind,
            @Nonnull WindowDefinition windowDef,
            @Nonnull AggregateOperation1<? super T, A, ?> aggrOp
    ) {
        final WindowDefinition frameWindow = windowDef.toTumblingByFrame();
        final boolean isLastStage = false;
        return aggregateByKeyAndWindowP(
                getKeyFn,
                getTimestampFn,
                timestampKind,
                frameWindow,
                aggrOp.withFinishFn(DistributedFunction.identity()),
                isLastStage);
    }

    /**
     * Returns a supplier of windowing processors by delegating to
     * {@code aggregateByKeyAndWindowP}. Key, timestamp and value are read
     * from {@link TimestampedEntry} via its {@code getKey},
     * {@code getTimestamp} and {@code getValue} accessors. The timestamp
     * kind is fixed to {@link TimestampKind#FRAME}, the aggregate operation
     * is adapted with {@code withCombiningAccumulateFn}, and the "last stage"
     * flag is {@code true}.
     *
     * @param windowDef window definition
     * @param aggrOp    aggregate operation to adapt for combining
     * @return the supplier produced by {@code aggregateByKeyAndWindowP}
     */
    @Nonnull
    public static <K, A, R> DistributedSupplier<Processor> combineToSlidingWindowP(
            @Nonnull WindowDefinition windowDef,
            @Nonnull AggregateOperation1<?, A, R> aggrOp
    ) {
        return aggregateByKeyAndWindowP(
                TimestampedEntry::getKey,
                TimestampedEntry::getTimestamp,
                TimestampKind.FRAME,
                windowDef,
                aggrOp.withCombiningAccumulateFn(TimestampedEntry::getValue),
                true);
    }

    /**
     * Returns a supplier that builds a new {@link SlidingWindowP} on every
     * {@code get()} call.
     * <p>
     * When {@code timestampKind} is {@link TimestampKind#EVENT}, the
     * processor receives a function that maps each item's timestamp through
     * {@code windowDef.higherFrameTs(...)}. For any other kind,
     * {@code getTimestampFn} is passed on unchanged.
     *
     * @param getKeyFn       key-extracting function
     * @param getTimestampFn timestamp-extracting function
     * @param timestampKind  decides whether the timestamp is converted with
     *                       {@code higherFrameTs} before use
     * @param windowDef      window definition
     * @param aggrOp         aggregate operation
     * @param isLastStage    flag forwarded to {@code SlidingWindowP} as is
     * @return a supplier of freshly constructed {@code SlidingWindowP}
     *         processors
     */
    @Nonnull
    private static <T, K, A, R> DistributedSupplier<Processor> aggregateByKeyAndWindowP(
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull DistributedToLongFunction<? super T> getTimestampFn,
            @Nonnull TimestampKind timestampKind,
            @Nonnull WindowDefinition windowDef,
            @Nonnull AggregateOperation1<? super T, A, R> aggrOp,
            boolean isLastStage
    ) {
        return () -> {
            // A typed local gives the lambda a parameterized target type, so
            // `item` is a T rather than Object (as it would be when the lambda
            // is passed directly to the raw constructor).
            DistributedToLongFunction<? super T> frameTsFn;
            if (timestampKind == TimestampKind.EVENT) {
                frameTsFn = item -> windowDef.higherFrameTs(getTimestampFn.applyAsLong(item));
            } else {
                frameTsFn = getTimestampFn;
            }
            return new SlidingWindowP(getKeyFn, frameTsFn, windowDef, aggrOp, isLastStage);
        };
    }

    /**
     * Returns a supplier that builds a new {@link SessionWindowP} on every
     * {@code get()} call, forwarding all four arguments unchanged.
     *
     * @param sessionTimeout session timeout value handed to the processor
     *                       (unit not visible here; presumably the same unit
     *                       as the timestamps)
     * @param getTimestampFn timestamp-extracting function
     * @param getKeyFn       key-extracting function
     * @param aggrOp         aggregate operation
     * @return a supplier of freshly constructed {@code SessionWindowP}
     *         processors
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateToSessionWindowP(
            long sessionTimeout,
            @Nonnull DistributedToLongFunction<? super T> getTimestampFn,
            @Nonnull DistributedFunction<? super T, K> getKeyFn,
            @Nonnull AggregateOperation1<? super T, A, R> aggrOp
    ) {
        return () -> {
            return new SessionWindowP(sessionTimeout, getTimestampFn, getKeyFn, aggrOp);
        };
    }

    /**
     * Returns a supplier that builds a new {@link InsertWatermarksP} on every
     * {@code get()} call. Each processor gets its own {@link WatermarkPolicy},
     * obtained by calling {@code newWmPolicyF.get()} at construction time.
     *
     * @param getTimestampF timestamp-extracting function
     * @param newWmPolicyF  supplier invoked once per created processor to
     *                      obtain its watermark policy
     * @param wmEmitPolicy  watermark emission policy, passed on unchanged
     * @return a supplier of freshly constructed {@code InsertWatermarksP}
     *         processors
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> insertWatermarksP(
            @Nonnull DistributedToLongFunction<T> getTimestampF,
            @Nonnull DistributedSupplier<WatermarkPolicy> newWmPolicyF,
            @Nonnull WatermarkEmissionPolicy wmEmitPolicy
    ) {
        return () -> {
            WatermarkPolicy wmPolicy = newWmPolicyF.get();
            return new InsertWatermarksP(getTimestampF, wmPolicy, wmEmitPolicy);
        };
    }

    /**
     * Returns a supplier that builds a new {@link TransformP} on every
     * {@code get()} call. The processor applies {@code mapper} to each item
     * and exposes the result through a single
     * {@link ResettableSingletonTraverser} that is reused for every item.
     * <p>
     * Each created processor gets its own traverser instance, so processors
     * from the same supplier do not share it.
     *
     * @param mapper function applied to each input item
     * @param <T> input item type
     * @param <R> type of the mapping result
     * @return a supplier of freshly constructed {@code TransformP} processors
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> mapP(@Nonnull DistributedFunction<T, R> mapper) {
        return () -> {
            // Parameterized types replace the raw ones so the lambda's `item`
            // is a T and can be passed to mapper.apply without an unchecked call.
            final ResettableSingletonTraverser<R> trav = new ResettableSingletonTraverser<>();
            final DistributedFunction<T, Traverser<R>> singleItemMapper = item -> {
                trav.accept(mapper.apply(item));
                return trav;
            };
            return new TransformP<>(singleItemMapper);
        };
    }

    /**
     * Returns a supplier that builds a new {@link TransformP} on every
     * {@code get()} call. For each item the processor evaluates
     * {@code predicate}. A passing item is fed into a reused
     * {@link ResettableSingletonTraverser}; a failing item causes
     * {@code null} to be fed instead (presumably leaving the traverser
     * empty, so nothing is emitted).
     * <p>
     * Each created processor gets its own traverser instance.
     *
     * @param predicate test applied to each input item
     * @param <T> item type
     * @return a supplier of freshly constructed {@code TransformP} processors
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> filterP(@Nonnull DistributedPredicate<T> predicate) {
        return () -> {
            // Parameterized types replace the raw ones so the lambda's `item`
            // is a T and can be passed to predicate.test without an unchecked call.
            final ResettableSingletonTraverser<T> trav = new ResettableSingletonTraverser<>();
            final DistributedFunction<T, Traverser<T>> passOrDrop = item -> {
                trav.accept(predicate.test(item) ? item : null);
                return trav;
            };
            return new TransformP<>(passOrDrop);
        };
    }

    /**
     * Returns a supplier that builds a new {@link TransformP} on every
     * {@code get()} call, passing {@code mapper} to it directly.
     *
     * @param mapper function that turns each input item into a
     *               {@link Traverser} over the output items
     * @return a supplier of freshly constructed {@code TransformP} processors
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> flatMapP(
            @Nonnull DistributedFunction<T, ? extends Traverser<? extends R>> mapper
    ) {
        return () -> {
            return new TransformP(mapper);
        };
    }

    /**
     * Returns a supplier that creates a new {@code NoopP} instance on every
     * {@code get()} call.
     *
     * @return a supplier of {@code NoopP} processors
     */
    @Nonnull
    public static DistributedSupplier<Processor> noopP() {
        return NoopP::new;
    }

    /**
     * Wraps a {@link ProcessorMetaSupplier} so that every processor it
     * yields has {@code setCooperative(false)} called on it before being
     * handed on.
     * <p>
     * Each processor is cast to {@link AbstractProcessor}. A processor of any
     * other type makes the wrapping function throw
     * {@link ClassCastException}.
     *
     * @param wrapped the meta-supplier to decorate
     * @return a {@link WrappingProcessorMetaSupplier} around {@code wrapped}
     */
    @Nonnull
    public static ProcessorMetaSupplier nonCooperativeP(@Nonnull ProcessorMetaSupplier wrapped) {
        return new WrappingProcessorMetaSupplier(wrapped, processor -> {
            AbstractProcessor target = (AbstractProcessor) processor;
            target.setCooperative(false);
            return processor;
        });
    }

    /**
     * Wraps a {@link ProcessorSupplier} so that every processor it yields
     * has {@code setCooperative(false)} called on it before being handed on.
     * <p>
     * Each processor is cast to {@link AbstractProcessor}. A processor of any
     * other type makes the wrapping function throw
     * {@link ClassCastException}.
     *
     * @param wrapped the supplier to decorate
     * @return a {@link WrappingProcessorSupplier} around {@code wrapped}
     */
    @Nonnull
    public static ProcessorSupplier nonCooperativeP(@Nonnull ProcessorSupplier wrapped) {
        return new WrappingProcessorSupplier(wrapped, processor -> {
            AbstractProcessor target = (AbstractProcessor) processor;
            target.setCooperative(false);
            return processor;
        });
    }

    /**
     * Decorates a processor supplier so that every processor it creates has
     * {@code setCooperative(false)} called on it before being returned.
     * <p>
     * The created processor is cast to {@link AbstractProcessor}. A processor
     * of any other type makes the returned supplier's {@code get()} throw
     * {@link ClassCastException}.
     *
     * @param wrapped the supplier to decorate
     * @return a supplier that delegates to {@code wrapped} and then marks
     *         the processor non-cooperative
     */
    @Nonnull
    public static DistributedSupplier<Processor> nonCooperativeP(@Nonnull DistributedSupplier<Processor> wrapped) {
        return () -> {
            Processor created = wrapped.get();
            AbstractProcessor target = (AbstractProcessor) created;
            target.setCooperative(false);
            return created;
        };
    }

    /**
     * Processor whose only action is to drain its inbox into the consumer
     * returned by {@code DistributedFunctions.noopConsumer()} (presumably a
     * consumer that ignores its argument, so every item is dropped).
     */
    private static class NoopP implements Processor {

        /** Instances are created only from within the enclosing class. */
        private NoopP() {
        }

        /**
         * Drains the whole inbox into the no-op consumer.
         *
         * @param ordinal not used
         * @param inbox   the inbox to drain
         */
        @Override
        public void process(int ordinal, @Nonnull Inbox inbox) {
            inbox.drain(DistributedFunctions.noopConsumer());
        }
    }
}

