/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.mapreduce;

import com.google.appengine.api.files.AppEngineFile;
import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableList;
import com.google.appengine.tools.mapreduce.InputReader;
import com.google.appengine.tools.mapreduce.KeyValue;
import com.google.appengine.tools.mapreduce.MapReduceResult;
import com.google.appengine.tools.mapreduce.MapReduceSettings;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.MapperContext;
import com.google.appengine.tools.mapreduce.Output;
import com.google.appengine.tools.mapreduce.OutputWriter;
import com.google.appengine.tools.mapreduce.ReducerContext;
import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.appengine.tools.mapreduce.WorkerContext;
import com.google.appengine.tools.mapreduce.impl.AbstractWorkerController;
import com.google.appengine.tools.mapreduce.impl.CountersImpl;
import com.google.appengine.tools.mapreduce.impl.DeleteFilesJob;
import com.google.appengine.tools.mapreduce.impl.IntermediateOutput;
import com.google.appengine.tools.mapreduce.impl.MapShardTask;
import com.google.appengine.tools.mapreduce.impl.ReduceShardTask;
import com.google.appengine.tools.mapreduce.impl.ResultAndCounters;
import com.google.appengine.tools.mapreduce.impl.ShuffleJob;
import com.google.appengine.tools.mapreduce.impl.ShuffleResult;
import com.google.appengine.tools.mapreduce.impl.Util;
import com.google.appengine.tools.mapreduce.impl.WorkerResult;
import com.google.appengine.tools.mapreduce.impl.WorkerShardTask;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobServiceFactory;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings;
import com.google.appengine.tools.pipeline.FutureValue;
import com.google.appengine.tools.pipeline.Job0;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.Job2;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.appengine.tools.pipeline.PipelineServiceFactory;
import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class MapReduceJob<I, K, V, O, R>
extends Job2<MapReduceResult<R>, MapReduceSpecification<I, K, V, O, R>, MapReduceSettings> {
    private static final long serialVersionUID = 723635736794527552L;
    private static final Logger log = Logger.getLogger(MapReduceJob.class.getName());

    /**
     * Starts a new pipeline whose root job is a fresh {@code MapReduceJob}.
     *
     * @param specification handed to the pipeline as the root job's first argument
     * @param settings handed to the pipeline as the root job's second argument, and also used to
     *     derive the pipeline job settings through {@code Util.jobSettings}
     * @return the value returned by {@code PipelineService.startNewPipeline}
     */
    public static <I, K, V, O, R> String start(MapReduceSpecification<I, K, V, O, R> specification, MapReduceSettings settings) {
        PipelineService service = PipelineServiceFactory.newPipelineService();
        MapReduceJob<I, K, V, O, R> rootJob = new MapReduceJob<I, K, V, O, R>();
        return service.startNewPipeline(
                rootJob,
                specification,
                settings,
                Util.jobSettings(settings, new JobSetting[0]));
    }

    /** Returns the simple name of the runtime class followed by an empty pair of parentheses. */
    public String toString() {
        String simpleName = getClass().getSimpleName();
        return simpleName + "()";
    }

    /**
     * Builds the sharded-job settings from the MapReduce settings.
     *
     * <p>The controller and worker callback paths are the base URL with {@code controllerCallback}
     * and {@code workerCallback} appended; controller and worker share the same backend, and the
     * queue names are copied over unchanged.
     *
     * @param mrSettings source of the base URL, backend and queue names
     * @return the populated sharded-job settings
     */
    private static ShardedJobSettings makeShardedJobSettings(MapReduceSettings mrSettings) {
        ShardedJobSettings shardedSettings = new ShardedJobSettings();
        return shardedSettings
                .setControllerPath(mrSettings.getBaseUrl() + "controllerCallback")
                .setWorkerPath(mrSettings.getBaseUrl() + "workerCallback")
                .setControllerBackend(mrSettings.getBackend())
                .setWorkerBackend(mrSettings.getBackend())
                .setControllerQueueName(mrSettings.getControllerQueueName())
                .setWorkerQueueName(mrSettings.getWorkerQueueName());
    }

    /**
     * Creates one initial task per reader/writer pair and starts a sharded job over those tasks.
     *
     * <p>Reader {@code i} is paired with writer {@code i}. The job is started with a
     * {@code WorkerController} built from the job name, the initial counters, the output and the
     * result promise handle, and with sharded-job settings derived from {@code settings}.
     *
     * @param shardedJobName name given to the controller; also used in the size-mismatch message
     * @param shardedJobId id passed to the sharded job service
     * @param initialCounters counters handed to the controller
     * @param readers one input reader per shard
     * @param output output handed to the controller
     * @param writers one output writer per shard; must be the same size as {@code readers}
     *     (enforced with {@code Preconditions.checkArgument})
     * @param taskCreator builds the task for each shard
     * @param resultPromiseHandle promise handle handed to the controller
     * @param settings source of the callback paths, backend and queue names
     */
    private static <I, O, R, C extends WorkerContext> void startShardedJob(String shardedJobName, String shardedJobId, CountersImpl initialCounters, List<? extends InputReader<I>> readers, Output<O, R> output, List<? extends OutputWriter<O>> writers, TaskCreator<I, O, C> taskCreator, String resultPromiseHandle, MapReduceSettings settings) {
        Preconditions.checkArgument(readers.size() == writers.size(), "%s: %s readers, %s writers", shardedJobName, readers.size(), writers.size());
        int shardCount = readers.size();
        // Fully parameterized builder so a task of the wrong shape cannot be added unnoticed.
        ImmutableList.Builder<WorkerShardTask<I, O, C>> initialTasks = ImmutableList.builder();
        for (int shard = 0; shard < shardCount; ++shard) {
            initialTasks.add(taskCreator.createTask(shard, shardCount, readers.get(shard), writers.get(shard)));
        }
        WorkerController<I, O, R, C> controller =
                new WorkerController<I, O, R, C>(shardedJobName, initialCounters, output, resultPromiseHandle);
        ShardedJobServiceFactory.getShardedJobService().startJob(shardedJobId, initialTasks.build(), controller, MapReduceJob.makeShardedJobSettings(settings));
    }

    /**
     * Wires up the child jobs of a MapReduce run and returns the reduce phase's future result.
     *
     * <p>Jobs scheduled, in order:
     * <ol>
     *   <li>{@code MapJob} with no inputs;</li>
     *   <li>{@code ShuffleJob}, fed by the map result;</li>
     *   <li>{@code IntermediateCleanupJob}, fed by the map result and held back until the shuffle
     *       result is available;</li>
     *   <li>{@code ReduceJob}, fed by the map and shuffle results;</li>
     *   <li>{@code FinalCleanupJob}, fed by the shuffle result and held back until the reduce
     *       result is available.</li>
     * </ol>
     * The job id shared by all child jobs is the name of this job's key.
     *
     * @param mrSpec specification passed to the map, shuffle and reduce jobs
     * @param settings settings passed to every child job and used to derive their job settings
     * @return the future value produced by the reduce job
     */
    @Override
    public Value<MapReduceResult<R>> run(MapReduceSpecification<I, K, V, O, R> mrSpec, MapReduceSettings settings) {
        String mrJobId = this.getJobKey().getName();
        FutureValue<ResultAndCounters<List<AppEngineFile>>> mapResult = this.futureCall(new MapJob<I, K, V>(mrJobId, mrSpec, settings), Util.jobSettings(settings, new JobSetting[0]));
        // Typed (not raw) so the compiler checks that the shuffle output matches what the
        // reduce and final-cleanup jobs consume.
        FutureValue<ShuffleResult<K, V, O>> shuffleResult = this.futureCall(new ShuffleJob<K, V, O>(mrJobId, mrSpec, settings), mapResult, Util.jobSettings(settings, new JobSetting[0]));
        this.futureCall(new IntermediateCleanupJob(mrJobId, settings), mapResult, Util.jobSettings(settings, MapReduceJob.waitFor(shuffleResult)));
        FutureValue<MapReduceResult<R>> reduceResult = this.futureCall(new ReduceJob<K, V, O, R>(mrJobId, mrSpec, settings), mapResult, shuffleResult, Util.jobSettings(settings, new JobSetting[0]));
        this.futureCall(new FinalCleanupJob<K, V, O>(mrJobId, settings), shuffleResult, Util.jobSettings(settings, MapReduceJob.waitFor(reduceResult)));
        return reduceResult;
    }

    /**
     * Pipeline job that receives a shuffle result and schedules a {@code DeleteFilesJob} for the
     * reducer input files that result reports.
     */
    private static class FinalCleanupJob<K, V, O>
    extends Job1<Void, ShuffleResult<K, V, O>> {
        private static final long serialVersionUID = 121832907494231026L;
        private final String mrJobId;
        private final MapReduceSettings settings;

        /** Stores both arguments; each is null-checked with {@code Preconditions.checkNotNull}. */
        private FinalCleanupJob(String mrJobId, MapReduceSettings settings) {
            Preconditions.checkNotNull(mrJobId, "Null mrJobId");
            this.mrJobId = mrJobId;
            Preconditions.checkNotNull(settings, "Null settings");
            this.settings = settings;
        }

        /** Returns the simple class name followed by the MapReduce job id in parentheses. */
        public String toString() {
            StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("(").append(mrJobId).append(")");
            return text.toString();
        }

        /**
         * Schedules deletion of the reducer input files and completes immediately.
         *
         * @param shuffleResult source of the reducer input files to delete
         * @return an immediate {@code null} value
         */
        @Override
        public Value<Void> run(ShuffleResult<K, V, O> shuffleResult) {
            // The delete job is labelled with this job's string form.
            DeleteFilesJob deleteJob = new DeleteFilesJob(toString());
            futureCall(
                    deleteJob,
                    immediate(shuffleResult.getReducerInputFiles()),
                    Util.jobSettings(settings, new JobSetting[0]));
            return immediate(null);
        }
    }

    /**
     * Pipeline job that receives the map phase result and schedules a {@code DeleteFilesJob} for
     * the files carried in that result.
     */
    private static class IntermediateCleanupJob
    extends Job1<Void, ResultAndCounters<List<AppEngineFile>>> {
        private static final long serialVersionUID = 354137030664235135L;
        private final String mrJobId;
        private final MapReduceSettings settings;

        /** Stores both arguments; each is null-checked with {@code Preconditions.checkNotNull}. */
        private IntermediateCleanupJob(String mrJobId, MapReduceSettings settings) {
            Preconditions.checkNotNull(mrJobId, "Null mrJobId");
            this.mrJobId = mrJobId;
            Preconditions.checkNotNull(settings, "Null settings");
            this.settings = settings;
        }

        /** Returns the simple class name followed by the MapReduce job id in parentheses. */
        public String toString() {
            StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("(").append(mrJobId).append(")");
            return text.toString();
        }

        /**
         * Schedules deletion of the map output files and completes immediately.
         *
         * @param mapResult map phase result whose output result is the list of files to delete
         * @return an immediate {@code null} value
         */
        @Override
        public Value<Void> run(ResultAndCounters<List<AppEngineFile>> mapResult) {
            // The delete job is labelled with this job's string form.
            DeleteFilesJob deleteJob = new DeleteFilesJob(toString());
            futureCall(
                    deleteJob,
                    immediate(mapResult.getOutputResult()),
                    Util.jobSettings(settings, new JobSetting[0]));
            return immediate(null);
        }
    }

    /**
     * Pipeline job for the reduce phase: starts a sharded job over the shuffle result's reducer
     * readers and writers and returns a promise that the sharded job's controller fills in when
     * the phase completes.
     */
    private static class ReduceJob<K, V, O, R>
    extends Job2<MapReduceResult<R>, ResultAndCounters<List<AppEngineFile>>, ShuffleResult<K, V, O>> {
        private static final long serialVersionUID = 590237832617368335L;
        private final String mrJobId;
        private final MapReduceSpecification<?, K, V, O, R> mrSpec;
        private final MapReduceSettings settings;

        /** Stores the arguments; each is null-checked with {@code Preconditions.checkNotNull}. */
        private ReduceJob(String mrJobId, MapReduceSpecification<?, K, V, O, R> mrSpec, MapReduceSettings settings) {
            this.mrJobId = Preconditions.checkNotNull(mrJobId, "Null mrJobId");
            this.mrSpec = Preconditions.checkNotNull(mrSpec, "Null mrSpec");
            this.settings = Preconditions.checkNotNull(settings, "Null settings");
        }

        /** Returns the simple class name followed by the MapReduce job id in parentheses. */
        @Override
        public String toString() {
            return this.getClass().getSimpleName() + "(" + this.mrJobId + ")";
        }

        /**
         * Starts the sharded reduce job and publishes its status console URL.
         *
         * <p>The sharded job id is the MapReduce job id with {@code -reduce} appended. The map
         * phase counters are passed on as the initial counters of the reduce phase.
         *
         * @param mapResult map phase result; only its counters are used here
         * @param shuffleResult source of the reducer readers and writers, one pair per shard
         * @return a promise for the MapReduce result, filled in outside this method
         */
        @Override
        public Value<MapReduceResult<R>> run(ResultAndCounters<List<AppEngineFile>> mapResult, ShuffleResult<K, V, O> shuffleResult) {
            // A class literal can only carry the raw MapReduceResult type, so the promise has to
            // be bridged to the parameterized type through an explicit unchecked raw cast.
            @SuppressWarnings({"unchecked", "rawtypes"})
            PromisedValue<MapReduceResult<R>> result = (PromisedValue) this.newPromise(MapReduceResult.class);
            String shardedJobId = this.mrJobId + "-reduce";
            MapReduceJob.startShardedJob(this.mrSpec.getJobName() + " (reduce phase)", shardedJobId, mapResult.getCounters(), shuffleResult.getReducerReaders(), this.mrSpec.getOutput(), shuffleResult.getReducerWriters(), new TaskCreator<KeyValue<K, ReducerInput<V>>, O, ReducerContext<O>>(){

                @Override
                public WorkerShardTask<KeyValue<K, ReducerInput<V>>, O, ReducerContext<O>> createTask(int shard, int shardCount, InputReader<KeyValue<K, ReducerInput<V>>> reader, OutputWriter<O> writer) {
                    return new ReduceShardTask<K, V, O>(ReduceJob.this.mrJobId, shard, shardCount, reader, ReduceJob.this.mrSpec.getReducer(), writer, ReduceJob.this.settings.getMillisPerSlice());
                }
            }, result.getHandle(), this.settings);
            this.setStatusConsoleUrl(this.settings.getBaseUrl() + "detail?mapreduce_id=" + shardedJobId);
            return result;
        }
    }

    private static class MapJob<I, K, V>
    extends Job0<ResultAndCounters<List<AppEngineFile>>> {
        private static final long serialVersionUID = 274712180795282822L;
        private final String mrJobId;
        private final MapReduceSpecification<I, K, V, ?, ?> mrSpec;
        private final MapReduceSettings settings;

        private MapJob(String mrJobId, MapReduceSpecification<I, K, V, ?, ?> mrSpec, MapReduceSettings settings) {
            this.mrJobId = Preconditions.checkNotNull(mrJobId, "Null mrJobId");
            this.mrSpec = Preconditions.checkNotNull(mrSpec, "Null mrSpec");
            this.settings = Preconditions.checkNotNull(settings, "Null settings");
        }

        public String toString() {
            return this.getClass().getSimpleName() + "(" + this.mrJobId + ")";
        }

        @Override
        public Value<ResultAndCounters<List<AppEngineFile>>> run() {
            PromisedValue<ResultAndCounters<List<AppEngineFile>>> result = this.newPromise(ResultAndCounters.class);
            String shardedJobId = this.mrJobId + "-map";
            List<InputReader<I>> readers = Util.createReaders(this.mrSpec.getInput());
            IntermediateOutput<K, V> output = new IntermediateOutput<K, V>(this.mrJobId, readers.size(), this.mrSpec.getIntermediateKeyMarshaller(), this.mrSpec.getIntermediateValueMarshaller());
            MapReduceJob.startShardedJob(this.mrSpec.getJobName() + " (map phase)", shardedJobId, new CountersImpl(), readers, output, Util.createWriters(output), new TaskCreator<I, KeyValue<K, V>, MapperContext<K, V>>(){

                @Override
                public WorkerShardTask<I, KeyValue<K, V>, MapperContext<K, V>> createTask(int shard, int shardCount, InputReader<I> reader, OutputWriter<KeyValue<K, V>> writer) {
                    return new MapShardTask(MapJob.this.mrJobId, shard, shardCount, reader, MapJob.this.mrSpec.getMapper(), writer, MapJob.this.settings.getMillisPerSlice());
                }
            }, result.getHandle(), this.settings);
            this.setStatusConsoleUrl(this.settings.getBaseUrl() + "detail?mapreduce_id=" + shardedJobId);
            return result;
        }
    }

    private static interface TaskCreator<I, O, C extends WorkerContext> {
        public WorkerShardTask<I, O, C> createTask(int var1, int var2, InputReader<I> var3, OutputWriter<O> var4);
    }

    /**
     * Completion handler for a sharded job started by {@code startShardedJob}: finishes the
     * output, merges counters and delivers the result to the promise identified by
     * {@code resultPromiseHandle}.
     */
    private static class WorkerController<I, O, R, C extends WorkerContext>
    extends AbstractWorkerController<I, O, C> {
        private static final long serialVersionUID = 931651840864967980L;
        private final CountersImpl initialCounters;
        private final Output<O, R> output;
        private final String resultPromiseHandle;

        /**
         * @param shardedJobName passed to the superclass constructor
         * @param initialCounters counters added to the final total before the job's own counters
         * @param output output whose {@code finish} is called on completion
         * @param resultPromiseHandle handle of the promise that receives the final result
         */
        WorkerController(String shardedJobName, CountersImpl initialCounters, Output<O, R> output, String resultPromiseHandle) {
            super(shardedJobName);
            this.initialCounters = Preconditions.checkNotNull(initialCounters, "Null initialCounters");
            this.output = Preconditions.checkNotNull(output, "Null output");
            this.resultPromiseHandle = Preconditions.checkNotNull(resultPromiseHandle, "Null resultPromiseHandle");
        }

        /**
         * Finishes the output with the closed writers in shard order, totals the counters and
         * starts a {@code FillPromiseJob} pipeline that submits the result to the promise.
         *
         * @param finalCombinedResult combined worker result carrying the closed writers (keyed
         *     by index) and the counters of the finished job
         * @throws RuntimeException wrapping the {@code IOException} if {@code output.finish} fails
         */
        @Override
        public void completed(WorkerResult<O> finalCombinedResult) {
            Map<Integer, OutputWriter<O>> closedWriterMap = finalCombinedResult.getClosedWriters();
            ImmutableList.Builder<OutputWriter<O>> closedWriters = ImmutableList.builder();
            // Keys must be exactly 0..size-1; a gap means a shard never reported its writer.
            for (int i = 0; i < closedWriterMap.size(); ++i) {
                Preconditions.checkState(closedWriterMap.containsKey(i), "%s: Missing closed writer %s: %s", this, i, closedWriterMap);
                closedWriters.add(closedWriterMap.get(i));
            }
            R outputResult;
            try {
                outputResult = this.output.finish(closedWriters.build());
            }
            catch (IOException e) {
                // Keep the IOException as the cause so the underlying failure stays diagnosable.
                throw new RuntimeException(this.output + ".finish() threw IOException", e);
            }
            CountersImpl totalCounters = new CountersImpl();
            totalCounters.addAll(this.initialCounters);
            totalCounters.addAll(finalCombinedResult.getCounters());
            ResultAndCounters<R> result = new ResultAndCounters<R>(outputResult, totalCounters);
            PipelineServiceFactory.newPipelineService().startNewPipeline(new FillPromiseJob(), this.resultPromiseHandle, result, new JobSetting[0]);
        }
    }

    /**
     * Pipeline job that submits a value to the promise identified by a handle.
     */
    private static class FillPromiseJob
    extends Job2<Void, String, Object> {
        private static final long serialVersionUID = 850701484460334898L;

        FillPromiseJob() {
        }

        /**
         * Submits {@code value} to the promise behind {@code promiseHandle}.
         *
         * @param promiseHandle handle of the promise to fill
         * @param value value to submit
         * @return an immediate {@code null} value
         * @throws RuntimeException wrapping the pipeline exception if the promise is orphaned or
         *     its handle cannot be found
         */
        @Override
        public Value<Void> run(String promiseHandle, Object value) {
            PipelineService pipelines = PipelineServiceFactory.newPipelineService();
            try {
                pipelines.submitPromisedValue(promiseHandle, value);
            }
            catch (OrphanedObjectException orphaned) {
                String message = promiseHandle + ": Object orphaned, can't submit result " + value;
                throw new RuntimeException(message, orphaned);
            }
            catch (NoSuchObjectException missing) {
                String message = promiseHandle + ": Handle not found, can't submit result " + value;
                throw new RuntimeException(message, missing);
            }
            return immediate(null);
        }
    }
}

