package com.google.appengine.tools.mapreduce.impl;

import com.google.appengine.api.files.AppEngineFile;
import com.google.appengine.api.files.FileService;
import com.google.appengine.api.files.FileServiceFactory;
import com.google.appengine.api.files.FileServicePb;
import com.google.appengine.api.files.RecordWriteChannel;
import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableList;
import com.google.appengine.repackaged.com.google.protobuf.ByteString;
import com.google.appengine.tools.mapreduce.KeyValue;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.impl.util.FileUtil;
import com.google.appengine.tools.mapreduce.impl.util.SerializationUtil;
import com.google.appengine.tools.pipeline.Job3;
import com.google.appengine.tools.pipeline.Value;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/appengine/tools/mapreduce/impl/InMemoryShuffleJob.class */
public class InMemoryShuffleJob<K, V, O> extends Job3<ShuffleResult<K, V, O>, List<AppEngineFile>, List<AppEngineFile>, ShuffleResult<K, V, O>> {
    private static final long serialVersionUID = 176754702347404887L;
    private static final Logger log = Logger.getLogger(InMemoryShuffleJob.class.getName());
    private static final FileService FILE_SERVICE = FileServiceFactory.getFileService();
    private final String mrJobId;
    private final MapReduceSpecification<?, K, V, O, ?> mrSpec;

    /* JADX INFO: Access modifiers changed from: package-private */
    public InMemoryShuffleJob(String str, MapReduceSpecification<?, K, V, O, ?> mapReduceSpecification) {
        this.mrJobId = (String) Preconditions.checkNotNull(str, "Null mrJobId");
        this.mrSpec = (MapReduceSpecification) Preconditions.checkNotNull(mapReduceSpecification, "Null mrSpec");
    }

    private List<KeyValue<K, V>> readInput(AppEngineFile appEngineFile) throws IOException {
        ImmutableList.Builder builder = ImmutableList.builder();
        NonSpammingRecordReadChannel nonSpammingRecordReadChannel = new NonSpammingRecordReadChannel(FILE_SERVICE.openReadChannel(appEngineFile, false));
        while (true) {
            ByteBuffer readRecord = nonSpammingRecordReadChannel.readRecord();
            if (readRecord == null) {
                return builder.build();
            }
            FileServicePb.KeyValue parseFrom = FileServicePb.KeyValue.parseFrom(SerializationUtil.getBytes(readRecord));
            builder.add((ImmutableList.Builder) KeyValue.of(this.mrSpec.getIntermediateKeyMarshaller().fromBytes(parseFrom.getKey().asReadOnlyByteBuffer()), this.mrSpec.getIntermediateValueMarshaller().fromBytes(parseFrom.getValue().asReadOnlyByteBuffer())));
        }
    }

    private List<List<KeyValue<K, V>>> readInputs(List<AppEngineFile> list) throws IOException {
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<AppEngineFile> it = list.iterator();
        while (it.hasNext()) {
            builder.add((ImmutableList.Builder) readInput(it.next()));
        }
        return builder.build();
    }

    private void writeOutput(AppEngineFile appEngineFile, List<KeyValue<K, List<V>>> list) throws IOException {
        RecordWriteChannel openRecordWriteChannel = FILE_SERVICE.openRecordWriteChannel(appEngineFile, false);
        for (KeyValue<K, List<V>> keyValue : list) {
            FileServicePb.KeyValues.Builder newBuilder = FileServicePb.KeyValues.newBuilder();
            newBuilder.setKey(ByteString.copyFrom(this.mrSpec.getIntermediateKeyMarshaller().toBytes(keyValue.getKey())));
            Iterator<V> it = keyValue.getValue().iterator();
            while (it.hasNext()) {
                newBuilder.addValue(ByteString.copyFrom(this.mrSpec.getIntermediateValueMarshaller().toBytes(it.next())));
            }
            openRecordWriteChannel.write(ByteBuffer.wrap(newBuilder.build().toByteArray()), (String) null);
        }
    }

    private void writeOutputs(List<AppEngineFile> list, List<List<KeyValue<K, List<V>>>> list2) throws IOException {
        Preconditions.checkArgument(list.size() == list2.size(), "%s != %s", Integer.valueOf(list.size()), Integer.valueOf(list2.size()));
        for (int i = 0; i < list.size(); i++) {
            writeOutput(list.get(i), list2.get(i));
        }
    }

    @Override // com.google.appengine.tools.pipeline.Job3
    public Value<ShuffleResult<K, V, O>> run(List<AppEngineFile> list, List<AppEngineFile> list2, ShuffleResult<K, V, O> shuffleResult) {
        try {
            writeOutputs(list2, Shuffling.shuffle(readInputs(list), this.mrSpec.getIntermediateKeyMarshaller(), list2.size()));
            Iterator<AppEngineFile> it = shuffleResult.getReducerInputFiles().iterator();
            while (it.hasNext()) {
                FileUtil.ensureFinalized(it.next());
            }
            return immediate(shuffleResult);
        } catch (IOException e) {
            throw new RuntimeException(this + ": IOException while shuffling", e);
        }
    }
}
