/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import junit.framework.Assert;
import org.apache.flink.hadoop.shaded.com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;

public class TestSpeculativeExecutionWithMRApp {
    private static final int NUM_MAPPERS = 5;
    private static final int NUM_REDUCERS = 0;

    @Test
    public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
        SystemClock actualClock = new SystemClock();
        final ControlledClock clock = new ControlledClock(actualClock);
        clock.setTime(System.currentTimeMillis());
        MRApp app = new MRApp(5, 0, false, "test", true, (Clock)clock);
        Job job = app.submit(new Configuration(), true, true);
        app.waitForState(job, JobState.RUNNING);
        Map<TaskId, Task> tasks = job.getTasks();
        Assert.assertEquals((String)"Num tasks is not correct", (int)5, (int)tasks.size());
        Iterator<Task> taskIter = tasks.values().iterator();
        while (taskIter.hasNext()) {
            app.waitForState(taskIter.next(), TaskState.RUNNING);
        }
        clock.setTime(System.currentTimeMillis() + 2000L);
        EventHandler appEventHandler = app.getContext().getEventHandler();
        for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
            for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask.getValue().getAttempts().entrySet()) {
                TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = this.createTaskAttemptStatus(taskAttempt.getKey(), 0.8f, TaskAttemptState.RUNNING);
                TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
                appEventHandler.handle(event);
            }
        }
        Random generator = new Random();
        Object[] taskValues = tasks.values().toArray();
        final Task taskToBeSpeculated = (Task)taskValues[generator.nextInt(taskValues.length)];
        for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
            for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask.getValue().getAttempts().entrySet()) {
                if (mapTask.getKey() == taskToBeSpeculated.getID()) continue;
                appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE));
                appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
                app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
            }
        }
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                if (taskToBeSpeculated.getAttempts().size() != 2) {
                    clock.setTime(System.currentTimeMillis() + 1000L);
                    return false;
                }
                return true;
            }
        }, 1000, 60000);
        TaskAttempt[] ta = TestSpeculativeExecutionWithMRApp.makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
        TestSpeculativeExecutionWithMRApp.verifySpeculationMessage(app, ta);
        app.waitForState(Service.STATE.STOPPED);
    }

    @Test
    public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
        TaskAttemptStatusUpdateEvent event;
        TaskAttemptStatusUpdateEvent.TaskAttemptStatus status;
        SystemClock actualClock = new SystemClock();
        final ControlledClock clock = new ControlledClock(actualClock);
        clock.setTime(System.currentTimeMillis());
        MRApp app = new MRApp(5, 0, false, "test", true, (Clock)clock);
        Job job = app.submit(new Configuration(), true, true);
        app.waitForState(job, JobState.RUNNING);
        Map<TaskId, Task> tasks = job.getTasks();
        Assert.assertEquals((String)"Num tasks is not correct", (int)5, (int)tasks.size());
        Iterator<Task> taskIter = tasks.values().iterator();
        while (taskIter.hasNext()) {
            app.waitForState(taskIter.next(), TaskState.RUNNING);
        }
        clock.setTime(System.currentTimeMillis() + 1000L);
        EventHandler appEventHandler = app.getContext().getEventHandler();
        for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
            for (Map.Entry<Comparable<TaskAttemptId>, Object> entry : mapTask.getValue().getAttempts().entrySet()) {
                TaskAttemptStatusUpdateEvent.TaskAttemptStatus status2 = this.createTaskAttemptStatus((TaskAttemptId)entry.getKey(), 0.5f, TaskAttemptState.RUNNING);
                TaskAttemptStatusUpdateEvent event2 = new TaskAttemptStatusUpdateEvent((TaskAttemptId)entry.getKey(), status2);
                appEventHandler.handle(event2);
            }
        }
        Task speculatedTask = null;
        int numTasksToFinish = 4;
        clock.setTime(System.currentTimeMillis() + 1000L);
        for (Map.Entry<Comparable<TaskAttemptId>, Object> entry : tasks.entrySet()) {
            for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : ((Task)entry.getValue()).getAttempts().entrySet()) {
                if (numTasksToFinish > 0) {
                    appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE));
                    appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
                    --numTasksToFinish;
                    app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
                    continue;
                }
                status = this.createTaskAttemptStatus(taskAttempt.getKey(), 0.75f, TaskAttemptState.RUNNING);
                speculatedTask = (Task)entry.getValue();
                event = new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
                appEventHandler.handle(event);
            }
        }
        clock.setTime(System.currentTimeMillis() + 15000L);
        for (Map.Entry<Comparable<TaskAttemptId>, Object> entry : tasks.entrySet()) {
            for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : ((Task)entry.getValue()).getAttempts().entrySet()) {
                if (taskAttempt.getValue().getState() == TaskAttemptState.SUCCEEDED) continue;
                status = this.createTaskAttemptStatus(taskAttempt.getKey(), 0.75f, TaskAttemptState.RUNNING);
                event = new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
                appEventHandler.handle(event);
            }
        }
        final Task speculatedTaskConst = speculatedTask;
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                if (speculatedTaskConst.getAttempts().size() != 2) {
                    clock.setTime(System.currentTimeMillis() + 1000L);
                    return false;
                }
                return true;
            }
        }, 1000, 60000);
        TaskAttempt[] taskAttemptArray = TestSpeculativeExecutionWithMRApp.makeFirstAttemptWin(appEventHandler, speculatedTask);
        TestSpeculativeExecutionWithMRApp.verifySpeculationMessage(app, taskAttemptArray);
        app.waitForState(Service.STATE.STOPPED);
    }

    private static TaskAttempt[] makeFirstAttemptWin(EventHandler appEventHandler, Task speculatedTask) {
        Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
        TaskAttempt[] ta = new TaskAttempt[attempts.size()];
        attempts.toArray(ta);
        appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
        appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
        return ta;
    }

    private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) throws Exception {
        app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
    }

    private TaskAttemptStatusUpdateEvent.TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, float progress, TaskAttemptState state) {
        TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
        status.id = id;
        status.progress = progress;
        status.taskState = state;
        return status;
    }
}

