package org.apache.flink.runtime.jobmanager.splitassigner;

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.class */
/**
 * Input split assigner that hands out the splits of a group vertex in queue order.
 *
 * <p>Each registered {@link ExecutionGroupVertex} owns one queue of {@link InputSplit}s. The
 * queues are {@link ConcurrentLinkedQueue}s kept in a {@link ConcurrentHashMap}, so the individual
 * map and queue operations used here do not require external locking.
 */
public class DefaultInputSplitAssigner implements InputSplitAssigner {
    private static final Log LOG = LogFactory.getLog(DefaultInputSplitAssigner.class);

    /** Remaining (not yet assigned) input splits, keyed by the group vertex they belong to. */
    private final ConcurrentMap<ExecutionGroupVertex, Queue<InputSplit>> splitMap =
            new ConcurrentHashMap<ExecutionGroupVertex, Queue<InputSplit>>();

    /**
     * Registers the input splits of the given group vertex with this assigner.
     *
     * <p>Nothing is registered when the vertex reports no input splits ({@code null} or an empty
     * array). If a queue is already registered for the vertex, an error is logged and the existing
     * queue is left untouched.
     *
     * @param executionGroupVertex the group vertex whose input splits shall be made available
     */
    @Override
    public void registerGroupVertex(ExecutionGroupVertex executionGroupVertex) {
        InputSplit[] inputSplits = executionGroupVertex.getInputSplits();
        if (inputSplits == null || inputSplits.length == 0) {
            return;
        }

        // Fill the queue BEFORE publishing it in the map. Otherwise a concurrent call to
        // getNextInputSplit could observe the registered-but-still-empty queue and return null,
        // which callers cannot distinguish from "all splits have been consumed".
        Queue<InputSplit> splitQueue = new ConcurrentLinkedQueue<InputSplit>(Arrays.asList(inputSplits));

        if (this.splitMap.putIfAbsent(executionGroupVertex, splitQueue) != null) {
            LOG.error("Group vertex " + executionGroupVertex.getName() + " already has a split queue");
            // The freshly built queue was not stored; the previously registered one stays in effect.
            return;
        }
    }

    /**
     * Removes the split queue of the given group vertex, if one is registered.
     *
     * @param executionGroupVertex the group vertex to unregister
     */
    @Override
    public void unregisterGroupVertex(ExecutionGroupVertex executionGroupVertex) {
        this.splitMap.remove(executionGroupVertex);
    }

    /**
     * Returns the next unassigned input split for the group vertex of the given vertex.
     *
     * @param executionVertex the vertex requesting a split
     * @return the next input split, or {@code null} if the queue of the vertex's group vertex is
     *     empty or no queue is registered for it (the latter is additionally logged as an error)
     */
    @Override
    public InputSplit getNextInputSplit(ExecutionVertex executionVertex) {
        Queue<InputSplit> queue = this.splitMap.get(executionVertex.getGroupVertex());
        if (queue == null) {
            LOG.error("Cannot find split queue for vertex " + executionVertex.getGroupVertex() + " (job " + executionVertex.getExecutionGraph().getJobID() + ")");
            return null;
        }

        // poll() yields null once the queue is drained.
        InputSplit nextSplit = queue.poll();
        if (LOG.isDebugEnabled() && nextSplit != null) {
            LOG.debug("Assigning split " + nextSplit.getSplitNumber() + " to " + executionVertex);
        }
        return nextSplit;
    }
}
