/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.mapreduce.impl.shardedjob;

import com.google.appengine.api.backends.BackendServiceFactory;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.labs.repackaged.com.google.common.base.Preconditions;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableCollection;
import com.google.appengine.labs.repackaged.com.google.common.collect.ImmutableList;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTask;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobController;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobState;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobStateImpl;
import com.google.appengine.tools.mapreduce.impl.shardedjob.Status;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Runs a sharded job: keeps job and task state in the datastore and enqueues
 * task-queue requests for the worker steps and for the controller poll.
 *
 * <p>Every write of job or task state happens inside a datastore transaction
 * that first re-reads the stored state and compares its sequence number with
 * the one the current request was scheduled for. A request whose sequence
 * number no longer matches is treated as already handled and dropped, so a
 * duplicated request cannot persist the same step twice. Follow-up
 * requests are added to the queue within the same transaction as the state
 * write.
 *
 * @param <T> type of the incremental tasks that make up the job
 * @param <R> type of the partial and aggregate results
 */
class ShardedJobRunner<T extends IncrementalTask<T, R>, R extends Serializable> {
    private static final Logger log = Logger.getLogger(ShardedJobRunner.class.getName());

    /** Name of the request parameter that carries the job id. */
    static final String JOB_ID_PARAM = "job";
    /** Name of the request parameter that carries the task id (worker requests only). */
    static final String TASK_ID_PARAM = "task";
    /** Name of the request parameter that carries the sequence number of the request. */
    static final String SEQUENCE_NUMBER_PARAM = "seq";

    private static final DatastoreService DATASTORE = DatastoreServiceFactory.getDatastoreService();

    ShardedJobRunner() {
    }

    /** Rolls {@code tx} back if it is still active, i.e. it was not committed. */
    private static void rollbackIfActive(Transaction tx) {
        if (tx.isActive()) {
            tx.rollback();
        }
    }

    /**
     * Loads the stored state of a job.
     *
     * @param tx transaction to read in; callers in this class pass {@code null}
     *     when no transaction is in progress
     * @param jobId id of the job
     * @return the stored job state, or {@code null} if no entity exists for {@code jobId}
     */
    private ShardedJobStateImpl<T, R> lookupJobState(Transaction tx, String jobId) {
        Entity entity;
        try {
            entity = DATASTORE.get(tx, ShardedJobStateImpl.Serializer.makeKey(jobId));
        } catch (EntityNotFoundException e) {
            // A missing entity is an expected outcome; callers test for null.
            return null;
        }
        return ShardedJobStateImpl.Serializer.fromEntity(entity);
    }

    /**
     * Loads the stored state of a single task.
     *
     * @param tx transaction to read in; callers in this class pass {@code null}
     *     when no transaction is in progress
     * @param taskId id of the task
     * @return the stored task state, or {@code null} if no entity exists for {@code taskId}
     */
    private IncrementalTaskState<T, R> lookupTaskState(Transaction tx, String taskId) {
        Entity entity;
        try {
            entity = DATASTORE.get(tx, IncrementalTaskState.Serializer.makeKey(taskId));
        } catch (EntityNotFoundException e) {
            // A missing entity is an expected outcome; callers test for null.
            return null;
        }
        return IncrementalTaskState.Serializer.fromEntity(entity);
    }

    /**
     * Loads the state of every task of a job with one batch read.
     *
     * @param jobState job whose tasks are looked up; supplies the job id and the task count
     * @return the task states, ordered by task number
     * @throws IllegalStateException if the entity of any task is missing
     */
    private List<IncrementalTaskState<T, R>> lookupTasks(ShardedJobState<T, R> jobState) {
        ImmutableList.Builder<Key> keyBuilder = ImmutableList.builder();
        for (int i = 0; i < jobState.getTotalTaskCount(); ++i) {
            keyBuilder.add(IncrementalTaskState.Serializer.makeKey(getTaskId(jobState.getJobId(), i)));
        }
        List<Key> keys = keyBuilder.build();
        Map<Key, Entity> entities = DATASTORE.get(keys);
        ImmutableList.Builder<IncrementalTaskState<T, R>> out = ImmutableList.builder();
        // Iterate over the key list, not the map, to keep the result in task-number order.
        for (Key key : keys) {
            Entity entity = entities.get(key);
            Preconditions.checkState(entity != null, "%s: Missing task: %s", this, key);
            IncrementalTaskState<T, R> taskState = IncrementalTaskState.Serializer.fromEntity(entity);
            out.add(taskState);
        }
        return out.build();
    }

