/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.collectors;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Distributed;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.KeyExtractors;
import com.hazelcast.jet.Partitioner;
import com.hazelcast.jet.Processors;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.stream.DistributedCollector;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.collectors.AbstractCollector;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.CombineGroupsP;
import com.hazelcast.jet.stream.impl.processor.GroupAndAccumulateP;
import java.util.function.Function;
import java.util.stream.Collector;

public class HazelcastGroupingMapCollector<T, A, K, D>
extends AbstractCollector<T, A, IMap<K, D>> {
    private final String mapName;
    private final Function<? super T, ? extends K> classifier;
    private final Collector<? super T, A, D> collector;

    public HazelcastGroupingMapCollector(Distributed.Function<? super T, ? extends K> classifier, DistributedCollector<? super T, A, D> collector) {
        this(StreamUtil.uniqueMapName(), classifier, collector);
    }

    public HazelcastGroupingMapCollector(String mapName, Function<? super T, ? extends K> classifier, Collector<? super T, A, D> collector) {
        this.mapName = mapName;
        this.classifier = classifier;
        this.collector = collector;
    }

    @Override
    public IMap<K, D> collect(StreamContext context, Pipeline<? extends T> upstream) {
        IStreamMap target = context.getJetInstance().getMap(this.mapName);
        DAG dag = new DAG();
        Vertex previous = upstream.buildDAG(dag);
        Vertex merger = dag.newVertex(StreamUtil.uniqueVertexName("grouping-accumulator"), () -> new GroupAndAccumulateP(this.classifier, this.collector));
        Vertex combiner = dag.newVertex(StreamUtil.uniqueVertexName("grouping-combiner"), () -> new CombineGroupsP(this.collector));
        Vertex writer = dag.newVertex(StreamUtil.writerVertexName(this.mapName), Processors.writeMap(this.mapName));
        dag.edge(Edge.between(previous, merger).partitioned(this.classifier::apply, Partitioner.HASH_CODE)).edge(Edge.between(merger, combiner).distributed().partitioned(KeyExtractors.entryKey())).edge(Edge.between(combiner, writer));
        StreamUtil.executeJob(context, dag);
        return target;
    }
}

