package org.apache.hadoop.mapred;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapred/TestRecoveryManager.class */
public class TestRecoveryManager {
    private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class);
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager");
    private FileSystem fs;
    private JobConf conf;
    private MiniMRCluster mr;

    @Before
    public void setUp() {
        JobConf jobConf = new JobConf();
        try {
            this.fs = FileSystem.get(new Configuration());
            this.fs.delete(TEST_DIR, true);
            jobConf.set("mapred.jobtracker.job.history.block.size", "1024");
            jobConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
            this.mr = new MiniMRCluster(1, "file:///", 1, (String[]) null, (String[]) null, jobConf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @After
    public void tearDown() {
        if (this.mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false).getJobTrackerState() == JobTracker.State.RUNNING) {
            this.mr.shutdown();
        }
    }

    @Test(timeout = 120000)
    public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
        LOG.info("Testing jobtracker restart with faulty job");
        String path = new Path(TEST_DIR, "signal").toString();
        JobConf createJobConf = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(createJobConf, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", path, path);
        RunningJob submitJob = new JobClient(createJobConf).submitJob(createJobConf);
        LOG.info("Submitted job " + submitJob.getID());
        while (submitJob.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }
        JobConf createJobConf2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(createJobConf2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, "test-recovery-manager", path, path);
        RunningJob submitJob2 = new JobClient(createJobConf2).submitJob(createJobConf2);
        LOG.info("Submitted job " + submitJob2.getID());
        while (submitJob2.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + submitJob2.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }
        LOG.info("Stopping jobtracker");
        String systemDir = this.mr.getJobTrackerRunner().getJobTracker().getSystemDir();
        this.mr.stopJobTracker();
        Path path2 = new Path(systemDir, submitJob.getID().toString() + "/job-info");
        LOG.info("Deleting job token file : " + path2.toString());
        Assert.assertTrue(this.fs.delete(path2, false));
        FSDataOutputStream create = this.fs.create(path2);
        create.write(1);
        create.close();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        LOG.info("Starting jobtracker");
        this.mr.startJobTracker();
        JobTracker jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals("JobTracker crashed!", JobTracker.State.RUNNING, jobTracker.getClusterStatus(false).getJobTrackerState());
        JobInProgress job = jobTracker.getJob(submitJob2.getID());
        while (!job.isComplete()) {
            LOG.info("Waiting for job " + submitJob2.getID() + " to be successful");
            this.fs.create(new Path(TEST_DIR, "signal"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Job should be successful", submitJob2.isSuccessful());
    }

    @Test(timeout = 120000)
    public void testJobResubmission() throws Exception {
        LOG.info("Testing Job Resubmission");
        String path = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        this.mr.getJobTrackerRunner().getJobTracker();
        JobConf createJobConf = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(createJobConf, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", path, path);
        JobClient jobClient = new JobClient(createJobConf);
        RunningJob submitJob = jobClient.submitJob(createJobConf);
        LOG.info("Submitted first job " + submitJob.getID());
        while (submitJob.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }
        LOG.info("Stopping jobtracker");
        this.mr.stopJobTracker();
        LOG.info("Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jobClient);
        JobTracker jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals("Resubmission failed ", 1L, jobTracker.getAllJobs().length);
        JobInProgress job = jobTracker.getJob(submitJob.getID());
        while (!job.isComplete()) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be successful");
            this.fs.create(new Path(TEST_DIR, "signal"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Task should be successful", submitJob.isSuccessful());
    }

    @Test(timeout = 120000)
    public void testJobTrackerRestartWithBadJobs() throws Exception {
        LOG.info("Testing recovery-manager");
        String path = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf createJobConf = this.mr.createJobConf();
        createJobConf.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(createJobConf, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0, "test-recovery-manager", path, path);
        JobClient jobClient = new JobClient(createJobConf);
        RunningJob submitJob = jobClient.submitJob(createJobConf);
        LOG.info("Submitted first job " + submitJob.getID());
        while (submitJob.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }
        JobConf createJobConf2 = this.mr.createJobConf();
        String path2 = new Path(TEST_DIR, "signal1").toString();
        UtilsForTests.configureWaitingJobConf(createJobConf2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0, "test-recovery-manager", path2, path2);
        RunningJob submitJob2 = new JobClient(createJobConf2).submitJob(createJobConf2);
        LOG.info("Submitted job " + submitJob2.getID());
        JobInProgress job = jobTracker.getJob(submitJob2.getID());
        while (!job.inited()) {
            LOG.info("Waiting for job " + job.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }
        final JobConf createJobConf3 = this.mr.createJobConf();
        UserGroupInformation createUserForTesting = UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
        UtilsForTests.configureWaitingJobConf(createJobConf3, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0, "test-recovery-manager", path, path);
        RunningJob runningJob = (RunningJob) createUserForTesting.doAs(new PrivilegedExceptionAction<RunningJob>() { // from class: org.apache.hadoop.mapred.TestRecoveryManager.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.security.PrivilegedExceptionAction
            public RunningJob run() throws IOException {
                return new JobClient(createJobConf3).submitJob(createJobConf3);
            }
        });
        LOG.info("Submitted job " + runningJob.getID() + " with different user");
        JobInProgress job2 = jobTracker.getJob(runningJob.getID());
        while (!job2.inited()) {
            LOG.info("Waiting for job " + job2.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }
        LOG.info("Stopping jobtracker");
        this.mr.stopJobTracker();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        this.mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
        this.mr.getJobTrackerConf().setBoolean("mapred.acls.enabled", true);
        this.mr.getJobTrackerConf().set(QueueManager.toFullPropertyName("default", QueueManager.QueueACL.SUBMIT_JOB.getAclName()), UserGroupInformation.getLoginUser().getUserName());
        LOG.info("Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jobClient);
        JobTracker jobTracker2 = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals("Recovery manager failed to tolerate job failures", 1L, jobTracker2.getAllJobs().length);
        Assert.assertNull("Faulty job should not be resubmitted", jobTracker2.getJobStatus(submitJob.getID()));
        JobInProgress job3 = jobTracker2.getJob(submitJob2.getID());
        Assert.assertFalse("Job should be running", job3.isComplete());
        Assert.assertNull("Job should be missing because of ACL changed", jobTracker2.getJobStatus(runningJob.getID()));
        while (!job3.isComplete()) {
            LOG.info("Waiting for job " + submitJob2.getID() + " to be successful");
            this.fs.create(new Path(TEST_DIR, "signal1"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Job should be successful", submitJob2.isSuccessful());
    }

    @Test(timeout = 120000)
    public void testRestartCount() throws Exception {
        LOG.info("Testing Job Restart Count");
        String path = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf createJobConf = this.mr.createJobConf();
        createJobConf.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(createJobConf, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 30, 0, "test-restart", path, path);
        JobClient jobClient = new JobClient(createJobConf);
        RunningJob submitJob = jobClient.submitJob(createJobConf);
        LOG.info("Submitted first job " + submitJob.getID());
        JobInProgress job = jobTracker.getJob(submitJob.getID());
        while (!job.inited()) {
            LOG.info("Waiting for job " + job.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }
        for (int i = 1; i <= 2; i++) {
            LOG.info("Stopping jobtracker for " + i + " time");
            this.mr.stopJobTracker();
            LOG.info("Starting jobtracker for " + i + " time");
            this.mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jobClient);
            jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
            Assert.assertEquals("Recovery manager failed to recover restart count", 0L, job.getNumRestarts());
        }
        submitJob.killJob();
        JobConf createJobConf2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(createJobConf2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager", path, path);
        LOG.info("Submitted first job after restart" + jobClient.submitJob(createJobConf2).getID());
        Assert.assertEquals("Restart count for new job is incorrect", 0L, jobTracker.getJob(r0.getID()).getNumRestarts());
        LOG.info("Stopping jobtracker for testing the fs errors");
        this.mr.stopJobTracker();
        Path restartCountFile = jobTracker.recoveryManager.getRestartCountFile();
        this.fs.delete(restartCountFile, false);
        FSDataOutputStream create = this.fs.create(restartCountFile);
        create.writeBoolean(true);
        create.close();
        LOG.info("Starting jobtracker with fs errors");
        this.mr.startJobTracker();
        Assert.assertFalse("JobTracker is still alive", this.mr.getJobTrackerRunner().isActive());
    }

    @Test(timeout = 120000)
    public void testJobTrackerInfoCreation() throws Exception {
        LOG.info("Testing jobtracker.info file");
        MiniDFSCluster miniDFSCluster = new MiniDFSCluster(new Configuration(), 1, true, null);
        String str = miniDFSCluster.getFileSystem().getUri().getHost() + ":" + miniDFSCluster.getFileSystem().getUri().getPort();
        miniDFSCluster.shutdownDataNodes();
        JobConf jobConf = new JobConf();
        FileSystem.setDefaultUri(jobConf, str);
        jobConf.set("mapred.job.tracker", "localhost:0");
        jobConf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
        JobTracker jobTracker = new JobTracker(jobConf);
        jobTracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
        jobTracker.initializeFilesystem();
        jobTracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
        jobTracker.initialize();
        boolean z = false;
        try {
            jobTracker.recoveryManager.updateRestartCount();
        } catch (IOException e) {
            z = true;
        }
        Assert.assertTrue("JobTracker created info files without datanodes!!!", z);
        Path restartCountFile = jobTracker.recoveryManager.getRestartCountFile();
        jobTracker.recoveryManager.getTempRestartCountFile();
        FileSystem fileSystem = miniDFSCluster.getFileSystem();
        Assert.assertFalse("Info file exists after update failure", fileSystem.exists(restartCountFile));
        Assert.assertFalse("Temporary restart-file exists after update failure", fileSystem.exists(restartCountFile));
        miniDFSCluster.startDataNodes(jobConf, 1, true, null, null, null, null);
        miniDFSCluster.waitActive();
        boolean z2 = false;
        try {
            jobTracker.recoveryManager.updateRestartCount();
        } catch (IOException e2) {
            z2 = true;
        }
        Assert.assertFalse("JobTracker failed to create info files with datanodes!", z2);
    }
}
