/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.jet.ComputeStage;
import com.hazelcast.jet.JoinClause;
import com.hazelcast.jet.Stage;
import com.hazelcast.jet.Transform;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.DiagnosticProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.AbstractStage;
import com.hazelcast.jet.impl.PipelineImpl;
import com.hazelcast.jet.impl.SinkImpl;
import com.hazelcast.jet.impl.SourceImpl;
import com.hazelcast.jet.impl.TopologicalSorter;
import com.hazelcast.jet.impl.processor.HashJoinCollectP;
import com.hazelcast.jet.impl.processor.HashJoinP;
import com.hazelcast.jet.impl.transform.CoGroupTransform;
import com.hazelcast.jet.impl.transform.FilterTransform;
import com.hazelcast.jet.impl.transform.FlatMapTransform;
import com.hazelcast.jet.impl.transform.GroupByTransform;
import com.hazelcast.jet.impl.transform.HashJoinTransform;
import com.hazelcast.jet.impl.transform.MapTransform;
import com.hazelcast.jet.impl.transform.PeekTransform;
import com.hazelcast.jet.impl.transform.ProcessorTransform;
import com.hazelcast.util.UuidUtil;
import java.lang.invoke.LambdaMetafactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class Planner {
    private static final int RANDOM_SUFFIX_LENGTH = 8;
    private final PipelineImpl pipeline;
    private final DAG dag = new DAG();
    private final Map<Stage, PlannerVertex> stage2vertex = new HashMap<Stage, PlannerVertex>();

    Planner(PipelineImpl pipeline) {
        this.pipeline = pipeline;
    }

    DAG createDag() {
        Map<Stage, List<Stage>> adjacencyMap = this.pipeline.adjacencyMap();
        Planner.validateNoLeakage(adjacencyMap);
        Iterable<Stage> sorted = TopologicalSorter.topologicalSort(adjacencyMap, Object::toString);
        for (AbstractStage abstractStage : sorted) {
            Transform transform = abstractStage.transform;
            if (transform instanceof SourceImpl) {
                this.handleSource(abstractStage, (SourceImpl)transform);
                continue;
            }
            if (transform instanceof ProcessorTransform) {
                this.handleProcessorStage(abstractStage, (ProcessorTransform)transform);
                continue;
            }
            if (transform instanceof FilterTransform) {
                this.handleFilter(abstractStage, (FilterTransform)transform);
                continue;
            }
            if (transform instanceof MapTransform) {
                this.handleMap(abstractStage, (MapTransform)transform);
                continue;
            }
            if (transform instanceof FlatMapTransform) {
                this.handleFlatMap(abstractStage, (FlatMapTransform)transform);
                continue;
            }
            if (transform instanceof GroupByTransform) {
                this.handleGroupBy(abstractStage, (GroupByTransform)transform);
                continue;
            }
            if (transform instanceof CoGroupTransform) {
                this.handleCoGroup(abstractStage, (CoGroupTransform)transform);
                continue;
            }
            if (transform instanceof HashJoinTransform) {
                this.handleHashJoin(abstractStage, (HashJoinTransform)transform);
                continue;
            }
            if (transform instanceof PeekTransform) {
                this.handlePeek(abstractStage, (PeekTransform)transform);
                continue;
            }
            if (transform instanceof SinkImpl) {
                this.handleSink(abstractStage, (SinkImpl)transform);
                continue;
            }
            throw new IllegalArgumentException("Unknown transform " + transform);
        }
        return this.dag;
    }

    private static void validateNoLeakage(Map<Stage, List<Stage>> adjacencyMap) {
        List leakages = adjacencyMap.entrySet().stream().filter(e -> ((List)e.getValue()).isEmpty()).map(Map.Entry::getKey).filter(stage -> stage instanceof ComputeStage).map(stage -> (ComputeStage)stage).collect(Collectors.toList());
        if (!leakages.isEmpty()) {
            throw new IllegalArgumentException("These ComputeStages have nothing attached to them: " + leakages);
        }
    }

    private void handleSource(AbstractStage stage, SourceImpl source) {
        this.addVertex((Stage)stage, source.name(), source.metaSupplier());
    }

    private void handleProcessorStage(AbstractStage stage, ProcessorTransform procTransform) {
        PlannerVertex pv = this.addVertex((Stage)stage, procTransform.transformName + '.' + Planner.randomSuffix(), procTransform.procSupplier);
        this.addEdges(stage, pv.v);
    }

    private void handleMap(AbstractStage stage, MapTransform map) {
        PlannerVertex pv = this.addVertex((Stage)stage, "map." + Planner.randomSuffix(), Processors.mapP(map.mapFn));
        this.addEdges(stage, pv.v);
    }

    private void handleFilter(AbstractStage stage, FilterTransform filter) {
        PlannerVertex pv = this.addVertex((Stage)stage, "filter." + Planner.randomSuffix(), Processors.filterP(filter.filterFn));
        this.addEdges(stage, pv.v);
    }

    private void handleFlatMap(AbstractStage stage, FlatMapTransform flatMap) {
        PlannerVertex pv = this.addVertex((Stage)stage, "flatMap." + Planner.randomSuffix(), Processors.flatMapP(flatMap.flatMapFn()));
        this.addEdges(stage, pv.v);
    }

    private void handleGroupBy(AbstractStage stage, GroupByTransform<Object, Object, Object, Object> groupBy) {
        String name = "groupByKey." + Planner.randomSuffix() + ".stage";
        Vertex v1 = this.dag.newVertex(name + '1', Processors.accumulateByKeyP(groupBy.keyFn(), groupBy.aggregateOperation()));
        PlannerVertex pv2 = this.addVertex((Stage)stage, name + '2', Processors.combineByKeyP(groupBy.aggregateOperation()));
        this.addEdges(stage, v1, (Edge e) -> e.partitioned(groupBy.keyFn(), Partitioner.HASH_CODE));
        this.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(DistributedFunctions.entryKey()));
    }

    private void handleCoGroup(AbstractStage stage, CoGroupTransform<Object, Object, Object> coGroup) {
        List groupKeyFs = coGroup.groupKeyFs();
        String name = "coGroup." + Planner.randomSuffix() + ".stage";
        Vertex v1 = this.dag.newVertex(name + '1', Processors.coAccumulateByKeyP(groupKeyFs, coGroup.aggregateOperation()));
        PlannerVertex pv2 = this.addVertex((Stage)stage, name + '2', Processors.combineByKeyP(coGroup.aggregateOperation()));
        this.addEdges(stage, v1, (Edge e, Integer ord) -> e.partitioned((DistributedFunction)groupKeyFs.get((int)ord), Partitioner.HASH_CODE));
        this.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(DistributedFunctions.entryKey()));
    }

    private void handleHashJoin(AbstractStage stage, HashJoinTransform<?> hashJoin) {
        String hashJoinName = "hashJoin." + Planner.randomSuffix();
        PlannerVertex primary = this.stage2vertex.get(stage.upstream.get(0));
        List keyFns = hashJoin.clauses().stream().map(JoinClause::leftKeyFn).collect(Collectors.toList());
        Vertex joiner = this.addVertex((Stage)stage, (String)new StringBuilder().append((String)hashJoinName).append((String)".joiner").toString(), (DistributedSupplier<Processor>)(DistributedSupplier)LambdaMetafactory.altMetafactory(null, null, null, ()Ljava/lang/Object;, lambda$handleHashJoin$2c4c2be2$1(java.util.List com.hazelcast.jet.impl.transform.HashJoinTransform ), ()Lcom/hazelcast/jet/core/Processor;)(keyFns, hashJoin)).v;
        this.dag.edge(Edge.from(primary.v, primary.availableOrdinal++).to(joiner, 0));
        String collectorName = hashJoinName + ".collector.";
        int collectorOrdinal = 1;
        for (Stage fromStage : Planner.tailList(stage.upstream)) {
            PlannerVertex fromPv = this.stage2vertex.get(fromStage);
            JoinClause<?, ?, ?, ?> clause = hashJoin.clauses().get(collectorOrdinal - 1);
            DistributedFunction<?, ?> getKeyFn = clause.rightKeyFn();
            DistributedFunction<?, ?> projectFn = clause.rightProjectFn();
            Vertex collector = this.dag.newVertex(collectorName + collectorOrdinal, () -> new HashJoinCollectP(getKeyFn, projectFn));
            collector.localParallelism(1);
            this.dag.edge(Edge.from(fromPv.v, fromPv.availableOrdinal++).to(collector, 0).distributed().broadcast());
            this.dag.edge(Edge.from(collector, 0).to(joiner, collectorOrdinal).broadcast().priority(-1));
            ++collectorOrdinal;
        }
    }

    private void handlePeek(AbstractStage stage, PeekTransform peekTransform) {
        PlannerVertex peekedPv = this.stage2vertex.get(stage.upstream.get(0));
        this.stage2vertex.put(stage, peekedPv);
        peekedPv.v.updateMetaSupplier(sup -> DiagnosticProcessors.peekOutputP(peekTransform.toStringFn(), peekTransform.shouldLogFn(), sup));
    }

    private void handleSink(AbstractStage stage, SinkImpl sink) {
        PlannerVertex pv = this.addVertex((Stage)stage, sink.name(), sink.metaSupplier());
        this.addEdges(stage, pv.v);
    }

    private PlannerVertex addVertex(Stage stage, String name, DistributedSupplier<Processor> procSupplier) {
        return this.addVertex(stage, name, ProcessorMetaSupplier.of(procSupplier));
    }

    private PlannerVertex addVertex(Stage stage, String name, ProcessorMetaSupplier metaSupplier) {
        PlannerVertex pv = new PlannerVertex(this.dag.newVertex(name, metaSupplier));
        this.stage2vertex.put(stage, pv);
        return pv;
    }

    private void addEdges(AbstractStage stage, Vertex toVertex, BiConsumer<Edge, Integer> configureEdgeFn) {
        int destOrdinal = 0;
        for (Stage fromStage : stage.upstream) {
            PlannerVertex fromPv = this.stage2vertex.get(fromStage);
            Edge edge = Edge.from(fromPv.v, fromPv.availableOrdinal++).to(toVertex, destOrdinal);
            this.dag.edge(edge);
            configureEdgeFn.accept(edge, destOrdinal);
            ++destOrdinal;
        }
    }

    private void addEdges(AbstractStage stage, Vertex toVertex, Consumer<Edge> configureEdgeFn) {
        this.addEdges(stage, toVertex, (Edge e, Integer ord) -> configureEdgeFn.accept((Edge)e));
    }

    private void addEdges(AbstractStage stage, Vertex toVertex) {
        this.addEdges(stage, toVertex, (Edge e) -> {});
    }

    private static String randomSuffix() {
        String uuid = UuidUtil.newUnsecureUUID().toString();
        return uuid.substring(uuid.length() - 8, uuid.length());
    }

    private static <E> List<E> tailList(List<E> list) {
        return list.subList(1, list.size());
    }

    private static /* synthetic */ Processor lambda$handleHashJoin$2c4c2be2$1(List keyFns, HashJoinTransform hashJoin) {
        return new HashJoinP(keyFns, hashJoin.tags());
    }

    private static class PlannerVertex {
        Vertex v;
        int availableOrdinal;

        PlannerVertex(Vertex v) {
            this.v = v;
        }

        public String toString() {
            return this.v.toString();
        }
    }
}

