package org.apache.flink.runtime.jobmanager.splitassigner;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.instance.Instance;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.class */
/**
 * Holds the not-yet-consumed {@link LocatableInputSplit}s and hands them out to instances,
 * preferring for each instance the splits with the smallest distance as reported by
 * {@link Instance#getDistance(String)}.
 *
 * <p>The set of unconsumed splits is kept in a master set. For every instance that requests a
 * split, a priority queue ordered by distance is built lazily from the master set and cached.
 * A split counts as consumed once it has been removed from the master set; stale entries that
 * remain in the cached per-instance queues are skipped on later requests.
 *
 * <p>Note: a split added via {@link #addSplit(LocatableInputSplit)} after an instance's queue
 * has already been built is not inserted into that cached queue, so it will not be offered to
 * that instance.
 *
 * <p>The public methods are {@code synchronized} on this object.
 */
public final class LocatableInputSplitList {
    private static final Log LOG = LogFactory.getLog(LocatableInputSplitList.class);

    /** All splits that have been added and not yet handed out. */
    private final Set<LocatableInputSplit> masterSet = new HashSet<LocatableInputSplit>();

    /** Lazily built, distance-ordered candidate queue for each instance that asked for a split. */
    private final Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();

    /**
     * Pairs an input split with its distance to a particular instance. Elements are ordered by
     * ascending distance so that a {@link PriorityQueue} yields the closest split first.
     *
     * <p>Declared as a static nested class because it never accesses the enclosing list.
     */
    public static final class QueueElem implements Comparable<QueueElem> {
        final LocatableInputSplit inputSplit;
        final int distance;

        private QueueElem(LocatableInputSplit locatableInputSplit, int i) {
            this.inputSplit = locatableInputSplit;
            this.distance = i;
        }

        /**
         * Returns the input split wrapped by this element.
         *
         * @return the wrapped input split
         */
        public LocatableInputSplit getInputSplit() {
            return this.inputSplit;
        }

        /**
         * Orders elements by ascending distance.
         *
         * <p>Uses explicit comparisons rather than subtraction: distances may be
         * {@code Integer.MAX_VALUE}, and a subtraction could overflow and flip the sign.
         *
         * @param queueElem the element to compare with
         * @return a negative value, zero, or a positive value if this element's distance is
         *         smaller than, equal to, or larger than the other element's distance
         */
        @Override // java.lang.Comparable
        public int compareTo(QueueElem queueElem) {
            if (this.distance < queueElem.distance) {
                return -1;
            }
            return (this.distance == queueElem.distance) ? 0 : 1;
        }
    }

    /**
     * Adds a split to the set of unconsumed splits.
     *
     * <p>Only the master set is updated; per-instance queues that were already built are left
     * untouched.
     *
     * @param locatableInputSplit the split to add
     */
    public synchronized void addSplit(LocatableInputSplit locatableInputSplit) {
        this.masterSet.add(locatableInputSplit);
    }

    /**
     * Returns the closest still-unconsumed split for the given instance and marks it consumed.
     *
     * @param instance the instance requesting a split
     * @return the next split for the instance, or {@code null} if the instance's queue is
     *         exhausted or no unconsumed splits remain
     */
    public synchronized LocatableInputSplit getNextInputSplit(Instance instance) {
        final Queue<QueueElem> candidates = getInstanceSplitList(instance);

        while (true) {
            final QueueElem candidate = candidates.poll();
            if (candidate == null) {
                // Nothing left in this instance's queue.
                return null;
            }

            // Removal from the master set decides ownership: it fails if the split has already
            // been handed out through another instance's queue.
            if (this.masterSet.remove(candidate.getInputSplit())) {
                if (LOG.isInfoEnabled()) {
                    if (candidate.distance == 0) {
                        LOG.info(instance + " receives local file input split");
                    } else {
                        LOG.info(instance + " receives remote file input split (distance " + candidate.distance + ")");
                    }
                }
                return candidate.getInputSplit();
            }

            // The candidate was stale; stop early if every split has been consumed.
            if (this.masterSet.isEmpty()) {
                return null;
            }
        }
    }

    /**
     * Returns the cached candidate queue for the instance, building it from the current master
     * set on first access.
     *
     * @param instance the instance whose queue is requested
     * @return the distance-ordered queue of candidate splits for the instance
     */
    private Queue<QueueElem> getInstanceSplitList(Instance instance) {
        Queue<QueueElem> queue = this.instanceMap.get(instance);
        if (queue == null) {
            queue = new PriorityQueue<QueueElem>();
            for (LocatableInputSplit split : this.masterSet) {
                queue.add(new QueueElem(split, minDistance(instance, split.getHostnames())));
            }
            this.instanceMap.put(instance, queue);
        }
        return queue;
    }

    /**
     * Computes the smallest distance between the instance and any of the given host names.
     *
     * @param instance the instance to measure from
     * @param hostnames the host names of a split; may be {@code null}
     * @return the minimum distance, or {@code Integer.MAX_VALUE} if {@code hostnames} is
     *         {@code null} or empty
     */
    private static int minDistance(Instance instance, String[] hostnames) {
        int best = Integer.MAX_VALUE;
        if (hostnames == null) {
            return best;
        }
        for (String hostname : hostnames) {
            final int distance = instance.getDistance(hostname);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Distance between " + instance + " and " + hostname + " is " + distance);
            }
            if (distance < best) {
                best = distance;
            }
        }
        return best;
    }
}
