/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.processor.reduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;

/**
 * Tez processor that drives the reduce side of a MapReduce job.
 *
 * <p>{@link #run(Map, Map)} requires exactly one input and exactly one output. The input must be a
 * {@link ShuffledMergedInputLegacy}; the output must be an {@link MROutputLegacy} or an
 * {@link OnFileSortedOutput}. Depending on {@code useNewApi}, records are handed either to a
 * new-API {@code org.apache.hadoop.mapreduce.Reducer} ({@link #runNewReducer}) or to the reducer
 * class configured on the {@link JobConf} ({@link #runOldReducer}).
 */
public class ReduceProcessor
extends MRTask
implements LogicalIOProcessor {
    private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
    /** Incremented once per key group; bound to {@code TaskCounter.REDUCE_INPUT_GROUPS} in {@link #run}. */
    private Counters.Counter reduceInputKeyCounter;
    /** Incremented once per value; bound to {@code TaskCounter.REDUCE_INPUT_RECORDS} in {@link #run}. */
    private Counters.Counter reduceInputValueCounter;

    /**
     * Creates the processor.
     * NOTE(review): the {@code false} passed to {@code MRTask} is presumably its "is map task"
     * flag - confirm against {@code MRTask}.
     */
    public ReduceProcessor() {
        super(false);
    }

    /**
     * Delegates to {@code MRTask.initialize}, converting an {@link InterruptedException} into an
     * {@link IOException} that carries it as the cause.
     *
     * @param processorContext context forwarded unchanged to the superclass
     * @throws IOException if the superclass fails or the call is interrupted
     */
    @Override
    public void initialize(TezProcessorContext processorContext) throws IOException {
        try {
            super.initialize(processorContext);
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    /**
     * Ignores all events; this processor does not react to them.
     *
     * @param processorEvents events delivered to the processor (unused)
     */
    public void handleEvents(List<Event> processorEvents) {
    }

    /**
     * No-op: this class holds nothing that needs releasing here.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
    }

    /**
     * Runs the reduce task against the single supplied input and output.
     *
     * <p>Sequence: validate the input/output counts, start the input and wait for it through
     * {@code processorContext.waitForAllInputsReady}, start the output, call {@code initTask},
     * look up the key/value classes and grouping comparator from the job configuration, bind the
     * counters, obtain the reader and writer, run the reducer, then call {@code done()}.
     *
     * @param inputs  must contain exactly one entry whose value is a {@link ShuffledMergedInputLegacy}
     * @param outputs must contain exactly one entry whose value is an {@link MROutputLegacy} or an
     *                {@link OnFileSortedOutput}
     * @throws IOException if the input/output count or type is not supported, or if the new-API
     *                     reducer class cannot be found
     * @throws Exception   anything else raised by the inputs, outputs or the reducer
     */
    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
        LOG.info("Running reduce: " + this.processorContext.getUniqueIdentifier());
        // Outputs are validated before inputs so the reported error matches the historical order.
        if (outputs.size() != 1) {
            throw new IOException("Invalid number of outputs, outputCount=" + outputs.size());
        }
        if (inputs.size() != 1) {
            throw new IOException("Invalid number of inputs, inputCount=" + inputs.size());
        }

        LogicalInput in = inputs.values().iterator().next();
        in.start();
        List<LogicalInput> pendingInputs = new LinkedList<LogicalInput>();
        pendingInputs.add(in);
        this.processorContext.waitForAllInputsReady(pendingInputs);
        LOG.info("Input is ready for consumption. Starting Output");

        LogicalOutput out = outputs.values().iterator().next();
        out.start();
        this.initTask(out);
        this.statusUpdate();

        Class keyClass = ConfigUtils.getIntermediateInputKeyClass(this.jobConf);
        Class valueClass = ConfigUtils.getIntermediateInputValueClass(this.jobConf);
        LOG.info("Using keyClass: " + keyClass);
        LOG.info("Using valueClass: " + valueClass);
        RawComparator comparator = ConfigUtils.getInputKeySecondaryGroupingComparator(this.jobConf);
        LOG.info("Using comparator: " + comparator);

        this.reduceInputKeyCounter = this.mrReporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = this.mrReporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);

        if (!(in instanceof ShuffledMergedInputLegacy)) {
            throw new IOException("Illegal input to reduce: " + in.getClass());
        }
        ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
        // The reader is obtained before the writer regardless of which API is used below.
        KeyValuesReader kvReader = shuffleInput.getReader();

        final KeyValueWriter kvWriter;
        if (out instanceof MROutputLegacy) {
            kvWriter = ((MROutputLegacy)out).getWriter();
        } else if (out instanceof OnFileSortedOutput) {
            kvWriter = ((OnFileSortedOutput)out).getWriter();
        } else {
            // Report the offending OUTPUT class (this message previously named the input's class).
            throw new IOException("Illegal output to reduce: " + out.getClass());
        }

        if (this.useNewApi) {
            try {
                this.runNewReducer(this.jobConf, this.mrReporter, shuffleInput, comparator, keyClass, valueClass, kvWriter);
            }
            catch (ClassNotFoundException cnfe) {
                throw new IOException(cnfe);
            }
        } else {
            this.runOldReducer(this.jobConf, this.mrReporter, kvReader, comparator, keyClass, valueClass, kvWriter);
        }
        this.done();
    }

    /**
     * Runs the reducer class configured on {@code job} (old API) over every key group of
     * {@code input}, forwarding each collected pair to {@code output}.
     *
     * <p>The reducer is closed exactly once: after the loop on success, or best-effort from the
     * error path when the loop fails with an {@link IOException} (in which case a failure to
     * close is logged and the original exception is rethrown).
     *
     * @param job        job configuration; supplies the reducer class
     * @param reporter   receives progress pings and the final progress of {@code 1.0f}
     * @param input      source of key groups
     * @param comparator not used by this method
     * @param keyClass   not used by this method
     * @param valueClass not used by this method
     * @param output     sink for the pairs the reducer collects
     * @throws IOException          if reading, reducing, writing or closing fails
     * @throws InterruptedException declared for callers; not raised directly by this method
     */
    void runOldReducer(JobConf job, MRTaskReporter reporter, KeyValuesReader input, RawComparator comparator, Class keyClass, Class valueClass, final KeyValueWriter output) throws IOException, InterruptedException {
        // NOTE(review): the bare name Reducer is meant to be the old-API type (its reduce() takes
        // an OutputCollector and a Reporter). The file also imports
        // org.apache.hadoop.mapreduce.Reducer, so the import list needs disambiguating.
        Reducer reducer = (Reducer)ReflectionUtils.newInstance(job.getReducerClass(), job);
        OutputCollector collector = new OutputCollector(){

            public void collect(Object key, Object value) throws IOException {
                output.write(key, value);
            }
        };
        try {
            ReduceValuesIterator values = new ReduceValuesIterator(input, reporter, this.reduceInputValueCounter);
            values.informReduceProgress();
            while (values.more()) {
                this.reduceInputKeyCounter.increment(1L);
                reducer.reduce(values.getKey(), values, collector, reporter);
                values.informReduceProgress();
            }
            // Only reached when every group was reduced without an exception.
            reporter.setProgress(1.0f);
        }
        catch (IOException ioe) {
            try {
                reducer.close();
            }
            catch (IOException closeFailure) {
                // Deliberately not rethrown: the original failure is the one callers must see.
                LOG.warn("Ignoring failure to close reducer after an earlier error", closeFailure);
            }
            throw ioe;
        }
        // Kept outside the try block so that a failing close() is not followed by a second close().
        reducer.close();
    }

    /**
     * Runs the new-API reducer named by the task attempt context over the raw iterator of
     * {@code input}, writing results to {@code out}.
     *
     * <p>The raw iterator is wrapped so that every {@code next()} call also pushes the iterator's
     * current progress to {@code reporter}.
     *
     * @param job        job configuration handed to the reducer instance and the reduce context
     * @param reporter   receives progress updates, ending with {@code 1.0f}
     * @param input      shuffled, merged input supplying the raw key/value iterator
     * @param comparator grouping comparator passed to the reduce context
     * @param keyClass   key class passed to the reduce context
     * @param valueClass value class passed to the reduce context
     * @param out        sink for the pairs the reducer writes
     * @throws IOException            if reading or writing fails
     * @throws InterruptedException   if the reducer is interrupted
     * @throws ClassNotFoundException if the reducer class cannot be resolved
     */
    void runNewReducer(JobConf job, final MRTaskReporter reporter, ShuffledMergedInputLegacy input, RawComparator comparator, Class keyClass, Class valueClass, final KeyValueWriter out) throws IOException, InterruptedException, ClassNotFoundException {
        TaskAttemptContext taskContext = this.getTaskAttemptContext();
        org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        final TezRawKeyValueIterator rawIter = input.getIterator();
        // Pure delegate, except that next() also reports progress.
        TezRawKeyValueIterator rIter = new TezRawKeyValueIterator(){

            public void close() throws IOException {
                rawIter.close();
            }

            public DataInputBuffer getKey() throws IOException {
                return rawIter.getKey();
            }

            public Progress getProgress() {
                return rawIter.getProgress();
            }

            public DataInputBuffer getValue() throws IOException {
                return rawIter.getValue();
            }

            public boolean next() throws IOException {
                boolean ret = rawIter.next();
                reporter.setProgress(rawIter.getProgress().getProgress());
                return ret;
            }
        };
        // Adapts the Tez writer to the RecordWriter the reduce context expects; close() is a no-op.
        RecordWriter trackedRW = new RecordWriter(){

            public void write(Object key, Object value) throws IOException, InterruptedException {
                out.write(key, value);
            }

            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            }
        };
        org.apache.hadoop.mapreduce.Reducer.Context reducerContext = ReduceProcessor.createReduceContext(reducer, job, this.taskAttemptId, rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);
        reducer.run(reducerContext);
        reporter.setProgress(1.0f);
        trackedRW.close(reducerContext);
    }

    /**
     * Applies the superclass localisation and then sets {@code mapreduce.task.ismap} to
     * {@code false} on the given configuration.
     *
     * @param jobConf configuration to update in place
     * @throws IOException          propagated from the superclass
     * @throws InterruptedException propagated from the superclass
     */
    @Override
    public void localizeConfiguration(JobConf jobConf) throws IOException, InterruptedException {
        super.localizeConfiguration(jobConf);
        jobConf.setBoolean("mapreduce.task.ismap", false);
    }

    /**
     * Iterator over the values of the current key group of a {@link KeyValuesReader}.
     *
     * <p>Call {@link #more()} to advance to the next key group; afterwards {@link #getKey()}
     * returns that group's key and the {@link Iterator} methods walk its values. Every value
     * handed out by {@link #next()} increments the supplied counter.
     *
     * @param <KEY>   type the current key is cast to (unchecked)
     * @param <VALUE> type each value is cast to (unchecked)
     */
    private static class ReduceValuesIterator<KEY, VALUE>
    implements Iterator<VALUE> {
        private final Counters.Counter reduceInputValueCounter;
        private final KeyValuesReader in;
        private final Progressable reporter;
        /** Key of the current group, or {@code null} when there is no current group. */
        private Object currentKey;
        /** Values of the current group, or {@code null} when there is no current group. */
        private Iterator<Object> currentValues;

        /**
         * @param in                      reader supplying key groups
         * @param reporter                pinged by {@link #informReduceProgress()}
         * @param reduceInputValueCounter incremented for every value returned by {@link #next()}
         * @throws IOException never thrown by this constructor; kept in the signature for callers
         */
        public ReduceValuesIterator(KeyValuesReader in, Progressable reporter, Counters.Counter reduceInputValueCounter) throws IOException {
            this.reduceInputValueCounter = reduceInputValueCounter;
            this.in = in;
            this.reporter = reporter;
        }

        /**
         * Advances the underlying reader to its next key group.
         *
         * @return {@code true} if a group is now current; {@code false} if the reader is exhausted,
         *         in which case the current key and values are cleared
         * @throws IOException if the reader fails
         */
        public boolean more() throws IOException {
            boolean more = this.in.next();
            if (more) {
                this.currentKey = this.in.getCurrentKey();
                this.currentValues = this.in.getCurrentValues().iterator();
            } else {
                this.currentKey = null;
                this.currentValues = null;
            }
            return more;
        }

        /**
         * @return the key of the current group, or {@code null} when there is none
         * @throws IOException never thrown by this implementation
         */
        public KEY getKey() throws IOException {
            return (KEY)this.currentKey;
        }

        /** Pings the reporter via {@code progress()}. */
        public void informReduceProgress() {
            this.reporter.progress();
        }

        /**
         * @return {@code true} if the current group has another value; {@code false} when it is
         *         exhausted or when there is no current group
         */
        @Override
        public boolean hasNext() {
            // No current group (before the first more(), or after more() returned false).
            return this.currentValues != null && this.currentValues.hasNext();
        }

        /**
         * Returns the next value of the current group and counts it.
         *
         * @return the next value, cast to {@code VALUE}
         * @throws java.util.NoSuchElementException if there is no current group or it is exhausted
         */
        @Override
        public VALUE next() {
            if (this.currentValues == null) {
                throw new java.util.NoSuchElementException("No current key group");
            }
            // Fetch first so that a failed fetch does not count as a consumed record.
            VALUE value = (VALUE)this.currentValues.next();
            this.reduceInputValueCounter.increment(1L);
            return value;
        }

        /**
         * Removal is not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

