/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.BiPredicateEx;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.PredicateEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.function.ToLongFunctionEx;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingContextOrderedP;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingContextUnorderedP;
import com.hazelcast.jet.impl.processor.GroupP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.RollingAggregateP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.processor.TransformUsingContextP;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public final class Processors {
    private Processors() {
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> aggregateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(Collections.nCopies(aggrOp.arity(), t -> "ALL"), aggrOp, (k, r) -> r);
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> accumulateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(Collections.nCopies(aggrOp.arity(), t -> "ALL"), aggrOp.withIdentityFinish(), (k, r) -> r);
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> combineP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(t -> "ALL", aggrOp.withCombiningAccumulateFn(FunctionEx.identity()), (k, r) -> r);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new GroupP(keyFns, aggrOp, mapToOutputFn);
    }

    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> getKeyFns, @Nonnull AggregateOperation<A, ?> aggrOp) {
        return () -> new GroupP(getKeyFns, aggrOp.withIdentityFinish(), Util::entry);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineByKeyP(@Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new GroupP(Map.Entry::getKey, aggrOp.withCombiningAccumulateFn(Map.Entry::getValue), mapToOutputFn);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn, true);
    }

    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy.toTumblingByFrame(), 0L, aggrOp.withIdentityFinish(), KeyedWindowResult::new, false);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        FunctionEx<KeyedWindowResult, Object> keyFn = KeyedWindowResult::key;
        ToLongFunctionEx<KeyedWindowResult> timestampFn = WindowResult::end;
        return Processors.aggregateByKeyAndWindowP(Collections.singletonList(keyFn), Collections.singletonList(timestampFn), TimestampKind.FRAME, winPolicy, 0L, aggrOp.withCombiningAccumulateFn(WindowResult::result), mapToOutputFn, true);
    }

    @Nonnull
    private static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyAndWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, boolean isLastStage) {
        return () -> new SlidingWindowP(keyFns, timestampFns.stream().map(f -> Processors.toFrameTimestampFn(f, timestampKind, winPolicy)).collect(Collectors.toList()), winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn, isLastStage);
    }

    private static ToLongFunctionEx<Object> toFrameTimestampFn(@Nonnull ToLongFunctionEx<?> timestampFnX, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy) {
        ToLongFunctionEx<?> timestampFn = timestampFnX;
        return timestampKind == TimestampKind.EVENT ? item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item)) : item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item) - 1L);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout, long earlyResultsPeriod, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return () -> new SessionWindowP(sessionTimeout, earlyResultsPeriod, timestampFns, keyFns, aggrOp, mapToOutputFn);
    }

    @Nonnull
    public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        return () -> new InsertWatermarksP(eventTimePolicy);
    }

    @Nonnull
    public static <T, R> SupplierEx<Processor> mapP(@Nonnull FunctionEx<T, R> mapFn) {
        return () -> {
            ResettableSingletonTraverser trav = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.accept(mapFn.apply(item));
                return trav;
            });
        };
    }

    @Nonnull
    public static <T> SupplierEx<Processor> filterP(@Nonnull PredicateEx<T> filterFn) {
        return () -> {
            ResettableSingletonTraverser trav = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.accept(filterFn.test(item) ? item : null);
                return trav;
            });
        };
    }

    @Nonnull
    public static <T, R> SupplierEx<Processor> flatMapP(@Nonnull FunctionEx<T, ? extends Traverser<? extends R>> flatMapFn) {
        return () -> new TransformP(flatMapFn);
    }

    @Nonnull
    public static <C, T, R> ProcessorSupplier mapUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, ? extends R> mapFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(mapFn.apply((Object)context, (Object)item));
            return singletonTraverser;
        });
    }

    @Nonnull
    public static <C, T, K, R> ProcessorSupplier mapUsingContextAsyncP(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<R>> mapAsyncFn) {
        return Processors.flatMapUsingContextAsyncP(contextFactory, extractKeyFn, (c, t) -> ((CompletableFuture)mapAsyncFn.apply((Object)c, (Object)t)).thenApply(Traversers::singleton));
    }

    @Nonnull
    public static <C, T> ProcessorSupplier filterUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiPredicateEx<? super C, ? super T> filterFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(filterFn.test((Object)context, (Object)item) ? item : null);
            return singletonTraverser;
        });
    }

    @Nonnull
    public static <C, T, K> ProcessorSupplier filterUsingContextAsyncP(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Boolean>> filterAsyncFn) {
        return Processors.flatMapUsingContextAsyncP(contextFactory, extractKeyFn, (c, t) -> ((CompletableFuture)filterAsyncFn.apply((Object)c, (Object)t)).thenApply(passed -> passed != false ? Traversers.singleton(t) : null));
    }

    @Nonnull
    public static <C, T, R> ProcessorSupplier flatMapUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> (Traverser)flatMapFn.apply((Object)context, (Object)item));
    }

    @Nonnull
    public static <C, T, K, R> ProcessorSupplier flatMapUsingContextAsyncP(@Nonnull ContextFactory<C> contextFactory, @Nonnull FunctionEx<? super T, ? extends K> extractKeyFn, @Nonnull BiFunctionEx<? super C, ? super T, CompletableFuture<Traverser<R>>> flatMapAsyncFn) {
        return contextFactory.hasOrderedAsyncResponses() ? AsyncTransformUsingContextOrderedP.supplier(contextFactory, flatMapAsyncFn) : AsyncTransformUsingContextUnorderedP.supplier(contextFactory, flatMapAsyncFn, extractKeyFn);
    }

    @Nonnull
    public static <T, K, A, R, OUT> SupplierEx<Processor> rollingAggregateP(@Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp, @Nonnull TriFunction<? super T, ? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return () -> new RollingAggregateP(keyFn, aggrOp, mapToOutputFn);
    }

    @Nonnull
    public static SupplierEx<Processor> noopP() {
        return () -> new NoopP();
    }

    private static class NoopP
    implements Processor {
        private Outbox outbox;

        private NoopP() {
        }

        @Override
        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception {
            this.outbox = outbox;
        }

        @Override
        public void process(int ordinal, @Nonnull Inbox inbox) {
            inbox.drain(ConsumerEx.noop());
        }

        @Override
        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            return this.outbox.offer(watermark);
        }

        @Override
        public void restoreFromSnapshot(@Nonnull Inbox inbox) {
            inbox.drain(ConsumerEx.noop());
        }
    }
}

