/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.terminal;

import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Distributed;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Processors;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.stream.IStreamList;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.AccumulateP;
import com.hazelcast.jet.stream.impl.processor.CombineP;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/**
 * Implements the {@code reduce} terminal operations of a stream pipeline.
 *
 * <p>Every operation builds a {@link DAG} consisting of the upstream pipeline, an
 * accumulating vertex, a combining vertex with local parallelism 1 that is fed through a
 * distributed all-to-one edge, and a writer vertex that stores the outcome in a uniquely
 * named list. The job is executed through {@link StreamUtil#executeJob} and the first
 * element of that list (if any) is the result.
 */
public class Reducer {
    private final StreamContext context;

    /**
     * @param context stream context used to obtain the Jet instance and to execute jobs
     */
    public Reducer(StreamContext context) {
        this.context = context;
    }

    /**
     * Overload accepting the {@code Distributed} functional interfaces; delegates to
     * {@link #reduce(Pipeline, Object, BiFunction, BinaryOperator)}.
     */
    public <T, U> U reduce(Pipeline<T> upstream, U identity, Distributed.BiFunction<U, ? super T, U> accumulator, Distributed.BinaryOperator<U> combiner) {
        return this.reduce(upstream, identity, (BiFunction<U, ? super T, U>) accumulator, (BinaryOperator<U>) combiner);
    }

    /**
     * Reduces {@code upstream} into a value of type {@code U}.
     *
     * @param upstream    pipeline supplying the items to reduce
     * @param identity    initial value handed to the accumulating processor
     * @param accumulator folds an item into a partial result; passed to
     *                    {@link StreamUtil#checkSerializable} before the DAG is built
     * @param combiner    merges partial results; passed to
     *                    {@link StreamUtil#checkSerializable} before the DAG is built
     * @return the single value written by the job, or {@code identity} if the job wrote nothing
     */
    public <T, U> U reduce(Pipeline<T> upstream, U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
        StreamUtil.checkSerializable(accumulator, "accumulator");
        StreamUtil.checkSerializable(combiner, "combiner");
        DAG dag = new DAG();
        Vertex accumulatorVertex = buildMappingAccumulator(dag, upstream, identity, accumulator);
        Vertex combinerVertex = buildCombiner(dag, accumulatorVertex, combiner);
        // Fall back to the identity instead of throwing NoSuchElementException from
        // Optional.get() should the result list turn out to be empty.
        // NOTE(review): assumes an empty result list means nothing was reduced - confirm
        // against AccumulateP.
        return this.<U>execute(dag, combinerVertex).orElse(identity);
    }

    /**
     * Adds a combining vertex (local parallelism 1) to {@code dag} and connects
     * {@code accumulatorVertex} to it with a distributed all-to-one edge.
     *
     * @return the newly added combining vertex
     */
    private static <T> Vertex buildCombiner(DAG dag, Vertex accumulatorVertex, BinaryOperator<T> combiner) {
        Distributed.Supplier<Processor> supplier = () -> new CombineP(combiner, Distributed.Function.identity());
        Vertex combinerVertex = dag.newVertex(StreamUtil.uniqueVertexName("combiner"), supplier).localParallelism(1);
        dag.edge(Edge.between(accumulatorVertex, combinerVertex).distributed().allToOne());
        return combinerVertex;
    }

    /**
     * Overload accepting the {@code Distributed} functional interface; delegates to
     * {@link #reduce(Pipeline, BinaryOperator)}.
     */
    public <T> Optional<T> reduce(Pipeline<T> upstream, Distributed.BinaryOperator<T> operator) {
        return this.reduce(upstream, (BinaryOperator<T>) operator);
    }

    /**
     * Reduces {@code upstream} with {@code operator} and no identity value.
     *
     * @param upstream pipeline supplying the items to reduce
     * @param operator merges two values; passed to {@link StreamUtil#checkSerializable}
     *                 before the DAG is built
     * @return the value written by the job, or an empty {@code Optional} if it wrote nothing
     */
    public <T> Optional<T> reduce(Pipeline<T> upstream, BinaryOperator<T> operator) {
        StreamUtil.checkSerializable(operator, "operator");
        DAG dag = new DAG();
        // A null identity selects the CombineP-based first stage (see getAccumulatorVertex).
        Vertex accumulatorVertex = buildAccumulator(dag, upstream, operator, null);
        Vertex combinerVertex = buildCombiner(dag, accumulatorVertex, operator);
        return this.execute(dag, combinerVertex);
    }

