/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FilterOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.JobCounter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Checks which marker files end up in a job's output directory after the job
 * succeeds, fails, or is killed, for the default {@link FileOutputCommitter}
 * and for two custom committers declared at the bottom of this class.
 *
 * <p>All jobs run on a single {@link MiniMRCluster} that is started once in
 * {@link #setUp()} and stopped in {@link #tearDown()}.
 */
public class TestJobCleanup {
    /** Root directory under which the input and all per-job output directories are created. */
    private static final String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp") + "/" + "test-job-cleanup").toString();
    /** Marker written by {@link CommitterWithCustomDeprecatedCleanup#cleanupJob(JobContext)}. */
    private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
    /** Marker written by {@link CommitterWithCustomAbort} for any abort state other than FAILED. */
    private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
    /** Marker written by {@link CommitterWithCustomAbort} when the abort state is FAILED. */
    private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
    private static FileSystem fileSys = null;
    private static MiniMRCluster mr = null;
    private static Path inDir = null;
    private static Path emptyInDir = null;
    /** Counter used to give every submitted job its own output directory. */
    private static int outDirs = 0;
    private static final Log LOG = LogFactory.getLog(TestJobCleanup.class);

    /**
     * Wipes the test root, starts the mini cluster, and creates a small text
     * input file plus an empty input directory.
     *
     * @throws IOException if any file system operation or the cluster start fails
     */
    @BeforeClass
    public static void setUp() throws IOException {
        JobConf conf = new JobConf();
        fileSys = FileSystem.get(conf);
        fileSys.delete(new Path(TEST_ROOT_DIR), true);
        conf.set("mapred.job.tracker.handler.count", "1");
        // NOTE(review): port 0 presumably asks for an ephemeral port - confirm.
        conf.set("mapred.job.tracker", "127.0.0.1:0");
        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
        conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
        conf.set("mapreduce.jobhistory.intermediate-done-dir", TEST_ROOT_DIR + "/intermediate");
        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        inDir = new Path(TEST_ROOT_DIR, "test-input");
        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
        FSDataOutputStream file = fileSys.create(new Path(inDir, "part-0"));
        try {
            file.writeBytes(input);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            file.close();
        }
        emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
        fileSys.mkdirs(emptyInDir);
    }

    /**
     * Closes the file system and shuts the mini cluster down. The cluster is
     * shut down even if closing the file system throws.
     *
     * @throws Exception if closing the file system or stopping the cluster fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        try {
            if (fileSys != null) {
                fileSys.close();
            }
        } finally {
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    /** Returns a fresh, not previously handed out output directory under the test root. */
    private Path getNewOutputDir() {
        return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
    }

    /**
     * Applies the settings shared by every job in this test: text input read
     * from {@code inDir}, identity mapper and reducer, and the given task counts.
     *
     * @param jc      job configuration to populate
     * @param jobName name to give the job
     * @param maps    number of map tasks to request
     * @param reds    number of reduce tasks to request
     * @param outDir  output directory of the job
     */
    private void configureJob(JobConf jc, String jobName, int maps, int reds, Path outDir) {
        jc.setJobName(jobName);
        jc.setInputFormat(TextInputFormat.class);
        jc.setOutputKeyClass(LongWritable.class);
        jc.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(jc, inDir);
        FileOutputFormat.setOutputPath(jc, outDir);
        jc.setMapperClass(IdentityMapper.class);
        jc.setReducerClass(IdentityReducer.class);
        jc.setNumMapTasks(maps);
        jc.setNumReduceTasks(reds);
    }

    /**
     * Asserts that none of the named files exists in the job's output directory.
     *
     * @param outDir  output directory of the job
     * @param exclude file names that must be absent
     * @param outcome word describing how the job ended; used only in the failure message
     * @param id      id of the job; used only in the failure message
     * @throws IOException if the existence check fails
     */
    private static void assertFilesAbsent(Path outDir, String[] exclude, String outcome, JobID id) throws IOException {
        for (String name : exclude) {
            Path file = new Path(outDir, name);
            Assert.assertFalse("File " + file + " should not be present for " + outcome + " job " + id, fileSys.exists(file));
        }
    }

    /**
     * Runs a map-only identity job to completion with the given committer and
     * checks the resulting marker files.
     *
     * @param filename  file that must exist in the output directory afterwards
     * @param committer output committer class to install on the job
     * @param exclude   files that must not exist in the output directory afterwards
     * @throws IOException if job submission or a file system check fails
     */
    private void testSuccessfulJob(String filename, Class<? extends OutputCommitter> committer, String[] exclude) throws IOException {
        JobConf jc = mr.createJobConf();
        Path outDir = this.getNewOutputDir();
        this.configureJob(jc, "job with cleanup()", 1, 0, outDir);
        jc.setOutputCommitter(committer);
        JobClient jobClient = new JobClient(jc);
        RunningJob job = jobClient.submitJob(jc);
        JobID id = job.getID();
        job.waitForCompletion();
        LOG.info("Job finished : " + job.isComplete());
        // Mirror the state checks done for the failed and killed variants.
        Assert.assertEquals("Job did not succeed", JobStatus.SUCCEEDED, job.getJobState());
        Path testFile = new Path(outDir, filename);
        Assert.assertTrue("Done file \"" + testFile + "\" missing for job " + id, fileSys.exists(testFile));
        assertFilesAbsent(outDir, exclude, "successful", id);
    }

    /**
     * Runs a job whose mapper is {@code UtilsForTests.FailMapper}, limited to a
     * single map attempt, asserts that the job ends in the FAILED state, and
     * checks the resulting marker files.
     *
     * @param fileName  file that must exist afterwards, or {@code null} to skip that check
     * @param committer output committer class to install on the job
     * @param exclude   files that must not exist in the output directory afterwards
     * @throws IOException if job submission or a file system check fails
     */
    private void testFailedJob(String fileName, Class<? extends OutputCommitter> committer, String[] exclude) throws IOException {
        JobConf jc = mr.createJobConf();
        Path outDir = this.getNewOutputDir();
        this.configureJob(jc, "fail job with abort()", 1, 0, outDir);
        jc.setMaxMapAttempts(1);
        jc.setMapperClass(UtilsForTests.FailMapper.class);
        jc.setOutputCommitter(committer);
        JobClient jobClient = new JobClient(jc);
        RunningJob job = jobClient.submitJob(jc);
        JobID id = job.getID();
        job.waitForCompletion();
        Assert.assertEquals("Job did not fail", JobStatus.FAILED, job.getJobState());
        if (fileName != null) {
            Path testFile = new Path(outDir, fileName);
            Assert.assertTrue("File " + testFile + " missing for failed job " + id, fileSys.exists(testFile));
        }
        assertFilesAbsent(outDir, exclude, "failed", id);
    }

    /**
     * Runs a job whose mapper is {@code UtilsForTests.KillMapper}, kills it once
     * a map task has been launched, asserts that the job ends in the KILLED
     * state, and checks the resulting marker files.
     *
     * @param fileName  file that must exist afterwards, or {@code null} to skip that check
     * @param committer output committer class to install on the job
     * @param exclude   files that must not exist in the output directory afterwards
     * @throws IOException if job submission or a file system check fails
     */
    private void testKilledJob(String fileName, Class<? extends OutputCommitter> committer, String[] exclude) throws IOException {
        JobConf jc = mr.createJobConf();
        Path outDir = this.getNewOutputDir();
        this.configureJob(jc, "kill job with abort()", 1, 0, outDir);
        jc.setMapperClass(UtilsForTests.KillMapper.class);
        jc.setOutputCommitter(committer);
        JobClient jobClient = new JobClient(jc);
        RunningJob job = jobClient.submitJob(jc);
        JobID id = job.getID();
        Counters counters = job.getCounters();
        // Poll until at least one map has been launched. Stop as well if the job
        // has already finished, so that an unexpected early completion (or a
        // launch count that skips past 1) surfaces as an assertion failure below
        // instead of an endless wait.
        while (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) < 1L && !job.isComplete()) {
            LOG.info("Waiting for a map task to be launched");
            UtilsForTests.waitFor(100L);
            counters = job.getCounters();
        }
        job.killJob();
        job.waitForCompletion();
        Assert.assertEquals("Job was not killed", JobStatus.KILLED, job.getJobState());
        if (fileName != null) {
            Path testFile = new Path(outDir, fileName);
            Assert.assertTrue("File " + testFile + " missing for job " + id, fileSys.exists(testFile));
        }
        assertFilesAbsent(outDir, exclude, "killed", id);
    }

    /**
     * With the default {@link FileOutputCommitter}: a successful job must leave
     * {@code _SUCCESS}; failed and killed jobs must not.
     */
    @Test
    public void testDefaultCleanupAndAbort() throws IOException {
        this.testSuccessfulJob("_SUCCESS", FileOutputCommitter.class, new String[0]);
        this.testFailedJob(null, FileOutputCommitter.class, new String[]{"_SUCCESS"});
        this.testKilledJob(null, FileOutputCommitter.class, new String[]{"_SUCCESS"});
    }

    /**
     * With {@link CommitterWithCustomAbort}: each outcome must leave exactly its
     * own marker file and none of the markers belonging to the other outcomes.
     */
    @Test
    public void testCustomAbort() throws IOException {
        this.testSuccessfulJob("_SUCCESS", CommitterWithCustomAbort.class, new String[]{ABORT_FAILED_FILE_NAME, ABORT_KILLED_FILE_NAME});
        this.testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, new String[]{"_SUCCESS", ABORT_KILLED_FILE_NAME});
        this.testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, new String[]{"_SUCCESS", ABORT_FAILED_FILE_NAME});
    }

    /**
     * With {@link CommitterWithCustomDeprecatedCleanup}: every outcome must leave
     * the custom cleanup marker; failed and killed jobs must not leave {@code _SUCCESS}.
     */
    @Test
    public void testCustomCleanup() throws IOException {
        this.testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, CommitterWithCustomDeprecatedCleanup.class, new String[0]);
        this.testFailedJob(CUSTOM_CLEANUP_FILE_NAME, CommitterWithCustomDeprecatedCleanup.class, new String[]{"_SUCCESS"});
        this.testKilledJob(CUSTOM_CLEANUP_FILE_NAME, CommitterWithCustomDeprecatedCleanup.class, new String[]{"_SUCCESS"});
    }

    /**
     * Committer whose {@code abortJob} creates an empty marker file in the job's
     * output directory, named according to the abort state.
     */
    static class CommitterWithCustomAbort
    extends FileOutputCommitter {
        /**
         * Creates {@code _custom_abort_failed} when {@code state} equals
         * {@code JobStatus.FAILED}, otherwise {@code _custom_abort_killed}.
         * Does not delegate to the superclass implementation.
         */
        @Override
        public void abortJob(JobContext context, int state) throws IOException {
            JobConf conf = context.getJobConf();
            Path outputPath = FileOutputFormat.getOutputPath(conf);
            FileSystem fs = outputPath.getFileSystem(conf);
            String fileName = state == JobStatus.FAILED ? ABORT_FAILED_FILE_NAME : ABORT_KILLED_FILE_NAME;
            fs.create(new Path(outputPath, fileName)).close();
        }
    }

    /**
     * Committer that routes both {@code commitJob} and {@code abortJob} to
     * {@code cleanupJob}, which creates an empty {@code _custom_cleanup} marker
     * file in the job's output directory.
     */
    static class CommitterWithCustomDeprecatedCleanup
    extends FileOutputCommitter {
        /** Creates the {@code _custom_cleanup} marker in the job's output directory. */
        @Override
        public void cleanupJob(JobContext context) throws IOException {
            // Debug marker emitted on stderr each time this method runs.
            System.err.println("---- HERE ----");
            JobConf conf = context.getJobConf();
            Path outputPath = FileOutputFormat.getOutputPath(conf);
            FileSystem fs = outputPath.getFileSystem(conf);
            fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
        }

        /** Delegates to {@link #cleanupJob(JobContext)}. */
        @Override
        public void commitJob(JobContext context) throws IOException {
            this.cleanupJob(context);
        }

        /** Delegates to {@link #cleanupJob(JobContext)}; the abort state is ignored. */
        @Override
        public void abortJob(JobContext context, int i) throws IOException {
            this.cleanupJob(context);
        }
    }
}