    /**
     * Counts the tasks that still have work to do.
     *
     * @param taskStates states of all tasks of a job
     * @return the number of states whose next task is not {@code null}
     */
    private int countActiveTasks(List<IncrementalTaskState<T, R>> taskStates) {
        int count = 0;
        for (IncrementalTaskState<T, R> taskState : taskStates) {
            if (taskState.getNextTask() != null) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Combines the partial results of all tasks into one result.
     *
     * @param controller controller whose {@code combineResults} does the combining
     * @param taskStates states of all tasks of a job
     * @return the value returned by the controller for the collected partial results
     */
    private R aggregateState(ShardedJobController<T, R> controller, List<IncrementalTaskState<T, R>> taskStates) {
        ImmutableList.Builder<R> results = ImmutableList.builder();
        for (IncrementalTaskState<T, R> taskState : taskStates) {
            results.add(taskState.getPartialResult());
        }
        return controller.combineResults(results.build());
    }

    /**
     * Enqueues the next controller poll for a job as part of {@code tx}.
     *
     * <p>The request is a POST to the controller path of the job settings,
     * carries the job id and the job's next sequence number, and is delayed by
     * the configured poll interval. If a controller backend is configured, its
     * address is sent as the {@code Host} header.
     *
     * @param tx transaction the queue add takes part in
     * @param state job state that supplies the settings, job id and sequence number
     */
    private void scheduleControllerTask(Transaction tx, ShardedJobStateImpl<T, R> state) {
        ShardedJobSettings settings = state.getSettings();
        TaskOptions taskOptions = TaskOptions.Builder.withMethod(TaskOptions.Method.POST)
                .url(settings.getControllerPath())
                .param(JOB_ID_PARAM, state.getJobId())
                .param(SEQUENCE_NUMBER_PARAM, String.valueOf(state.getNextSequenceNumber()))
                .countdownMillis(settings.getMillisBetweenPolls());
        if (settings.getControllerBackend() != null) {
            taskOptions.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(settings.getControllerBackend()));
        }
        QueueFactory.getQueue(settings.getControllerQueueName()).add(tx, taskOptions);
    }

    /**
     * Enqueues the next worker step of a task as part of {@code tx}.
     *
     * <p>The request is a POST to the worker path of the settings and carries
     * the task id, the job id and the task's next sequence number. If a worker
     * backend is configured, its address is sent as the {@code Host} header.
     *
     * @param tx transaction the queue add takes part in
     * @param settings settings of the job the task belongs to
     * @param state task state that supplies the ids and the sequence number
     */
    private void scheduleWorkerTask(Transaction tx, ShardedJobSettings settings, IncrementalTaskState<T, R> state) {
        TaskOptions taskOptions = TaskOptions.Builder.withMethod(TaskOptions.Method.POST)
                .url(settings.getWorkerPath())
                .param(TASK_ID_PARAM, state.getTaskId())
                .param(JOB_ID_PARAM, state.getJobId())
                .param(SEQUENCE_NUMBER_PARAM, String.valueOf(state.getNextSequenceNumber()));
        if (settings.getWorkerBackend() != null) {
            taskOptions.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(settings.getWorkerBackend()));
        }
        QueueFactory.getQueue(settings.getWorkerQueueName()).add(tx, taskOptions);
    }

