/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.mapreduce.impl;

import com.google.appengine.api.files.AppEngineFile;
import com.google.appengine.api.files.FileService;
import com.google.appengine.api.files.FileServiceFactory;
import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableCollection;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableList;
import com.google.appengine.tools.mapreduce.MapReduceSettings;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.OutputWriter;
import com.google.appengine.tools.mapreduce.impl.InMemoryShuffleJob;
import com.google.appengine.tools.mapreduce.impl.IntermediateInput;
import com.google.appengine.tools.mapreduce.impl.ResultAndCounters;
import com.google.appengine.tools.mapreduce.impl.ShuffleResult;
import com.google.appengine.tools.mapreduce.impl.ShuffleService;
import com.google.appengine.tools.mapreduce.impl.Util;
import com.google.appengine.tools.mapreduce.impl.util.FileUtil;
import com.google.appengine.tools.mapreduce.inputs.NoInput;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.Job2;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;

/**
 * Pipeline job that sits between the map and reduce stages of a MapReduce.
 *
 * <p>Given the map stage's output files, it creates one reduce-input blob file
 * per reduce shard (the shard count is the number of writers produced for the
 * specification's output) and then hands the actual shuffling off to either
 * {@link ShuffleService}, when it reports itself available, or to
 * {@link InMemoryShuffleJob} otherwise.
 *
 * @param <K> intermediate key type
 * @param <V> intermediate value type
 * @param <O> reducer output type
 */