    /**
     * Overload accepting the {@code Distributed} functional interface; delegates to
     * {@link #reduce(Pipeline, Object, BinaryOperator)}.
     */
    public <T> T reduce(Pipeline<T> upstream, T identity, Distributed.BinaryOperator<T> accumulator) {
        return this.reduce(upstream, identity, (BinaryOperator<T>) accumulator);
    }

    /**
     * Reduces {@code upstream} with {@code accumulator}, starting from {@code identity}.
     *
     * @param upstream    pipeline supplying the items to reduce
     * @param identity    initial value; when {@code null} the first stage is built the same
     *                    way as for {@link #reduce(Pipeline, BinaryOperator)}
     * @param accumulator merges two values; also used by the combining vertex; passed to
     *                    {@link StreamUtil#checkSerializable} before the DAG is built
     * @return the value written by the job, or {@code identity} if the job wrote nothing
     */
    public <T> T reduce(Pipeline<T> upstream, T identity, BinaryOperator<T> accumulator) {
        StreamUtil.checkSerializable(accumulator, "accumulator");
        DAG dag = new DAG();
        Vertex accumulatorVertex = buildAccumulator(dag, upstream, accumulator, identity);
        Vertex combinerVertex = buildCombiner(dag, accumulatorVertex, accumulator);
        // With a null identity this takes the same CombineP path as the Optional-returning
        // overload, for which execute() may return an empty Optional; return the identity
        // in that case rather than failing in Optional.get().
        return this.<T>execute(dag, combinerVertex).orElse(identity);
    }

    /**
     * Appends a list-writer vertex after {@code combiner}, runs the job and reads the result.
     *
     * <p>The uniquely named result list is destroyed before returning, including when job
     * execution or reading the result throws.
     *
     * @return the first element of the result list, or an empty {@code Optional} if the
     *         list is empty after the job has run
     */
    private <T> Optional<T> execute(DAG dag, Vertex combiner) {
        String listName = StreamUtil.uniqueListName();
        Vertex writer = dag.newVertex(StreamUtil.writerVertexName(listName), Processors.writeList(listName));
        dag.edge(Edge.between(combiner, writer));
        IStreamList<T> list = this.context.getJetInstance().getList(listName);
        try {
            StreamUtil.executeJob(this.context, dag);
            if (list.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(list.get(0));
        } finally {
            // The list exists only to carry the result back; never leave it behind.
            list.destroy();
        }
    }

    /**
     * Adds an {@link AccumulateP} vertex seeded with {@code identity} to {@code dag}, builds
     * the upstream part of the DAG and connects its last vertex to the new vertex.
     *
     * @return the accumulating vertex
     */
    private static <T, U> Vertex buildMappingAccumulator(DAG dag, Pipeline<? extends T> upstream, U identity, BiFunction<U, ? super T, U> accumulator) {
        Vertex acc = dag.newVertex(StreamUtil.uniqueVertexName("accumulator"), () -> new AccumulateP(accumulator, identity));
        Vertex previous = upstream.buildDAG(dag);
        // Skip the edge if the upstream already ends in this very vertex.
        if (previous != acc) {
            dag.edge(Edge.between(previous, acc));
        }
        return acc;
    }

    /**
     * Adds the first-stage vertex chosen by {@link #getAccumulatorVertex} to {@code dag},
     * builds the upstream part of the DAG and connects its last vertex to the new vertex.
     *
     * @return the first-stage vertex
     */
    private static <T> Vertex buildAccumulator(DAG dag, Pipeline<? extends T> upstream, BinaryOperator<T> accumulator, T identity) {
        Vertex accumulatorVertex = getAccumulatorVertex(accumulator, identity);
        dag.vertex(accumulatorVertex);
        Vertex previous = upstream.buildDAG(dag);
        // Skip the edge if the upstream already ends in this very vertex.
        if (previous != accumulatorVertex) {
            dag.edge(Edge.between(previous, accumulatorVertex));
        }
        return accumulatorVertex;
    }

    /**
     * Creates the first-stage vertex: an {@link AccumulateP} seeded with {@code identity}
     * when an identity is given, otherwise a {@link CombineP} with an identity mapper.
     */
    private static <T> Vertex getAccumulatorVertex(BinaryOperator<T> accumulator, T identity) {
        if (identity != null) {
            return new Vertex(StreamUtil.uniqueVertexName("accumulator"), () -> new AccumulateP(accumulator, identity));
        }
        return new Vertex(StreamUtil.uniqueVertexName("combiner"), () -> new CombineP(accumulator, Distributed.Function.identity()));
    }
}

