/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.collectors;

import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Distributed;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Processors;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.stream.DistributedCollector;
import com.hazelcast.jet.stream.IStreamList;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.CollectorAccumulateP;
import com.hazelcast.jet.stream.impl.processor.CollectorCombineP;
import com.hazelcast.jet.stream.impl.processor.CombineP;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class DistributedCollectorImpl<T, A, R>
implements DistributedCollector<T, A, R> {
    private final Distributed.Supplier<A> supplier;
    private final Distributed.BiConsumer<A, T> accumulator;
    private final Distributed.BinaryOperator<A> combiner;
    private final Set<Collector.Characteristics> characteristics;
    private final Distributed.Function<A, R> finisher;

    public DistributedCollectorImpl(Distributed.Supplier<A> supplier, Distributed.BiConsumer<A, T> accumulator, Distributed.BinaryOperator<A> combiner, Distributed.Function<A, R> finisher, Set<Collector.Characteristics> characteristics) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.combiner = combiner;
        this.finisher = finisher;
        this.characteristics = characteristics;
    }

    public DistributedCollectorImpl(Distributed.Supplier<A> supplier, Distributed.BiConsumer<A, T> accumulator, Distributed.BinaryOperator<A> combiner, Set<Collector.Characteristics> characteristics) {
        this(supplier, accumulator, combiner, DistributedCollectorImpl.castingIdentity(), characteristics);
    }

    static <I, R> Distributed.Function<I, R> castingIdentity() {
        return i -> i;
    }

    static <R> R execute(StreamContext context, DAG dag, Vertex combiner) {
        String listName = StreamUtil.uniqueListName();
        Vertex writer = dag.newVertex(StreamUtil.uniqueVertexName("writer"), Processors.writeList(listName));
        dag.edge(Edge.between(combiner, writer));
        StreamUtil.executeJob(context, dag);
        IStreamList list = context.getJetInstance().getList(listName);
        Object result = list.get(0);
        list.destroy();
        return (R)result;
    }

    static <T, R> Vertex buildAccumulator(DAG dag, Pipeline<T> upstream, Supplier<R> supplier, BiConsumer<R, ? super T> accumulator) {
        Vertex previous;
        Vertex accumulatorVertex = dag.newVertex(StreamUtil.uniqueVertexName("accumulator"), () -> new CollectorAccumulateP(accumulator, supplier));
        if (upstream.isOrdered()) {
            accumulatorVertex.localParallelism(1);
        }
        if ((previous = upstream.buildDAG(dag)) != accumulatorVertex) {
            dag.edge(Edge.between(previous, accumulatorVertex));
        }
        return accumulatorVertex;
    }

    static <A, R> Vertex buildCombiner(DAG dag, Vertex accumulatorVertex, Object combiner, Function<A, R> finisher) {
        Distributed.Supplier<Processor> processorSupplier = DistributedCollectorImpl.getCombinerSupplier(combiner, finisher);
        Vertex combinerVertex = dag.newVertex(StreamUtil.uniqueVertexName("combiner"), processorSupplier).localParallelism(1);
        dag.edge(Edge.between(accumulatorVertex, combinerVertex).distributed().allToOne());
        return combinerVertex;
    }

    private static <A, R> Distributed.Supplier<Processor> getCombinerSupplier(Object combiner, Function<A, R> finisher) {
        if (combiner instanceof BiConsumer) {
            return () -> new CollectorCombineP((BiConsumer)combiner, finisher);
        }
        if (combiner instanceof BinaryOperator) {
            return () -> new CombineP((BinaryOperator)combiner, finisher);
        }
        throw new IllegalArgumentException("combiner is of type " + combiner.getClass());
    }

    @Override
    public Distributed.Supplier<A> supplier() {
        return this.supplier;
    }

    @Override
    public Distributed.BiConsumer<A, T> accumulator() {
        return this.accumulator;
    }

    @Override
    public Distributed.BinaryOperator<A> combiner() {
        return this.combiner;
    }

    @Override
    public Distributed.Function<A, R> finisher() {
        return this.finisher;
    }

    @Override
    public Set<Collector.Characteristics> characteristics() {
        return this.characteristics;
    }

    @Override
    public R collect(StreamContext context, Pipeline<? extends T> upstream) {
        DAG dag = new DAG();
        Vertex accumulatorVertex = DistributedCollectorImpl.buildAccumulator(dag, upstream, this.supplier(), this.accumulator());
        Vertex combinerVertex = DistributedCollectorImpl.buildCombiner(dag, accumulatorVertex, this.combiner(), this.finisher);
        return DistributedCollectorImpl.execute(context, dag, combinerVertex);
    }
}

