/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.collectors;

import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.KeyExtractors;
import com.hazelcast.jet.Partitioner;
import com.hazelcast.jet.Processors;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.collectors.HazelcastMapCollector;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.MergeP;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Collector that writes the elements of a stream pipeline into a named map,
 * passing a merge function to the {@link MergeP} processors it places in front
 * of the map writer.
 *
 * <p>NOTE(review): the merge function presumably resolves values that map to
 * the same key; that logic lives in {@code MergeP} and is not visible here.
 *
 * @param <T> type of the upstream elements
 * @param <K> type of the map keys
 * @param <V> type of the map values
 */
public class HazelcastMergingMapCollector<T, K, V>
extends HazelcastMapCollector<T, K, V> {
    /** Handed to both the accumulating and the combining {@link MergeP} stage. */
    private final BinaryOperator<V> mergeFunction;

    /**
     * Creates a collector whose target map name is generated by
     * {@link StreamUtil#uniqueMapName()}.
     *
     * @param keyMapper     extracts the map key from an upstream element
     * @param valueMapper   extracts the map value from an upstream element
     * @param mergeFunction function passed to the merge processors
     */
    public HazelcastMergingMapCollector(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, BinaryOperator<V> mergeFunction) {
        this(StreamUtil.uniqueMapName(), keyMapper, valueMapper, mergeFunction);
    }

    /**
     * Creates a collector that writes into the map with the given name.
     *
     * @param mapName       name of the target map
     * @param keyMapper     extracts the map key from an upstream element
     * @param valueMapper   extracts the map value from an upstream element
     * @param mergeFunction function passed to the merge processors
     */
    public HazelcastMergingMapCollector(String mapName, Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, BinaryOperator<V> mergeFunction) {
        super(mapName, keyMapper, valueMapper);
        this.mergeFunction = mergeFunction;
    }

    /**
     * Builds a DAG of {@code upstream -> merging-accumulator -> merging-combiner -> map writer},
     * executes it as a job and returns the target map.
     *
     * @param context  stream context supplying the Jet instance and used to execute the job
     * @param upstream pipeline that contributes the source vertices of the DAG
     * @return the target obtained from {@code getTarget} before the job was executed
     */
    @Override
    public IStreamMap<K, V> collect(StreamContext context, Pipeline<? extends T> upstream) {
        // Declared with the method's return type rather than Object so that it can be
        // returned without a cast.
        // NOTE(review): assumes the inherited getTarget(...) is typed to return
        // IStreamMap<K, V> -- confirm against HazelcastMapCollector.
        IStreamMap<K, V> target = this.getTarget(context.getJetInstance());

        DAG dag = new DAG();
        Vertex previous = upstream.buildDAG(dag);

        // First stage receives the key/value mappers together with the merge function.
        Vertex merger = dag.newVertex(
                StreamUtil.uniqueVertexName("merging-accumulator"),
                () -> new MergeP(this.keyMapper, this.valueMapper, this.mergeFunction));
        // Second stage is constructed with null mappers and only the merge function.
        Vertex combiner = dag.newVertex(
                StreamUtil.uniqueVertexName("merging-combiner"),
                () -> new MergeP(null, null, this.mergeFunction));
        Vertex writer = dag.newVertex(
                StreamUtil.writerVertexName(this.mapName),
                Processors.writeMap(this.mapName));

        dag.edge(Edge.between(previous, merger)
                        // partition upstream items by the key the key mapper extracts
                        .partitioned(this.keyMapper::apply, Partitioner.HASH_CODE))
           .edge(Edge.between(merger, combiner)
                        // distributed edge, partitioned by the entry key
                        .distributed()
                        .partitioned(KeyExtractors.entryKey()))
           .edge(Edge.between(combiner, writer));

        StreamUtil.executeJob(context, dag);
        return target;
    }
}

