/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.ServiceFactories;
import java.util.HashSet;

/**
 * Pipeline transform registered under the name {@code "distinct"} that lets
 * an item through only when the key extracted from it by {@code keyFn} has
 * not been seen before.
 *
 * <p>The transform is planned as two vertices that both run the same
 * filtering processor (see {@link #distinctP}): a first vertex whose name
 * carries the {@code "-prepare"} suffix, followed by the main vertex. The
 * edge connecting them is distributed and partitioned by {@code keyFn}.
 *
 * @param <T> type of the incoming items
 * @param <K> type of the key used to decide whether two items are duplicates
 */
public class DistinctTransform<T, K> extends AbstractTransform {
    /** Extracts the key by which items are compared for distinctness. */
    private final FunctionEx<? super T, ? extends K> keyFn;

    /**
     * Creates the transform.
     *
     * @param upstream the transform whose output this transform consumes
     * @param keyFn    function computing the distinctness key of an item
     */
    public DistinctTransform(Transform upstream, FunctionEx<? super T, ? extends K> keyFn) {
        super("distinct", upstream);
        this.keyFn = keyFn;
    }

    /**
     * Adds the two de-duplicating vertices and their edges to the DAG held by
     * the planner.
     *
     * @param p       planner that owns the DAG being built
     * @param context pipeline context used when determining local parallelism
     */
    @Override
    public void addToDag(Planner p, PipelineImpl.Context context) {
        String vertexName = this.name();
        // No preferred parallelism of our own: ask for the default.
        this.determineLocalParallelism(Vertex.LOCAL_PARALLELISM_USE_DEFAULT, context, false);
        int localParallelism = this.determinedLocalParallelism();

        // First stage: created directly on the DAG, so it is not the vertex
        // that the planner associates with this transform.
        Vertex v1 = p.dag
                .newVertex(vertexName + "-prepare", distinctP(this.keyFn))
                .localParallelism(localParallelism);

        // Second stage: the vertex the planner registers for this transform.
        Planner.PlannerVertex pv2 = p.addVertex(this, vertexName, localParallelism, distinctP(this.keyFn));

        // Inbound edges are partitioned by key (hash-code partitioner) and are
        // not marked as distributed.
        p.addEdges(this, v1, (e, ord) -> e.partitioned(this.keyFn, Partitioner.HASH_CODE));

        // Stage one -> stage two: distributed edge, again partitioned by key.
        p.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(this.keyFn));
    }

    /**
     * Builds the supplier of the processor that filters out items whose key
     * was already observed. The observed keys are kept in a {@link HashSet}
     * obtained through {@code ServiceFactories.nonSharedService}; an item
     * passes exactly when {@link HashSet#add} reports its key as newly added.
     *
     * @param keyFn function computing the distinctness key of an item
     * @param <T>   type of the incoming items
     * @param <K>   type of the distinctness key
     * @return processor supplier performing the filtering
     */
    private static <T, K> ProcessorSupplier distinctP(FunctionEx<? super T, ? extends K> keyFn) {
        // Explicit lambda parameter types keep the set and the item fully
        // typed; with a raw HashSet the item would be inferred as Object and
        // could not be safely handed to keyFn.
        return Processors.filterUsingServiceP(
                ServiceFactories.nonSharedService(pctx -> new HashSet<>()),
                (HashSet<K> seenItems, T item) -> seenItems.add(keyFn.apply(item)));
    }
}

