/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.mapreduce.impl;

import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableList;
import com.google.appengine.tools.mapreduce.InputReader;
import com.google.appengine.tools.mapreduce.KeyValue;
import com.google.appengine.tools.mapreduce.MapReduceResult;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.MapperContext;
import com.google.appengine.tools.mapreduce.Output;
import com.google.appengine.tools.mapreduce.OutputWriter;
import com.google.appengine.tools.mapreduce.ReducerContext;
import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.appengine.tools.mapreduce.impl.AbstractWorkerController;
import com.google.appengine.tools.mapreduce.impl.CountersImpl;
import com.google.appengine.tools.mapreduce.impl.MapShardTask;
import com.google.appengine.tools.mapreduce.impl.ReduceShardTask;
import com.google.appengine.tools.mapreduce.impl.ReducerInputs;
import com.google.appengine.tools.mapreduce.impl.ResultAndCounters;
import com.google.appengine.tools.mapreduce.impl.Shuffling;
import com.google.appengine.tools.mapreduce.impl.WorkerResult;
import com.google.appengine.tools.mapreduce.impl.shardedjob.InProcessShardedJobRunner;
import com.google.appengine.tools.mapreduce.outputs.InMemoryOutput;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.logging.Logger;

public class InProcessMapReduce {
    private static final Logger log = Logger.getLogger(InProcessMapReduce.class.getName());
    private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    private static String abbrev(List<?> l) {
        if (l.size() <= 5) {
            return "" + l;
        }
        StringBuilder b = new StringBuilder("[");
        for (int i = 0; i < 5; ++i) {
            b.append(l.get(i) + ",");
        }
        return b + "...]";
    }

    public static <I, K, V, O, R> MapReduceResult<R> runMapReduce(MapReduceSpecification<I, K, V, O, R> mrSpec) throws IOException {
        String mapReduceId = "in-process-mr-" + DATE_FORMAT.format(new Date()) + "-" + new Random().nextInt(1000000);
        Impl<I, K, V, O, R> mapReduce = new Impl<I, K, V, O, R>(mapReduceId, mrSpec);
        log.info(mapReduce + " started");
        List<InputReader<I>> inputs = mrSpec.getInput().createReaders();
        MapResult mapResult = ((Impl)mapReduce).map(inputs);
        List<OutputWriter<O>> outputs = mrSpec.getOutput().createWriters();
        List reducerInputs = ((Impl)mapReduce).shuffle(mapResult.getMapShardOutputs(), outputs.size());
        MapReduceResult result = ((Impl)mapReduce).reduce(reducerInputs, mrSpec.getOutput(), outputs, mapResult.getCounters());
        log.info(mapReduce + " finished");
        return result;
    }

    private static class Impl<I, K, V, O, R> {
        private final String id;
        private final MapReduceSpecification<I, K, V, O, R> mrSpec;

        public Impl(String id, MapReduceSpecification<I, K, V, O, R> mrSpec) {
            this.id = Preconditions.checkNotNull(id, "Null id");
            this.mrSpec = Preconditions.checkNotNull(mrSpec, "Null mrSpec");
        }

        public String toString() {
            return "InProcessMapReduce.Impl(" + this.id + ")";
        }

        private MapResult<K, V> map(List<? extends InputReader<I>> inputs) throws IOException {
            log.info("Map phase started");
            InMemoryOutput output = InMemoryOutput.create(inputs.size());
            List outputs = output.createWriters();
            ImmutableList.Builder tasks = ImmutableList.builder();
            for (int shard = 0; shard < inputs.size(); ++shard) {
                MapShardTask<I, K, V> task = new MapShardTask<I, K, V>(this.id, shard, inputs.size(), inputs.get(shard), this.mrSpec.getMapper(), outputs.get(shard), Long.MAX_VALUE);
                tasks.add(task);
            }
            final CountersImpl[] counters = new CountersImpl[1];
            WorkerResult result = (WorkerResult)InProcessShardedJobRunner.runJob(tasks.build(), new AbstractWorkerController<I, KeyValue<K, V>, MapperContext<K, V>>(this.mrSpec.getJobName() + "-map"){
                private static final long serialVersionUID = 661198005749484951L;

                @Override
                public void completed(WorkerResult<KeyValue<K, V>> finalCombinedResult) {
                    counters[0] = finalCombinedResult.getCounters();
                }
            });
            log.info("Map phase completed");
            Object pairs = output.finish(outputs);
            return new MapResult(pairs, result.getCounters());
        }

