/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.mapreduce.impl;

import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.base.Stopwatch;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableMap;
import com.google.appengine.tools.mapreduce.Counters;
import com.google.appengine.tools.mapreduce.EndSliceEventSource;
import com.google.appengine.tools.mapreduce.InputReader;
import com.google.appengine.tools.mapreduce.OutputWriter;
import com.google.appengine.tools.mapreduce.Worker;
import com.google.appengine.tools.mapreduce.WorkerContext;
import com.google.appengine.tools.mapreduce.impl.CountersImpl;
import com.google.appengine.tools.mapreduce.impl.WorkerResult;
import com.google.appengine.tools.mapreduce.impl.WorkerShardState;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTask;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Base class for a task that processes one shard of input in time-bounded slices.
 *
 * <p>Each call to {@link #run()} reads items from the {@link InputReader}, hands every item to
 * {@link #callWorker(Object, WorkerContext)}, and packages the slice's counters and progress
 * into a {@link WorkerResult}.
 *
 * @param <I> type of the items produced by the input reader
 * @param <O> type parameter of the {@link OutputWriter} owned by this task
 * @param <C> type of the context handed to the worker
 */
public abstract class WorkerShardTask<I, O, C extends WorkerContext>
implements IncrementalTask<WorkerShardTask<I, O, C>, WorkerResult<O>> {
    private static final Logger log = Logger.getLogger(WorkerShardTask.class.getName());
    private static final long serialVersionUID = 992552712402490981L;
    /** Maximum number of characters {@link #abbrev(Object)} keeps before appending "...". */
    private static final int MAX_ABBREV_LENGTH = 100;

    private final String mrJobId;
    private final int shardNumber;
    private final int shardCount;
    private final InputReader<I> in;
    private final Worker<C> worker;
    private final OutputWriter<O> out;
    private final long millisPerSlice;
    private final String workerCallsCounterName;
    private final String workerMillisCounterName;
    /** True until the first call to {@link #run()} has invoked {@code worker.beginShard}. */
    private boolean isFirstSlice = true;

    /**
     * Creates a task for a single shard. Every reference argument is validated with
     * {@code Preconditions.checkNotNull}.
     *
     * @param mrJobId id of the owning job; used only in {@link #toString()}
     * @param shardNumber number of this shard; used as the key of the maps placed in the result
     * @param shardCount total shard count; used only in {@link #toString()}
     * @param in source of the items to process
     * @param worker worker whose shard/slice lifecycle callbacks are invoked by {@link #run()}
     * @param out output writer whose slice lifecycle is driven by {@link #run()}
     * @param millisPerSlice elapsed-time budget after which a slice stops reading further items
     * @param workerCallsCounterName name of the counter incremented by the number of worker calls
     * @param workerMillisCounterName name of the counter incremented by the time spent in worker calls
     */
    protected WorkerShardTask(String mrJobId, int shardNumber, int shardCount, InputReader<I> in, Worker<C> worker, OutputWriter<O> out, long millisPerSlice, String workerCallsCounterName, String workerMillisCounterName) {
        this.mrJobId = Preconditions.checkNotNull(mrJobId, "Null mrJobId");
        this.shardNumber = shardNumber;
        this.shardCount = shardCount;
        this.in = Preconditions.checkNotNull(in, "Null in");
        this.worker = Preconditions.checkNotNull(worker, "Null worker");
        this.out = Preconditions.checkNotNull(out, "Null out");
        this.millisPerSlice = millisPerSlice;
        this.workerCallsCounterName = Preconditions.checkNotNull(workerCallsCounterName, "Null workerCallsCounterName");
        this.workerMillisCounterName = Preconditions.checkNotNull(workerMillisCounterName, "Null workerMillisCounterName");
    }

    /** Returns {@code SimpleClassName(mrJobId, shardNumber/shardCount)}. */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + this.mrJobId + ", " + this.shardNumber + "/" + this.shardCount + ")";
    }

    /**
     * Creates the context that {@link #run()} passes to the worker's lifecycle callbacks and to
     * {@link #callWorker(Object, WorkerContext)} for one slice.
     *
     * @param counters the counters instance created for the current slice
     * @return the context to use for the current slice
     */
    protected abstract C getWorkerContext(Counters counters);

    /**
     * Processes a single input item.
     *
     * @param input the item just returned by the input reader
     * @param context the context created for the current slice
     */
    protected abstract void callWorker(I input, C context);

    /**
     * Formats the most recently read item for inclusion in the {@link WorkerShardState}.
     *
     * @param item the last item read in this slice, or {@code null} if the slice read none
     * @return the text to record for the item
     */
    protected abstract String formatLastWorkItem(I item);

    /**
     * Runs one slice of this shard.
     *
     * <p>The slice begins on the reader and the writer, then notifies the worker
     * ({@code beginShard} on the first slice only, then {@code beginSlice}). Items are read and
     * processed until the reader signals exhaustion with {@link NoSuchElementException}, the
     * reader fails with an {@link IOException} after at least one item was processed, or the
     * elapsed time reaches {@code millisPerSlice}. Because the time check happens after each
     * item, at least one read is attempted per slice.
     *
     * @return a result whose second component is this task when input remains, or {@code null}
     *     once input is exhausted; in the latter case the output writer is closed and included
     *     in the {@link WorkerResult}
     * @throws RuntimeException wrapping any {@link IOException} from the reader's or writer's
     *     slice lifecycle methods, from {@code out.close()}, or from {@code in.next()} when no
     *     item has been processed yet in this slice
     */
    @Override
    public IncrementalTask.RunResult<WorkerShardTask<I, O, C>, WorkerResult<O>> run() {
        this.beginReaderAndWriterSlice();
        CountersImpl counters = new CountersImpl();
        C context = this.getWorkerContext(counters);
        if (this.isFirstSlice) {
            this.isFirstSlice = false;
            this.worker.beginShard(context);
        }
        this.worker.beginSlice(context);
        Stopwatch overallStopwatch = new Stopwatch().start();
        Stopwatch workerStopwatch = new Stopwatch();
        int workerCalls = 0;
        boolean inputExhausted = false;
        // Stays at its previous value when next() throws, so after the loop it holds the last
        // item actually read (or null if none was).
        I next = null;
        do {
            try {
                next = this.in.next();
            }
            catch (NoSuchElementException e) {
                inputExhausted = true;
                break;
            }
            catch (IOException e) {
                if (workerCalls == 0) {
                    // Nothing was processed in this slice, so fail it outright.
                    throw new RuntimeException(this.in + ".next() threw IOException", e);
                }
                // Some items were processed: end the slice normally so that work is reported.
                log.log(Level.SEVERE, this.in + ".next() threw IOException, ending slice early", e);
                break;
            }
            ++workerCalls;
            workerStopwatch.start();
            this.callWorker(next, context);
            workerStopwatch.stop();
        } while (overallStopwatch.elapsedMillis() < this.millisPerSlice);
        overallStopwatch.stop();
        counters.getCounter(this.workerCallsCounterName).increment(workerCalls);
        counters.getCounter(this.workerMillisCounterName).increment(workerStopwatch.elapsedMillis());
        log.info("Ending slice, inputExhausted=" + inputExhausted + ", overallStopwatch=" + overallStopwatch + ", workerStopwatch=" + workerStopwatch);
        this.worker.endSlice(context);
        // NOTE(review): cast kept from the original; presumably every concrete context
        // implements EndSliceEventSource - confirm against WorkerContext.
        for (Runnable listener : ((EndSliceEventSource)context).getEndSliceListeners()) {
            listener.run();
        }
        if (inputExhausted) {
            this.worker.endShard(context);
        }
        this.endWriterAndReaderSlice();
        ImmutableMap<Integer, WorkerShardState> workerShardStateMap = ImmutableMap.of(this.shardNumber, new WorkerShardState(workerCalls, System.currentTimeMillis(), this.formatLastWorkItem(next)));
        if (!inputExhausted) {
            // More input remains: report no closed writers and hand this task back.
            return IncrementalTask.RunResult.of(new WorkerResult<O>(ImmutableMap.<Integer, OutputWriter<O>>of(), workerShardStateMap, counters), this);
        }
        this.closeWriter();
        return IncrementalTask.RunResult.of(new WorkerResult<O>(ImmutableMap.of(this.shardNumber, this.out), workerShardStateMap, counters), null);
    }

    /** Calls {@code beginSlice()} on the reader, then the writer, wrapping any IOException. */
    private void beginReaderAndWriterSlice() {
        try {
            this.in.beginSlice();
        }
        catch (IOException e) {
            throw new RuntimeException(this.in + ".beginSlice() threw IOException", e);
        }
        try {
            this.out.beginSlice();
        }
        catch (IOException e) {
            throw new RuntimeException(this.out + ".beginSlice() threw IOException", e);
        }
    }

    /** Calls {@code endSlice()} on the writer, then the reader, wrapping any IOException. */
    private void endWriterAndReaderSlice() {
        try {
            this.out.endSlice();
        }
        catch (IOException e) {
            throw new RuntimeException(this.out + ".endSlice() threw IOException", e);
        }
        try {
            this.in.endSlice();
        }
        catch (IOException e) {
            throw new RuntimeException(this.in + ".endSlice() threw IOException", e);
        }
    }

    /** Closes the output writer, wrapping any IOException. */
    private void closeWriter() {
        try {
            this.out.close();
        }
        catch (IOException e) {
            throw new RuntimeException(this.out + ".close() threw IOException", e);
        }
    }

    /**
     * Converts {@code x} to a string of at most 100 characters followed by "..." when truncated.
     *
     * @param x the object to render; may be {@code null}
     * @return {@code null} if {@code x} is {@code null}; otherwise its string form, truncated to
     *     the first 100 characters plus "..." if it is longer than that
     */
    protected static String abbrev(Object x) {
        if (x == null) {
            return null;
        }
        // Concatenation (rather than x.toString()) yields "null" if toString() returns null.
        String s = "" + x;
        if (s.length() > MAX_ABBREV_LENGTH) {
            return s.substring(0, MAX_ABBREV_LENGTH) + "...";
        }
        return s;
    }
}

