package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.com.google.common.hash.HashFunction;
import org.apache.flink.shaded.com.google.common.hash.Hasher;
import org.apache.flink.shaded.com.google.common.hash.Hashing;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates a {@link StreamGraph} into a {@link JobGraph}.
 *
 * <p>The translation walks the stream graph starting at its sources, groups chainable
 * operators into a single {@link JobVertex} (see {@link #isChainable(StreamEdge)}), connects
 * the resulting vertices, assigns slot sharing / co-location groups and finally configures
 * checkpointing and the restart strategy on the job graph.
 *
 * <p>Every job vertex receives a {@link JobVertexID} built from a hash that is either derived
 * from the user-specified transformation ID of the node or generated from the node's position
 * and properties in the graph.
 */
@Internal
public class StreamingJobGraphGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);

    /** Delay handed to the fixed-delay restart strategy that is installed when checkpointing is on and none is set. */
    private static final long DEFAULT_RESTART_DELAY = 10000L;

    /** Charset used when feeding strings into the node hashers; looked up once instead of per call. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final StreamGraph streamGraph;

    /** Job vertices keyed by the ID of the stream node that starts the corresponding chain. */
    private Map<Integer, JobVertex> jobVertices;

    private JobGraph jobGraph;

    /** IDs of the chain-start nodes for which a job vertex has already been created. */
    private Collection<Integer> builtVertices;

    /** Non-chained edges in the order in which they were connected. */
    private List<StreamEdge> physicalEdgesInOrder;

    /** Per chain-start node ID: the configs of the operators chained behind it, keyed by node ID. */
    private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;

    /** Config of every processed stream node, keyed by node ID. */
    private Map<Integer, StreamConfig> vertexConfigs;

    /** Display name of the (sub-)chain starting at the given node ID. */
    private Map<Integer, String> chainedNames;

    /**
     * Creates a generator for the given stream graph.
     *
     * @param streamGraph the stream graph to translate
     */
    public StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
    }

    /** Resets all intermediate bookkeeping structures. */
    private void init() {
        this.jobVertices = new HashMap<>();
        this.builtVertices = new HashSet<>();
        this.chainedConfigs = new HashMap<>();
        this.vertexConfigs = new HashMap<>();
        this.chainedNames = new HashMap<>();
        this.physicalEdgesInOrder = new ArrayList<>();
    }

    /**
     * Builds the job graph for the stream graph this generator was created with.
     *
     * @return the generated job graph
     * @throws RuntimeException if the execution config cannot be written into the job configuration
     */
    public JobGraph createJobGraph() {
        jobGraph = new JobGraph(streamGraph.getJobName());
        jobGraph.setScheduleMode(ScheduleMode.ALL);

        init();

        // The hashes must exist before chaining, because job vertex IDs are created from them.
        Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();
        setChaining(hashes);

        setPhysicalEdges();
        setSlotSharing();

        // Checkpointing may install a default restart strategy, so it has to run first.
        configureCheckpointing();
        configureRestartStrategy();

        try {
            InstantiationUtil.writeObjectToConfig(
                    streamGraph.getExecutionConfig(), jobGraph.getJobConfiguration(), "runtime.config");
        } catch (IOException e) {
            throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
        }
        return jobGraph;
    }

    /**
     * Groups the connected physical edges by their target node and stores each group, in
     * connection order, as the physical input edges of the target's config.
     */
    private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<>();

        for (StreamEdge edge : physicalEdgesInOrder) {
            Integer targetId = edge.getTargetId();
            List<StreamEdge> inEdges = inEdgesByTarget.get(targetId);
            if (inEdges == null) {
                inEdges = new ArrayList<>();
                inEdgesByTarget.put(targetId, inEdges);
            }
            inEdges.add(edge);
        }

        for (Map.Entry<Integer, List<StreamEdge>> entry : inEdgesByTarget.entrySet()) {
            vertexConfigs.get(entry.getKey()).setInPhysicalEdges(entry.getValue());
        }
    }

    /**
     * Starts chain creation at every source of the stream graph.
     *
     * @param hashes node hashes keyed by stream node ID
     */
    private void setChaining(Map<Integer, byte[]> hashes) {
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            createChain(sourceNodeId, sourceNodeId, hashes);
        }
    }

    /**
     * Recursively builds the chain that starts at {@code startNodeId}, currently visiting
     * {@code currentNodeId}.
     *
     * @param startNodeId ID of the node that heads the chain
     * @param currentNodeId ID of the node currently being added to the chain
     * @param hashes node hashes keyed by stream node ID
     * @return the non-chained out edges reachable from {@code currentNodeId} within this chain;
     *     empty if the chain starting at {@code startNodeId} was already built
     */
    private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }

        List<StreamEdge> transitiveOutEdges = new ArrayList<>();
        List<StreamEdge> chainableOutputs = new ArrayList<>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<>();

        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
            if (isChainable(outEdge)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // Chainable successors stay in this chain and contribute their own outgoing edges.
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes));
        }

        // Non-chainable successors leave this chain and each start a chain of their own.
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            Integer targetId = nonChainable.getTargetId();
            createChain(targetId, targetId, hashes);
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));

        boolean isChainHead = currentNodeId.equals(startNodeId);
        StreamConfig config = isChainHead
                ? createJobVertex(startNodeId, hashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (isChainHead) {
            config.setChainStart();
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            Map<Integer, StreamConfig> configsOfChain = chainedConfigs.get(startNodeId);
            if (configsOfChain == null) {
                configsOfChain = new HashMap<>();
                chainedConfigs.put(startNodeId, configsOfChain);
            }
            configsOfChain.put(currentNodeId, config);
        }

        return transitiveOutEdges;
    }

    /**
     * Builds the display name of the (sub-)chain starting at the given node from the node's
     * operator name and the already computed names of its chained successors.
     *
     * @param nodeId ID of the node the name is created for
     * @param chainedOutputs the out edges of the node that are chained
     * @return {@code name}, {@code name -> successor} or {@code name -> (s1, s2, ...)}
     */
    private String createChainedName(Integer nodeId, List<StreamEdge> chainedOutputs) {
        String operatorName = streamGraph.getStreamNode(nodeId).getOperatorName();

        if (chainedOutputs.isEmpty()) {
            return operatorName;
        }
        if (chainedOutputs.size() == 1) {
            return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
        }

        List<String> successorNames = new ArrayList<>();
        for (StreamEdge chainable : chainedOutputs) {
            successorNames.add(chainedNames.get(chainable.getTargetId()));
        }
        return operatorName + " -> (" + StringUtils.join(successorNames, ", ") + ")";
    }

    /**
     * Creates the job vertex for the chain headed by the given node, registers it with the job
     * graph and returns a stream config backed by the vertex' configuration.
     *
     * @param streamNodeId ID of the chain-start node
     * @param hashes node hashes keyed by stream node ID
     * @return a stream config wrapping the configuration of the new job vertex
     * @throws IllegalStateException if no hash exists for the node
     */
    private StreamConfig createJobVertex(Integer streamNodeId, Map<Integer, byte[]> hashes) {
        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

        byte[] hash = hashes.get(streamNodeId);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(hash);

        JobVertex jobVertex;
        if (streamNode.getInputFormat() != null) {
            jobVertex = new InputFormatVertex(chainedNames.get(streamNodeId), jobVertexId);
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
        } else {
            jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId);
        }

        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        // Non-positive parallelism is not forwarded; the vertex keeps whatever it had before.
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        jobVertices.put(streamNodeId, jobVertex);
        builtVertices.add(streamNodeId);
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

    /**
     * Copies the properties of a stream node into its stream config and registers the config.
     *
     * @param nodeId ID of the stream node
     * @param config the config to fill
     * @param chainableOutputs out edges of the node that are chained
     * @param nonChainableOutputs out edges of the node that are not chained
     */
    private void setVertexConfig(
            Integer nodeId,
            StreamConfig config,
            List<StreamEdge> chainableOutputs,
            List<StreamEdge> nonChainableOutputs) {

        StreamNode node = streamGraph.getStreamNode(nodeId);

        config.setVertexID(nodeId);
        config.setBufferTimeout(node.getBufferTimeout().longValue());

        config.setTypeSerializerIn1(node.getTypeSerializerIn1());
        config.setTypeSerializerIn2(node.getTypeSerializerIn2());
        config.setTypeSerializerOut(node.getTypeSerializerOut());

        config.setStreamOperator(node.getOperator());
        config.setOutputSelectors(node.getOutputSelectors());

        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);

        config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());

        CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();

        config.setStateBackend(streamGraph.getStateBackend());
        config.setCheckpointingEnabled(checkpointConfig.isCheckpointingEnabled());
        if (checkpointConfig.isCheckpointingEnabled()) {
            config.setCheckpointMode(checkpointConfig.getCheckpointingMode());
        } else {
            // Without checkpointing the mode is pinned to AT_LEAST_ONCE.
            config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
        }

        config.setStatePartitioner(0, node.getStatePartitioner1());
        config.setStatePartitioner(1, node.getStatePartitioner2());
        config.setStateKeySerializer(node.getStateKeySerializer());

        Class<? extends AbstractInvokable> vertexClass = node.getJobVertexClass();
        if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) {
            config.setIterationId(streamGraph.getBrokerID(nodeId));
            config.setIterationWaitTime(streamGraph.getLoopTimeout(nodeId));
        }

        vertexConfigs.put(nodeId, config);
    }

    /**
     * Connects the job vertex of the chain headed by {@code headOfChain} to the job vertex of the
     * edge's target and increments the target's input count.
     *
     * @param headOfChain ID of the node heading the upstream chain
     * @param edge the non-chained edge to materialize
     */
    private void connect(Integer headOfChain, StreamEdge edge) {
        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexId = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexId);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

        StreamPartitioner<?> partitioner = edge.getPartitioner();

        // Forward and rescale partitioners are wired pointwise; everything else all-to-all.
        boolean pointwise = partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner;
        DistributionPattern pattern = pointwise ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;

        downStreamVertex.connectNewDataSetAsInput(headVertex, pattern, ResultPartitionType.PIPELINED, true);

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}",
                    partitioner.getClass().getSimpleName(), headOfChain, downStreamVertexId);
        }
    }

    /**
     * Decides whether the two operators connected by the given edge can be chained.
     *
     * <p>All of the following must hold: the target has exactly one input edge, both operators
     * are non-null, both nodes are in the same slot sharing group, the target's chaining strategy
     * is {@code ALWAYS}, the source's is {@code HEAD} or {@code ALWAYS}, the edge uses a
     * {@link ForwardPartitioner}, both nodes have equal parallelism, and chaining is enabled on
     * the stream graph.
     *
     * @param edge the edge to test
     * @return {@code true} if source and target of the edge can be chained
     */
    private boolean isChainable(StreamEdge edge) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        // The order of the conjuncts matters: the null checks guard the strategy lookups.
        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD
                        || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }

    /**
     * Assigns one {@link SlotSharingGroup} per slot sharing group name to the job vertices and
     * puts every iteration source/sink pair into a common {@link CoLocationGroup}.
     */
    private void setSlotSharing() {
        Map<String, SlotSharingGroup> groupsByName = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            String groupName = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

            SlotSharingGroup group = groupsByName.get(groupName);
            if (group == null) {
                group = new SlotSharingGroup();
                groupsByName.put(groupName, group);
            }
            entry.getValue().setSlotSharingGroup(group);
        }

        for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
            CoLocationGroup coLocationGroup = new CoLocationGroup();

            JobVertex source = jobVertices.get(pair.f0.getId());
            JobVertex sink = jobVertices.get(pair.f1.getId());

            coLocationGroup.addVertex(source);
            coLocationGroup.addVertex(sink);
            source.updateCoLocationGroup(coLocationGroup);
            sink.updateCoLocationGroup(coLocationGroup);
        }
    }

    /**
     * Installs the snapshot settings on the job graph if checkpointing is enabled, and a
     * fixed-delay restart strategy on the execution config if none is set yet.
     *
     * @throws IllegalArgumentException if the configured checkpoint interval is smaller than 1
     */
    private void configureCheckpointing() {
        CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();
        if (!checkpointConfig.isCheckpointingEnabled()) {
            return;
        }

        long interval = checkpointConfig.getCheckpointInterval();
        if (interval < 1) {
            throw new IllegalArgumentException("The checkpoint interval must be positive");
        }

        // Only input vertices trigger; every vertex acknowledges and is notified of completion.
        List<JobVertexID> triggerVertices = new ArrayList<>();
        List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
        List<JobVertexID> commitVertices = new ArrayList<>();

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        JobSnapshottingSettings settings = new JobSnapshottingSettings(
                triggerVertices,
                ackVertices,
                commitVertices,
                interval,
                checkpointConfig.getCheckpointTimeout(),
                checkpointConfig.getMinPauseBetweenCheckpoints(),
                checkpointConfig.getMaxConcurrentCheckpoints());
        jobGraph.setSnapshotSettings(settings);

        if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
            streamGraph.getExecutionConfig().setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
        }
    }

    /** Copies the restart strategy of the execution config onto the job graph. */
    private void configureRestartStrategy() {
        jobGraph.setRestartStrategyConfiguration(streamGraph.getExecutionConfig().getRestartStrategy());
    }

    /**
     * Traverses the stream graph breadth-first from its sources and generates a hash for each
     * visited node.
     *
     * <p>A node whose inputs have not all been hashed yet is dropped from the visited set so that
     * it is enqueued again when reached through another input.
     *
     * @return the generated hashes keyed by stream node ID
     */
    private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
        HashFunction hashFunction = Hashing.murmur3_128(0);
        Map<Integer, byte[]> hashes = new HashMap<>();

        Set<Integer> visited = new HashSet<>();
        Queue<StreamNode> remaining = new ArrayDeque<>();

        // Sort the sources so the start order does not depend on the collection's iteration order.
        List<Integer> sources = new ArrayList<>(streamGraph.getSourceIDs());
        Collections.sort(sources);

        for (Integer sourceNodeId : sources) {
            remaining.add(streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }

        StreamNode currentNode;
        while ((currentNode = remaining.poll()) != null) {
            Integer currentId = currentNode.getId();

            if (generateNodeHash(currentNode, hashFunction, hashes)) {
                for (StreamEdge outEdge : currentNode.getOutEdges()) {
                    StreamNode child = outEdge.getTargetVertex();
                    Integer childId = child.getId();

                    if (!visited.contains(childId)) {
                        remaining.add(child);
                        visited.add(childId);
                    }
                }
            } else {
                // Not all inputs were hashed yet; allow the node to be enqueued again later.
                visited.remove(currentId);
            }
        }

        return hashes;
    }

    /**
     * Generates the hash for a node and stores it in {@code hashes}.
     *
     * <p>Nodes with a transformation ID get a hash derived from that ID only; all other nodes get
     * a hash generated from their properties and the hashes of their inputs.
     *
     * @param node the node to hash
     * @param hashFunction the hash function to create hashers from
     * @param hashes already generated hashes keyed by node ID; updated on success
     * @return {@code false} if the node has no transformation ID and at least one of its inputs
     *     has not been hashed yet, {@code true} otherwise
     * @throws UnsupportedOperationException if a node with a transformation ID has a chainable
     *     input edge
     * @throws IllegalArgumentException if the hash of a transformation ID equals an existing hash
     * @throws IllegalStateException if a hash for the node already exists
     */
    private boolean generateNodeHash(StreamNode node, HashFunction hashFunction, Map<Integer, byte[]> hashes) {
        String userSpecifiedId = node.getTransformationId();

        if (userSpecifiedId == null) {
            // The generated hash depends on all input hashes, so they must be present first.
            for (StreamEdge inEdge : node.getInEdges()) {
                if (!hashes.containsKey(inEdge.getSourceId())) {
                    return false;
                }
            }

            byte[] hash = generateDeterministicHash(node, hashFunction.newHasher(), hashes);

            if (hashes.put(node.getId(), hash) != null) {
                throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
            }
            return true;
        }

        for (StreamEdge inEdge : node.getInEdges()) {
            if (isChainable(inEdge)) {
                throw new UnsupportedOperationException("Cannot assign user-specified hash to intermediate node in chain. This will be supported in future versions of Flink. As a work around start new chain at task " + node.getOperatorName() + ".");
            }
        }

        byte[] hash = generateUserSpecifiedHash(node, hashFunction.newHasher());

        for (byte[] previousHash : hashes.values()) {
            if (Arrays.equals(previousHash, hash)) {
                throw new IllegalArgumentException("Hash collision on user-specified ID. Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.");
            }
        }

        if (hashes.put(node.getId(), hash) != null) {
            throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
        }
        return true;
    }

    /**
     * Hashes the transformation ID of the given node.
     *
     * @param node the node whose transformation ID is hashed
     * @param hasher a fresh hasher
     * @return the hash bytes
     */
    private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
        hasher.putString(node.getTransformationId(), UTF_8);
        return hasher.hash().asBytes();
    }

    /**
     * Generates a hash for a node without transformation ID from the node's local properties,
     * the local properties of its chainable successors and the hashes of its inputs.
     *
     * @param node the node to hash
     * @param hasher a fresh hasher
     * @param hashes already generated hashes keyed by node ID; must contain all inputs of the node
     * @return the hash bytes
     * @throws IllegalStateException if the hash of an input node is missing
     */
    private byte[] generateDeterministicHash(StreamNode node, Hasher hasher, Map<Integer, byte[]> hashes) {
        // The number of hashes generated so far serves as the node's position in the traversal.
        generateNodeLocalHash(node, hasher, hashes.size());

        for (StreamEdge outEdge : node.getOutEdges()) {
            if (isChainable(outEdge)) {
                generateNodeLocalHash(outEdge.getTargetVertex(), hasher, hashes.size());
            }
        }

        byte[] hash = hasher.hash().asBytes();

        // Fold in every input hash; the exact arithmetic must not change or IDs would differ.
        for (StreamEdge inEdge : node.getInEdges()) {
            byte[] inputHash = hashes.get(inEdge.getSourceId());

            if (inputHash == null) {
                throw new IllegalStateException("Missing hash for input node " + inEdge.getSourceVertex() + ". Cannot generate hash for " + node + ".");
            }

            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) ((hash[j] * 37) ^ inputHash[j]);
            }
        }

        if (LOG.isDebugEnabled()) {
            String udfClassName = "";
            if (node.getOperator() instanceof AbstractUdfStreamOperator) {
                udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
                        .getUserFunction().getClass().getName();
            }

            LOG.debug("Generated hash '" + org.apache.flink.util.StringUtils.byteToHexString(hash) + "' for node '" + node.toString() + "' {id: " + node.getId() + ", parallelism: " + node.getParallelism() + ", user function: " + udfClassName + "}");
        }

        return hash;
    }

    /**
     * Feeds the node-local hash inputs into the hasher: the given id, the node's parallelism and,
     * for UDF operators, the class name of the user function.
     *
     * @param node the node whose properties are hashed
     * @param hasher the hasher to feed
     * @param id the position-dependent id to hash for this node
     */
    private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
        hasher.putInt(id);
        hasher.putInt(node.getParallelism());

        if (node.getOperator() instanceof AbstractUdfStreamOperator) {
            String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
                    .getUserFunction().getClass().getName();
            hasher.putString(udfClassName, UTF_8);
        }
    }
}
