/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.impl.TopologicalSorter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class Planner {
    private static final ILogger LOGGER = Logger.getLogger(Planner.class);
    private static final int MAXIMUM_WATERMARK_GAP = 1000;
    public final DAG dag = new DAG();
    public final Map<Transform, PlannerVertex> xform2vertex = new HashMap<Transform, PlannerVertex>();
    private final PipelineImpl pipeline;

    Planner(PipelineImpl pipeline) {
        this.pipeline = pipeline;
    }

    DAG createDag() {
        this.pipeline.makeNamesUnique();
        Map<Transform, List<Transform>> adjacencyMap = this.pipeline.adjacencyMap();
        Planner.validateNoLeakage(adjacencyMap);
        long frameSizeGcd = Util.gcd(adjacencyMap.keySet().stream().map(Transform::preferredWatermarkStride).filter(frameSize -> frameSize > 0L).mapToLong(i -> i).toArray());
        if (frameSizeGcd == 0L) {
            frameSizeGcd = 1000L;
        }
        if (frameSizeGcd > 1000L) {
            frameSizeGcd = Util.gcd(frameSizeGcd, 1000L);
        }
        LoggingUtil.logFine(LOGGER, "Watermarks in the pipeline will be throttled to %d", frameSizeGcd);
        for (Transform transform : adjacencyMap.keySet()) {
            AbstractTransform t;
            if (transform instanceof StreamSourceTransform) {
                t = (StreamSourceTransform)transform;
                EventTimePolicy policy = ((StreamSourceTransform)t).getEventTimePolicy();
                if (policy == null) continue;
                ((StreamSourceTransform)t).setEventTimePolicy(Planner.withFrameSize(policy, frameSizeGcd));
                continue;
            }
            if (!(transform instanceof TimestampTransform)) continue;
            t = (TimestampTransform)transform;
            ((TimestampTransform)t).setEventTimePolicy(Planner.withFrameSize(((TimestampTransform)t).getEventTimePolicy(), frameSizeGcd));
        }
        TopologicalSorter.checkTopologicalSort(adjacencyMap.entrySet());
        for (Transform transform : adjacencyMap.keySet()) {
            transform.addToDag(this);
        }
        return this.dag;
    }

    private static void validateNoLeakage(Map<Transform, List<Transform>> adjacencyMap) {
        List leakages = adjacencyMap.entrySet().stream().filter(e -> !(e.getKey() instanceof SinkTransform)).filter(e -> ((List)e.getValue()).isEmpty()).map(Map.Entry::getKey).collect(Collectors.toList());
        if (!leakages.isEmpty()) {
            throw new IllegalArgumentException("These transforms have nothing attached to them: " + leakages);
        }
    }

    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, SupplierEx<Processor> procSupplier) {
        return this.addVertex(transform, name, localParallelism, ProcessorMetaSupplier.of(procSupplier));
    }

    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorSupplier procSupplier) {
        return this.addVertex(transform, name, localParallelism, ProcessorMetaSupplier.of(procSupplier));
    }

    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorMetaSupplier metaSupplier) {
        PlannerVertex pv = new PlannerVertex(this.dag.newVertex(name, metaSupplier));
        pv.v.localParallelism(localParallelism);
        this.xform2vertex.put(transform, pv);
        return pv;
    }

    public void addEdges(Transform transform, Vertex toVertex, BiConsumer<Edge, Integer> configureEdgeFn) {
        int destOrdinal = 0;
        for (Transform fromTransform : transform.upstream()) {
            PlannerVertex fromPv = this.xform2vertex.get(fromTransform);
            Edge edge = Edge.from(fromPv.v, fromPv.nextAvailableOrdinal()).to(toVertex, destOrdinal);
            this.dag.edge(edge);
            configureEdgeFn.accept(edge, destOrdinal);
            ++destOrdinal;
        }
    }

    public void addEdges(Transform transform, Vertex toVertex, Consumer<Edge> configureEdgeFn) {
        this.addEdges(transform, toVertex, (Edge e, Integer ord) -> configureEdgeFn.accept((Edge)e));
    }

    public void addEdges(Transform transform, Vertex toVertex) {
        this.addEdges(transform, toVertex, (Edge e) -> {});
    }

    @Nonnull
    private static <T> EventTimePolicy<T> withFrameSize(EventTimePolicy<T> original, long watermarkThrottlingFrameSize) {
        return EventTimePolicy.eventTimePolicy(original.timestampFn(), original.wrapFn(), original.newWmPolicyFn(), watermarkThrottlingFrameSize, 0L, original.idleTimeoutMillis());
    }

    public static <E> List<E> tailList(List<E> list) {
        return list.subList(1, list.size());
    }

    public static class PlannerVertex {
        public final Vertex v;
        private int availableOrdinal;

        PlannerVertex(Vertex v) {
            this.v = v;
        }

        public String toString() {
            return this.v.toString();
        }

        public int nextAvailableOrdinal() {
            return this.availableOrdinal++;
        }
    }
}