        private List<List<KeyValue<K, List<V>>>> shuffle(List<List<KeyValue<K, V>>> mapperOutputs, int reduceShardCount) {
            log.info("Shuffle phase started");
            List<List<KeyValue<K, List<V>>>> out = Shuffling.shuffle(mapperOutputs, this.mrSpec.getIntermediateKeyMarshaller(), reduceShardCount);
            log.info("Shuffle phase completed");
            return out;
        }

        InputReader<KeyValue<K, ReducerInput<V>>> getReducerInputReader(final List<KeyValue<K, List<V>>> data) {
            return new InputReader<KeyValue<K, ReducerInput<V>>>(){
                private static final long serialVersionUID = 310424169122893265L;
                int i = 0;

                @Override
                public Double getProgress() {
                    return null;
                }

                @Override
                public KeyValue<K, ReducerInput<V>> next() {
                    if (this.i >= data.size()) {
                        throw new NoSuchElementException();
                    }
                    KeyValue result = KeyValue.of(((KeyValue)data.get(this.i)).getKey(), ReducerInputs.fromIterable((Iterable)((KeyValue)data.get(this.i)).getValue()));
                    ++this.i;
                    return result;
                }
            };
        }

        private MapReduceResult<R> reduce(List<List<KeyValue<K, List<V>>>> inputs, Output<O, R> output, List<? extends OutputWriter<O>> outputs, CountersImpl mapCounters) throws IOException {
            Preconditions.checkArgument(inputs.size() == outputs.size(), "%s reduce inputs, %s outputs", inputs.size(), outputs.size());
            log.info("Reduce phase started");
            ImmutableList.Builder tasks = ImmutableList.builder();
            for (int shard = 0; shard < outputs.size(); ++shard) {
                tasks.add(new ReduceShardTask<K, V, O>(this.id, shard, outputs.size(), this.getReducerInputReader(inputs.get(shard)), this.mrSpec.getReducer(), outputs.get(shard), Long.MAX_VALUE));
            }
            final CountersImpl[] counters = new CountersImpl[1];
            WorkerResult result = (WorkerResult)InProcessShardedJobRunner.runJob(tasks.build(), new AbstractWorkerController<KeyValue<K, ReducerInput<V>>, O, ReducerContext<O>>(this.mrSpec.getJobName() + "-reduce"){
                private static final long serialVersionUID = 575338448598450119L;

                @Override
                public void completed(WorkerResult<O> result) {
                    counters[0] = result.getCounters();
                }
            });
            log.info("Reduce phase completed, reduce counters=" + counters[0]);
            counters[0].addAll(mapCounters);
            log.info("combined counters=" + counters[0]);
            return new ResultAndCounters<R>(output.finish(outputs), counters[0]);
        }
    }

    private static class MapResult<K, V> {
        private final List<List<KeyValue<K, V>>> mapShardOutputs;
        private final CountersImpl counters;

        public MapResult(List<List<KeyValue<K, V>>> mapShardOutputs, CountersImpl counters) {
            this.mapShardOutputs = Preconditions.checkNotNull(mapShardOutputs, "Null mapShardOutputs");
            this.counters = Preconditions.checkNotNull(counters, "Null counters");
        }

        public List<List<KeyValue<K, V>>> getMapShardOutputs() {
            return this.mapShardOutputs;
        }

        public CountersImpl getCounters() {
            return this.counters;
        }

        public String toString() {
            return this.getClass().getSimpleName() + "(" + this.mapShardOutputs + ", " + this.counters + ")";
        }
    }
}

