/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.Functions;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import java.util.List;
import javax.annotation.Nonnull;

public class GroupTransform<K, A, R, OUT>
extends AbstractTransform {
    @Nonnull
    private final List<FunctionEx<?, ? extends K>> groupKeyFns;
    @Nonnull
    private final AggregateOperation<A, R> aggrOp;
    @Nonnull
    private final BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn;

    public GroupTransform(@Nonnull List<Transform> upstream, @Nonnull List<FunctionEx<?, ? extends K>> groupKeyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        super(GroupTransform.createName(upstream), upstream);
        this.groupKeyFns = groupKeyFns;
        this.aggrOp = aggrOp;
        this.mapToOutputFn = mapToOutputFn;
    }

    private static String createName(@Nonnull List<Transform> upstream) {
        return upstream.size() == 1 ? "group-and-aggregate" : upstream.size() + "-way cogroup-and-aggregate";
    }

    @Override
    public void addToDag(Planner p) {
        if (this.getOptimization() == AbstractTransform.Optimization.MEMORY || this.aggrOp.combineFn() == null) {
            this.addToDagSingleStage(p);
        } else {
            this.addToDagTwoStage(p);
        }
    }

    private void addToDagSingleStage(Planner p) {
        Planner.PlannerVertex pv = p.addVertex((Transform)this, this.name(), this.localParallelism(), Processors.aggregateByKeyP(this.groupKeyFns, this.aggrOp, this.mapToOutputFn));
        p.addEdges((Transform)this, pv.v, (e, ord) -> e.distributed().partitioned(this.groupKeyFns.get((int)ord)));
    }

    private void addToDagTwoStage(Planner p) {
        List groupKeyFns = this.groupKeyFns;
        Vertex v1 = p.dag.newVertex(this.name() + "-prepare", Processors.accumulateByKeyP(groupKeyFns, this.aggrOp)).localParallelism(this.localParallelism());
        Planner.PlannerVertex pv2 = p.addVertex((Transform)this, this.name(), this.localParallelism(), Processors.combineByKeyP(this.aggrOp, this.mapToOutputFn));
        p.addEdges((Transform)this, v1, (e, ord) -> e.partitioned((FunctionEx)groupKeyFns.get((int)ord), Partitioner.HASH_CODE));
        p.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(Functions.entryKey()));
    }
}