    /**
     * Handles one controller poll: inspects all task states and either marks
     * the job done or schedules the next poll.
     *
     * <p>The poll is dropped (with a log entry) if the job is gone, is no
     * longer active, or has already advanced past {@code sequenceNumber}.
     *
     * <p>NOTE: when no task is active, the controller's {@code completed}
     * callback runs before the transactional write of the job state. It is
     * therefore also invoked when that write is subsequently skipped because
     * the job disappeared or was processed concurrently.
     *
     * @param jobId id of the job to poll
     * @param sequenceNumber sequence number this poll was scheduled for
     * @throws IllegalStateException if the job is still initializing, if the
     *     stored sequence number is lower than {@code sequenceNumber}, or if a
     *     task entity is missing
     */
    void pollTaskStates(String jobId, int sequenceNumber) {
        ShardedJobStateImpl<T, R> jobState = lookupJobState(null, jobId);
        if (jobState == null) {
            log.info(jobId + ": Job gone");
            return;
        }
        log.info("Polling task states for job " + jobId + ", sequence number " + sequenceNumber);
        Preconditions.checkState(jobState.getStatus() != Status.INITIALIZING, "Should be done initializing: %s", jobState);
        if (!jobState.getStatus().isActive()) {
            log.info(jobId + ": Job no longer active: " + jobState);
            return;
        }
        if (jobState.getNextSequenceNumber() != sequenceNumber) {
            Preconditions.checkState(jobState.getNextSequenceNumber() > sequenceNumber, "%s: Job state is from the past: %s", jobId, jobState);
            log.info(jobId + ": Poll sequence number " + sequenceNumber + " already completed: " + jobState);
            return;
        }

        // Take the timestamp before reading the tasks, as the poll time to record.
        long currentPollTimeMillis = System.currentTimeMillis();
        List<IncrementalTaskState<T, R>> taskStates = lookupTasks(jobState);
        int activeTasks = countActiveTasks(taskStates);
        jobState.setMostRecentUpdateTimeMillis(currentPollTimeMillis);
        jobState.setActiveTaskCount(activeTasks);
        if (activeTasks == 0) {
            jobState.setStatus(Status.DONE);
            R aggregateResult = aggregateState(jobState.getController(), taskStates);
            jobState.getController().completed(aggregateResult);
        } else {
            jobState.setNextSequenceNumber(sequenceNumber + 1);
        }

        log.fine(jobId + ": Writing " + jobState);
        Transaction tx = DATASTORE.beginTransaction();
        try {
            // Re-read inside the transaction so a concurrent poll cannot be overwritten.
            ShardedJobStateImpl<T, R> existing = lookupJobState(tx, jobId);
            if (existing == null) {
                log.info(jobId + ": Job gone after poll");
                return;
            }
            if (existing.getNextSequenceNumber() != sequenceNumber) {
                log.info(jobId + ": Job processed concurrently; was sequence number " + sequenceNumber + ", now " + existing.getNextSequenceNumber());
                return;
            }
            DATASTORE.put(tx, ShardedJobStateImpl.Serializer.toEntity(jobState));
            if (jobState.getStatus().isActive()) {
                scheduleControllerTask(tx, jobState);
            }
            tx.commit();
        } finally {
            rollbackIfActive(tx);
        }
    }

    /**
     * Handles one worker request: runs the next step of a task, merges its
     * partial result into the task state and schedules the follow-up step.
     *
     * <p>The request is dropped (with a log entry) if the job or the task is
     * gone, the job is no longer active, or the task has already advanced
     * past {@code sequenceNumber}. The step itself runs outside of any
     * transaction; only the state write and the follow-up scheduling are
     * transactional.
     *
     * @param taskId id of the task to run
     * @param jobId id of the job the task belongs to
     * @param sequenceNumber sequence number this request was scheduled for
     * @throws IllegalStateException if the stored sequence number is lower
     *     than {@code sequenceNumber}, or if the task has no next step
     */
    void runTask(String taskId, String jobId, int sequenceNumber) {
        ShardedJobStateImpl<T, R> jobState = lookupJobState(null, jobId);
        if (jobState == null) {
            log.info(taskId + ": Job gone");
            return;
        }
        if (!jobState.getStatus().isActive()) {
            log.info(taskId + ": Job no longer active: " + jobState);
            return;
        }
        IncrementalTaskState<T, R> taskState = lookupTaskState(null, taskId);
        if (taskState == null) {
            log.info(taskId + ": Task gone");
            return;
        }
        log.info("Running task " + taskId + " (job " + jobId + "), sequence number " + sequenceNumber);
        if (taskState.getNextSequenceNumber() != sequenceNumber) {
            Preconditions.checkState(taskState.getNextSequenceNumber() > sequenceNumber, "%s: Task state is from the past: %s", taskId, taskState);
            log.info(taskId + ": Task sequence number " + sequenceNumber + " already completed: " + taskState);
            return;
        }
        Preconditions.checkState(taskState.getNextTask() != null, "%s: Next task is null", taskState);

        log.fine("About to run task: " + taskState);
        IncrementalTask.RunResult<T, R> result = taskState.getNextTask().run();
        // Fold the result of this step into what the task has accumulated so far.
        R combined = jobState.getController().combineResults(
                ImmutableList.of(taskState.getPartialResult(), result.getPartialResult()));
        taskState.setPartialResult(combined);
        taskState.setNextTask(result.getFollowupTask());
        taskState.setMostRecentUpdateMillis(System.currentTimeMillis());
        taskState.setNextSequenceNumber(sequenceNumber + 1);

        Transaction tx = DATASTORE.beginTransaction();
        try {
            // Re-read inside the transaction so a concurrent run cannot be overwritten.
            IncrementalTaskState<T, R> existing = lookupTaskState(tx, taskId);
            if (existing == null) {
                log.info(taskId + ": Task disappeared while processing");
                return;
            }
            if (existing.getNextSequenceNumber() != sequenceNumber) {
                log.info(taskId + ": Task processed concurrently; was sequence number " + sequenceNumber + ", now " + existing.getNextSequenceNumber());
                return;
            }
            DATASTORE.put(tx, IncrementalTaskState.Serializer.toEntity(taskState));
            if (result.getFollowupTask() != null) {
                scheduleWorkerTask(tx, jobState.getSettings(), taskState);
            }
            tx.commit();
        } finally {
            rollbackIfActive(tx);
        }
    }