public class ShuffleJob<K, V, O>
extends Job1<ShuffleResult<K, V, O>, ResultAndCounters<List<AppEngineFile>>> {
    private static final long serialVersionUID = 394826723385510650L;
    private static final Logger log = Logger.getLogger(ShuffleJob.class.getName());
    private static final FileService FILE_SERVICE = FileServiceFactory.getFileService();
    /** MIME type given to every reduce-input blob file created by this job. */
    private static final String REDUCE_INPUT_MIME_TYPE = "application/vnd.appengine.mapreduce.reduce-input.records";
    private final String mrJobId;
    private final MapReduceSpecification<?, K, V, O, ?> mrSpec;
    private final MapReduceSettings settings;

    /**
     * Creates the job.
     *
     * @param mrJobId identifier of the enclosing MapReduce job; must not be null
     * @param mrSpec specification of the MapReduce; must not be null
     * @param settings settings of the MapReduce; must not be null
     * @throws NullPointerException if any argument is null
     */
    public ShuffleJob(String mrJobId, MapReduceSpecification<?, K, V, O, ?> mrSpec, MapReduceSettings settings) {
        this.mrJobId = Preconditions.checkNotNull(mrJobId, "Null mrJobId");
        this.mrSpec = Preconditions.checkNotNull(mrSpec, "Null mrSpec");
        this.settings = Preconditions.checkNotNull(settings, "Null settings");
    }

    /** Returns the simple class name followed by the MapReduce job id in parentheses. */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + this.mrJobId + ")";
    }

    /**
     * Prepares the reduce inputs and schedules the shuffle.
     *
     * <p>When the map stage produced no output files, no files are created and
     * no shuffle is scheduled: the result carries an empty reducer-input list
     * and readers built from a {@link NoInput} sized to the reduce shard count.
     *
     * @param mapResult result of the map stage; its output result is the list
     *     of map output files
     * @return the shuffle result, either immediately (no map output) or as the
     *     future result of the child job that performs or awaits the shuffle
     * @throws RuntimeException if a reduce input file cannot be created
     */
    @Override
    public Value<ShuffleResult<K, V, O>> run(ResultAndCounters<List<AppEngineFile>> mapResult) {
        List<AppEngineFile> mapOutputs = mapResult.getOutputResult();
        List<OutputWriter<O>> reducerWriters = Util.createWriters(this.mrSpec.getOutput());
        int reduceShardCount = reducerWriters.size();
        if (mapOutputs.isEmpty()) {
            // Nothing to shuffle: reducers still run, but over empty inputs.
            return immediate(new ShuffleResult<K, V, O>(
                    ImmutableList.<AppEngineFile>of(),
                    reducerWriters,
                    Util.createReaders(NoInput.create(reduceShardCount))));
        }

        List<AppEngineFile> reduceInputs = createReduceInputFiles(reduceShardCount);
        ShuffleResult<K, V, O> shuffleResult = new ShuffleResult<K, V, O>(
                reduceInputs,
                reducerWriters,
                Util.createReaders(new IntermediateInput<K, V>(
                        reduceInputs,
                        this.mrSpec.getIntermediateKeyMarshaller(),
                        this.mrSpec.getIntermediateValueMarshaller())));

        // Use one service instance for both the availability check and the request.
        ShuffleService shuffleService = new ShuffleService();
        if (shuffleService.isAvailable()) {
            // The callback URL carries this promise's handle; WaitForShuffleJob
            // runs once the promise has been filled.
            PromisedValue<String> shuffleError = this.newPromise(String.class);
            shuffleService.shuffle(
                    "Shuffle-for-MR-" + this.mrJobId,
                    mapOutputs,
                    reduceInputs,
                    new ShuffleService.ShuffleCallback(this.settings.getBaseUrl() + "shuffleCallback" + "?promiseHandle=" + shuffleError.getHandle())
                            .setMethod("GET")
                            .setQueue(this.settings.getControllerQueueName()));
            return this.futureCall(
                    new WaitForShuffleJob<K, V, O>(this.mrJobId),
                    immediate(shuffleResult),
                    shuffleError,
                    Util.jobSettings(this.settings, new JobSetting[0]));
        }
        return this.futureCall(
                new InMemoryShuffleJob<K, V, O>(this.mrJobId, this.mrSpec),
                immediate(mapOutputs),
                immediate(reduceInputs),
                immediate(shuffleResult),
                Util.jobSettings(this.settings, new JobSetting[0]));
    }

    /**
     * Creates one new blob file per reduce shard, in shard order.
     *
     * @param reduceShardCount number of files to create
     * @return immutable list of the created files, indexed by shard number
     * @throws RuntimeException wrapping the {@link IOException} if a file
     *     cannot be created
     */
    private List<AppEngineFile> createReduceInputFiles(int reduceShardCount) {
        ImmutableList.Builder<AppEngineFile> files = ImmutableList.builder();
        for (int shard = 0; shard < reduceShardCount; ++shard) {
            try {
                files.add(FILE_SERVICE.createNewBlobFile(REDUCE_INPUT_MIME_TYPE, this.mrJobId + ": reduce input, shard " + shard));
            } catch (IOException e) {
                throw new RuntimeException(this + ": IOException creating reduce input file " + shard, e);
            }
        }
        return files.build();
    }

    /**
     * Child job that runs after the shuffle callback has supplied the promised
     * error value. It fails if an error was reported; otherwise it calls
     * {@link FileUtil#ensureFinalized} on every reducer input file and passes
     * the shuffle result through unchanged.
     *
     * @param <K> intermediate key type
     * @param <V> intermediate value type
     * @param <O> reducer output type
     */
    private static class WaitForShuffleJob<K, V, O>
    extends Job2<ShuffleResult<K, V, O>, ShuffleResult<K, V, O>, String> {
        private static final long serialVersionUID = 308217691163421115L;
        private final String mrJobId;

        /**
         * @param mrJobId identifier of the enclosing MapReduce job; must not be null
         * @throws NullPointerException if {@code mrJobId} is null
         */
        private WaitForShuffleJob(String mrJobId) {
            this.mrJobId = Preconditions.checkNotNull(mrJobId, "Null mrJobId");
        }

        /** Returns the simple class name followed by the MapReduce job id in parentheses. */
        @Override
        public String toString() {
            return this.getClass().getSimpleName() + "(" + this.mrJobId + ")";
        }

        /**
         * Checks the shuffle outcome and finalizes the reducer input files.
         *
         * @param shuffleResult result prepared by the parent {@link ShuffleJob}
         * @param shuffleError error reported by the shuffler, or null when none
         *     was reported
         * @return {@code shuffleResult}, as an immediate value
         * @throws RuntimeException if {@code shuffleError} is non-null
         */
        @Override
        public Value<ShuffleResult<K, V, O>> run(ShuffleResult<K, V, O> shuffleResult, String shuffleError) {
            if (shuffleError != null) {
                throw new RuntimeException("Shuffler signalled an error: " + shuffleError);
            }
            for (AppEngineFile file : shuffleResult.getReducerInputFiles()) {
                FileUtil.ensureFinalized(file);
            }
            return immediate(shuffleResult);
        }
    }
}

