package org.apache.reef.io.data.loading.impl;

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer;
import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
import org.apache.reef.tang.annotations.Parameter;

@DriverSide
@Unstable
/* loaded from: input_file:org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.class */
public final class MultiDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
    private static final Logger LOG = Logger.getLogger(MultiDataCenterEvaluatorToPartitionStrategy.class.getName());
    private Set<String> normalizedLocations;
    private ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> partialLocationsToSplits;

    @Inject
    MultiDataCenterEvaluatorToPartitionStrategy(@Parameter(JobConfExternalConstructor.InputFormatClass.class) String str, @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class) Set<String> set) {
        super(str, set);
    }

    @Override // org.apache.reef.io.data.loading.impl.AbstractEvaluatorToPartitionStrategy
    protected void setUp() {
        this.normalizedLocations = new TreeSet(Collections.reverseOrder());
        this.partialLocationsToSplits = new ConcurrentHashMap();
    }

    @Override // org.apache.reef.io.data.loading.impl.AbstractEvaluatorToPartitionStrategy
    protected void updateLocations(NumberedSplit<InputSplit> numberedSplit) {
        String location = numberedSplit.getLocation();
        addLocationMapping(this.locationToSplits, numberedSplit, location);
        String normalize = normalize(location);
        addLocationMapping(this.partialLocationsToSplits, numberedSplit, normalize);
        this.normalizedLocations.add(normalize);
    }

    @Override // org.apache.reef.io.data.loading.impl.AbstractEvaluatorToPartitionStrategy
    protected NumberedSplit<InputSplit> tryAllocate(NodeDescriptor nodeDescriptor, String str) {
        String name = nodeDescriptor.getRackDescriptor().getName();
        LOG.log(Level.FINE, "Trying an exact match on rack name {0}", name);
        if (this.locationToSplits.containsKey(name)) {
            LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[]{str, name});
            NumberedSplit<InputSplit> allocateSplit = allocateSplit(str, this.locationToSplits.get(name));
            if (allocateSplit != null) {
                return allocateSplit;
            }
        }
        LOG.fine("No success, trying based on a partial match on locations");
        for (String str2 : this.normalizedLocations) {
            LOG.log(Level.FINE, "Trying on possible location {0}", str2);
            if (name.startsWith(str2)) {
                LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1} for rack {2}", new Object[]{str, str2, name});
                NumberedSplit<InputSplit> allocateSplit2 = allocateSplit(str, this.partialLocationsToSplits.get(str2));
                if (allocateSplit2 != null) {
                    return allocateSplit2;
                }
            }
        }
        LOG.fine("Nothing found");
        return null;
    }

    private void addLocationMapping(ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> concurrentMap, NumberedSplit<InputSplit> numberedSplit, String str) {
        if (!concurrentMap.containsKey(str)) {
            concurrentMap.put(str, new LinkedBlockingQueue());
        }
        concurrentMap.get(str).add(numberedSplit);
    }

    private String normalize(String str) {
        String str2 = str;
        if (!str2.startsWith("/")) {
            str2 = "/" + str2;
        }
        if (str2.equals("/*")) {
            return "/";
        }
        while (true) {
            if (!str2.endsWith("*") && !str2.endsWith("/")) {
                return str2;
            }
            str2 = str2.substring(0, str2.length() - 1);
        }
    }
}