    /**
     * Builds the id of a task from the job id and the task's index.
     *
     * @param jobId id of the job
     * @param taskNumber zero-based index of the task within the job
     * @return {@code jobId + "-task" + taskNumber}
     */
    private String getTaskId(String jobId, int taskNumber) {
        return jobId + "-task" + taskNumber;
    }

    /**
     * Writes the initial state of every task and schedules its first worker
     * step, using one transaction per task.
     *
     * <p>A task whose state already exists is logged and left untouched, so
     * calling this method again for the same job does not create duplicates.
     *
     * @param controller controller that supplies the initial (empty) partial result
     * @param settings settings used to schedule the worker requests
     * @param jobId id of the job
     * @param initialTasks first step of each task; the list index is the task number
     * @param startTimeMillis timestamp stored as the initial update time of each task
     */
    private void createTasks(ShardedJobController<T, R> controller, ShardedJobSettings settings, String jobId, List<? extends T> initialTasks, long startTimeMillis) {
        log.info(jobId + ": Creating " + initialTasks.size() + " tasks");
        for (int i = 0; i < initialTasks.size(); ++i) {
            String taskId = getTaskId(jobId, i);
            Transaction tx = DATASTORE.beginTransaction();
            try {
                IncrementalTaskState<T, R> existing = lookupTaskState(tx, taskId);
                if (existing != null) {
                    log.info(jobId + ": Task already exists: " + existing);
                    continue;
                }
                IncrementalTaskState<T, R> taskState = new IncrementalTaskState<T, R>(
                        taskId, jobId, startTimeMillis, initialTasks.get(i),
                        controller.combineResults(ImmutableList.<R>of()));
                DATASTORE.put(tx, IncrementalTaskState.Serializer.toEntity(taskState));
                scheduleWorkerTask(tx, settings, taskState);
                tx.commit();
            } finally {
                rollbackIfActive(tx);
            }
        }
    }

