package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.impl.TopologicalSorter;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/pipeline/Planner.class */
public class Planner {
    private static final ILogger LOGGER = Logger.getLogger(Planner.class);
    private static final int MAXIMUM_WATERMARK_GAP = 1000;
    public final DAG dag = new DAG();
    public final Map<Transform, PlannerVertex> xform2vertex = new HashMap();
    private final PipelineImpl pipeline;

    /* loaded from: input_file:com/hazelcast/jet/impl/pipeline/Planner$PlannerVertex.class */
    public static class PlannerVertex {
        public final Vertex v;
        private int availableOrdinal;

        PlannerVertex(Vertex vertex) {
            this.v = vertex;
        }

        public String toString() {
            return this.v.toString();
        }

        public int nextAvailableOrdinal() {
            int i = this.availableOrdinal;
            this.availableOrdinal = i + 1;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Planner(PipelineImpl pipelineImpl) {
        this.pipeline = pipelineImpl;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DAG createDag() {
        this.pipeline.makeNamesUnique();
        Map<Transform, List<Transform>> adjacencyMap = this.pipeline.adjacencyMap();
        validateNoLeakage(adjacencyMap);
        long gcd = Util.gcd(adjacencyMap.keySet().stream().map((v0) -> {
            return v0.preferredWatermarkStride();
        }).filter(l -> {
            return l.longValue() > 0;
        }).mapToLong(l2 -> {
            return l2.longValue();
        }).toArray());
        if (gcd == 0) {
            gcd = 1000;
        }
        if (gcd > 1000) {
            gcd = Util.gcd(gcd, 1000L);
        }
        LoggingUtil.logFine(LOGGER, "Watermarks in the pipeline will be throttled to %d", Long.valueOf(gcd));
        for (Transform transform : adjacencyMap.keySet()) {
            if (transform instanceof StreamSourceTransform) {
                StreamSourceTransform streamSourceTransform = (StreamSourceTransform) transform;
                EventTimePolicy eventTimePolicy = streamSourceTransform.getEventTimePolicy();
                if (eventTimePolicy != null) {
                    streamSourceTransform.setEventTimePolicy(withFrameSize(eventTimePolicy, gcd));
                }
            } else if (transform instanceof TimestampTransform) {
                TimestampTransform timestampTransform = (TimestampTransform) transform;
                timestampTransform.setEventTimePolicy(withFrameSize(timestampTransform.getEventTimePolicy(), gcd));
            }
        }
        TopologicalSorter.checkTopologicalSort(adjacencyMap.entrySet());
        Iterator<Transform> it = adjacencyMap.keySet().iterator();
        while (it.hasNext()) {
            it.next().addToDag(this);
        }
        return this.dag;
    }

    private static void validateNoLeakage(Map<Transform, List<Transform>> map) {
        List list = (List) map.entrySet().stream().filter(entry -> {
            return !(entry.getKey() instanceof SinkTransform);
        }).filter(entry2 -> {
            return ((List) entry2.getValue()).isEmpty();
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
        if (!list.isEmpty()) {
            throw new IllegalArgumentException("These transforms have nothing attached to them: " + list);
        }
    }

    public PlannerVertex addVertex(Transform transform, String str, int i, SupplierEx<Processor> supplierEx) {
        return addVertex(transform, str, i, ProcessorMetaSupplier.of(supplierEx));
    }

    public PlannerVertex addVertex(Transform transform, String str, int i, ProcessorSupplier processorSupplier) {
        return addVertex(transform, str, i, ProcessorMetaSupplier.of(processorSupplier));
    }

    public PlannerVertex addVertex(Transform transform, String str, int i, ProcessorMetaSupplier processorMetaSupplier) {
        PlannerVertex plannerVertex = new PlannerVertex(this.dag.newVertex(str, processorMetaSupplier));
        plannerVertex.v.localParallelism(i);
        this.xform2vertex.put(transform, plannerVertex);
        return plannerVertex;
    }

    public void addEdges(Transform transform, Vertex vertex, BiConsumer<Edge, Integer> biConsumer) {
        int i = 0;
        Iterator<Transform> it = transform.upstream().iterator();
        while (it.hasNext()) {
            PlannerVertex plannerVertex = this.xform2vertex.get(it.next());
            Edge edge = Edge.from(plannerVertex.v, plannerVertex.nextAvailableOrdinal()).to(vertex, i);
            this.dag.edge(edge);
            biConsumer.accept(edge, Integer.valueOf(i));
            i++;
        }
    }

    public void addEdges(Transform transform, Vertex vertex, Consumer<Edge> consumer) {
        addEdges(transform, vertex, (edge, num) -> {
            consumer.accept(edge);
        });
    }

    public void addEdges(Transform transform, Vertex vertex) {
        addEdges(transform, vertex, edge -> {
        });
    }

    @Nonnull
    private static <T> EventTimePolicy<T> withFrameSize(EventTimePolicy<T> eventTimePolicy, long j) {
        return EventTimePolicy.eventTimePolicy(eventTimePolicy.timestampFn(), eventTimePolicy.wrapFn(), eventTimePolicy.newWmPolicyFn(), j, 0L, eventTimePolicy.idleTimeoutMillis());
    }

    public static <E> List<E> tailList(List<E> list) {
        return list.subList(1, list.size());
    }
}
