package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

@InterfaceStability.Stable
@InterfaceAudience.Public
/* loaded from: input_file:hadoop-2.7.5.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.5.1.jar:org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.class */
public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
    private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
    public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads";
    public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass";
    private Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
    private Mapper<K1, V1, K2, V2>.Context outer;
    private List<MultithreadedMapper<K1, V1, K2, V2>.MapRunner> runners;

    /* loaded from: input_file:hadoop-2.7.5.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.5.1.jar:org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper$MapRunner.class */
    private class MapRunner extends Thread {
        private Mapper<K1, V1, K2, V2> mapper;
        private Mapper<K1, V1, K2, V2>.Context subcontext;
        private Throwable throwable;
        private RecordReader<K1, V1> reader;

        MapRunner(Mapper<K1, V1, K2, V2>.Context context) throws IOException, InterruptedException {
            this.reader = new SubMapRecordReader();
            this.mapper = (Mapper) ReflectionUtils.newInstance(MultithreadedMapper.this.mapClass, context.getConfiguration());
            this.subcontext = new WrappedMapper().getMapContext(new MapContextImpl(MultithreadedMapper.this.outer.getConfiguration(), MultithreadedMapper.this.outer.getTaskAttemptID(), this.reader, new SubMapRecordWriter(), context.getOutputCommitter(), new SubMapStatusReporter(), MultithreadedMapper.this.outer.getInputSplit()));
            this.reader.initialize(context.getInputSplit(), context);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.mapper.run(this.subcontext);
                this.reader.close();
            } catch (Throwable th) {
                this.throwable = th;
            }
        }
    }

    /* loaded from: input_file:hadoop-2.7.5.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.5.1.jar:org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper$SubMapRecordReader.class */
    private class SubMapRecordReader extends RecordReader<K1, V1> {
        private K1 key;
        private V1 value;
        private Configuration conf;

        private SubMapRecordReader() {
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader
        public float getProgress() throws IOException, InterruptedException {
            return CapacitySchedulerConfiguration.MINIMUM_CAPACITY_VALUE;
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.conf = taskAttemptContext.getConfiguration();
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader
        public boolean nextKeyValue() throws IOException, InterruptedException {
            synchronized (MultithreadedMapper.this.outer) {
                if (!MultithreadedMapper.this.outer.nextKeyValue()) {
                    return false;
                }
                this.key = (K1) ReflectionUtils.copy(MultithreadedMapper.this.outer.getConfiguration(), MultithreadedMapper.this.outer.getCurrentKey(), this.key);
                this.value = (V1) ReflectionUtils.copy(this.conf, MultithreadedMapper.this.outer.getCurrentValue(), this.value);
                return true;
            }
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader
        public K1 getCurrentKey() {
            return this.key;
        }

        @Override // org.apache.hadoop.mapreduce.RecordReader
        public V1 getCurrentValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:hadoop-2.7.5.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.5.1.jar:org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper$SubMapRecordWriter.class */
    private class SubMapRecordWriter extends RecordWriter<K2, V2> {
        private SubMapRecordWriter() {
        }

        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        }

        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void write(K2 k2, V2 v2) throws IOException, InterruptedException {
            synchronized (MultithreadedMapper.this.outer) {
                MultithreadedMapper.this.outer.write(k2, v2);
            }
        }
    }

    /* loaded from: input_file:hadoop-2.7.5.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.5.1.jar:org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper$SubMapStatusReporter.class */
    private class SubMapStatusReporter extends StatusReporter {
        private SubMapStatusReporter() {
        }

        @Override // org.apache.hadoop.mapreduce.StatusReporter, org.apache.hadoop.mapred.Reporter
        public Counter getCounter(Enum<?> r4) {
            return MultithreadedMapper.this.outer.getCounter(r4);
        }

        @Override // org.apache.hadoop.mapreduce.StatusReporter, org.apache.hadoop.mapred.Reporter
        public Counter getCounter(String str, String str2) {
            return MultithreadedMapper.this.outer.getCounter(str, str2);
        }

        @Override // org.apache.hadoop.mapreduce.StatusReporter, org.apache.hadoop.util.Progressable
        public void progress() {
            MultithreadedMapper.this.outer.progress();
        }

        @Override // org.apache.hadoop.mapreduce.StatusReporter, org.apache.hadoop.mapred.Reporter
        public void setStatus(String str) {
            MultithreadedMapper.this.outer.setStatus(str);
        }

        @Override // org.apache.hadoop.mapreduce.StatusReporter, org.apache.hadoop.mapred.Reporter
        public float getProgress() {
            return MultithreadedMapper.this.outer.getProgress();
        }
    }

    public static int getNumberOfThreads(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(NUM_THREADS, 10);
    }

    public static void setNumberOfThreads(Job job, int i) {
        job.getConfiguration().setInt(NUM_THREADS, i);
    }

    public static <K1, V1, K2, V2> Class<Mapper<K1, V1, K2, V2>> getMapperClass(JobContext jobContext) {
        return (Class<Mapper<K1, V1, K2, V2>>) jobContext.getConfiguration().getClass(MAP_CLASS, Mapper.class);
    }

    public static <K1, V1, K2, V2> void setMapperClass(Job job, Class<? extends Mapper<K1, V1, K2, V2>> cls) {
        if (MultithreadedMapper.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("Can't have recursive MultithreadedMapper instances.");
        }
        job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class);
    }

    @Override // org.apache.hadoop.mapreduce.Mapper
    public void run(Mapper<K1, V1, K2, V2>.Context context) throws IOException, InterruptedException {
        this.outer = context;
        int numberOfThreads = getNumberOfThreads(context);
        this.mapClass = getMapperClass(context);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads");
        }
        this.runners = new ArrayList(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            MultithreadedMapper<K1, V1, K2, V2>.MapRunner mapRunner = new MapRunner(context);
            mapRunner.start();
            this.runners.add(i, mapRunner);
        }
        for (int i2 = 0; i2 < numberOfThreads; i2++) {
            MultithreadedMapper<K1, V1, K2, V2>.MapRunner mapRunner2 = this.runners.get(i2);
            mapRunner2.join();
            Throwable th = ((MapRunner) mapRunner2).throwable;
            if (th != null) {
                if (th instanceof IOException) {
                    throw ((IOException) th);
                }
                if (!(th instanceof InterruptedException)) {
                    throw new RuntimeException(th);
                }
                throw ((InterruptedException) th);
            }
        }
    }
}