    /**
     * Stores the initial job state unless a state for the job already exists.
     *
     * @param jobState state to store
     * @return {@code true} if the state was stored or an active job with the
     *     same id already exists (start-up then continues for that job);
     *     {@code false} if a job with the same id exists and is no longer active
     */
    private boolean writeInitialJobState(ShardedJobStateImpl<T, R> jobState) {
        String jobId = jobState.getJobId();
        log.fine(jobId + ": Writing initial job state");
        Transaction tx = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> existing = lookupJobState(tx, jobId);
            if (existing == null) {
                DATASTORE.put(tx, ShardedJobStateImpl.Serializer.toEntity(jobState));
                tx.commit();
                return true;
            }
            if (!existing.getStatus().isActive()) {
                log.info(jobId + ": Attempt to reinitialize inactive job: " + existing);
                return false;
            }
            // Nothing is written for an existing active job; the transaction is rolled back below.
            log.info(jobId + ": Reinitializing job: " + existing);
            return true;
        } finally {
            rollbackIfActive(tx);
        }
    }

    /**
     * Stores the given job state and schedules the first controller poll in
     * one transaction, provided the stored job is still initializing.
     *
     * <p>Nothing is written if the job has disappeared or its stored status
     * is no longer {@link Status#INITIALIZING}.
     *
     * @param jobState state to store; the caller has already set its new status
     */
    private void scheduleControllerAndMarkActive(ShardedJobStateImpl<T, R> jobState) {
        String jobId = jobState.getJobId();
        log.fine(jobId + ": Scheduling controller and marking active");
        Transaction tx = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> existing = lookupJobState(tx, jobId);
            if (existing == null) {
                log.warning(jobId + ": Job disappeared while initializing");
                return;
            }
            if (existing.getStatus() != Status.INITIALIZING) {
                // Log the stored state: it is the one whose status differs from the expected one.
                log.info(jobId + ": Job changed status while initializing: " + existing);
                return;
            }
            DATASTORE.put(tx, ShardedJobStateImpl.Serializer.toEntity(jobState));
            scheduleControllerTask(tx, jobState);
            tx.commit();
        } finally {
            rollbackIfActive(tx);
        }
        log.info(jobId + ": Started");
    }

    /**
     * Starts a job.
     *
     * <p>With no initial tasks the job is stored as {@link Status#DONE} and
     * the controller's {@code completed} callback is invoked right away with
     * the combination of zero results. Otherwise the initial job state is
     * written, the tasks are created and scheduled, and finally the job is
     * marked {@link Status#RUNNING} and the first controller poll is scheduled.
     * Nothing further happens if a job with the same id exists and is no
     * longer active.
     *
     * @param jobId id of the job
     * @param rawInitialTasks first step of each task; copied before use
     * @param controller controller that combines results and is told about completion
     * @param settings queue, path and backend settings of the job
     */
    void startJob(String jobId, List<? extends T> rawInitialTasks, ShardedJobController<T, R> controller, ShardedJobSettings settings) {
        long startTimeMillis = System.currentTimeMillis();
        List<T> initialTasks = ImmutableList.<T>copyOf(rawInitialTasks);
        ShardedJobStateImpl<T, R> jobState = new ShardedJobStateImpl<T, R>(
                jobId, controller, settings, initialTasks.size(), startTimeMillis, Status.INITIALIZING, null);
        if (initialTasks.isEmpty()) {
            log.info(jobId + ": No tasks, immediately complete: " + controller);
            jobState.setStatus(Status.DONE);
            DATASTORE.put(ShardedJobStateImpl.Serializer.toEntity(jobState));
            controller.completed(controller.combineResults(ImmutableList.<R>of()));
            return;
        }
        if (!writeInitialJobState(jobState)) {
            return;
        }
        createTasks(controller, settings, jobId, initialTasks, startTimeMillis);
        jobState.setStatus(Status.RUNNING);
        scheduleControllerAndMarkActive(jobState);
    }

    /**
     * Returns the current state of a job with the aggregate result filled in.
     *
     * <p>The aggregate is computed on the fly from the partial results of all
     * tasks; it is not read from the stored job state.
     *
     * @param jobId id of the job
     * @return the job state, or {@code null} if no job with that id exists
     * @throws IllegalStateException if the stored job state already carries an
     *     aggregate result, or if a task entity is missing
     */
    ShardedJobState<T, R> getJobState(String jobId) {
        ShardedJobStateImpl<T, R> jobState = lookupJobState(null, jobId);
        if (jobState == null) {
            return null;
        }
        Preconditions.checkState(jobState.getAggregateResult() == null, "%s: Non-null aggregate result: %s", jobState, jobState.getAggregateResult());
        List<IncrementalTaskState<T, R>> taskStates = lookupTasks(jobState);
        jobState.setAggregateResult(aggregateState(jobState.getController(), taskStates));
        return jobState;
    }

    /**
     * Marks an active job as {@link Status#ABORTED}.
     *
     * <p>Does nothing except log if the job does not exist or is no longer
     * active. Only the job state is changed here; worker and controller
     * requests check the job status themselves when they run.
     *
     * @param jobId id of the job to abort
     */
    void abortJob(String jobId) {
        log.info(jobId + ": Aborting");
        Transaction tx = DATASTORE.beginTransaction();
        try {
            ShardedJobStateImpl<T, R> jobState = lookupJobState(tx, jobId);
            if (jobState == null || !jobState.getStatus().isActive()) {
                log.info(jobId + ": Job not active, not aborting: " + jobState);
                return;
            }
            jobState.setStatus(Status.ABORTED);
            DATASTORE.put(tx, ShardedJobStateImpl.Serializer.toEntity(jobState));
            tx.commit();
        } finally {
            rollbackIfActive(tx);
        }
    }
}

