/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.FilterOutputStream;
import java.io.IOException;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import junit.framework.Assert;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.mortbay.jetty.HandlerContainer;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;

public abstract class NotificationTestCase
extends HadoopTestCase {
    private int port;
    private String contextPath = "/notification";
    private String servletPath = "/mapred";
    private Server webServer;

    protected NotificationTestCase(int mode) throws IOException {
        super(mode, 4, 1, 1);
    }

    private void startHttpServer() throws Exception {
        if (this.webServer != null) {
            this.webServer.stop();
            this.webServer = null;
        }
        this.webServer = new Server(0);
        Context context = new Context((HandlerContainer)this.webServer, this.contextPath);
        context.addServlet(new ServletHolder((Servlet)new NotificationServlet()), this.servletPath);
        this.webServer.start();
        this.port = this.webServer.getConnectors()[0].getLocalPort();
    }

    private void stopHttpServer() throws Exception {
        if (this.webServer != null) {
            this.webServer.stop();
            this.webServer.destroy();
            this.webServer = null;
        }
    }

    private String getNotificationUrlTemplate() {
        return "http://localhost:" + this.port + this.contextPath + this.servletPath + "?jobId=$jobId&amp;jobStatus=$jobStatus";
    }

    @Override
    protected JobConf createJobConf() {
        JobConf conf = super.createJobConf();
        conf.setJobEndNotificationURI(this.getNotificationUrlTemplate());
        conf.setInt("mapreduce.job.end-notification.retry.attempts", 3);
        conf.setInt("mapreduce.job.end-notification.retry.interval", 200);
        return conf;
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        this.startHttpServer();
    }

    @Override
    protected void tearDown() throws Exception {
        this.stopHttpServer();
        super.tearDown();
    }

    public void testMR() throws Exception {
        int tries;
        System.out.println(this.launchWordCount(this.createJobConf(), "a b c d e f g h", 1, 1));
        boolean keepTrying = true;
        for (int tries2 = 0; tries2 < 30 && keepTrying; ++tries2) {
            Thread.sleep(50L);
            keepTrying = NotificationServlet.counter != 2;
        }
        NotificationTestCase.assertEquals((int)2, (int)NotificationServlet.counter);
        NotificationTestCase.assertEquals((int)0, (int)NotificationServlet.failureCounter);
        Path inDir = new Path("notificationjob/input");
        Path outDir = new Path("notificationjob/output");
        if (this.isLocalFS()) {
            String localPathRoot = System.getProperty("test.build.data", "/tmp").toString().replace(' ', '+');
            inDir = new Path(localPathRoot, inDir);
            outDir = new Path(localPathRoot, outDir);
        }
        System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir, outDir).getID());
        keepTrying = true;
        for (tries = 0; tries < 30 && keepTrying; ++tries) {
            Thread.sleep(50L);
            keepTrying = NotificationServlet.counter != 4;
        }
        NotificationTestCase.assertEquals((int)4, (int)NotificationServlet.counter);
        NotificationTestCase.assertEquals((int)0, (int)NotificationServlet.failureCounter);
        System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir, outDir).getID());
        keepTrying = true;
        for (tries = 0; tries < 30 && keepTrying; ++tries) {
            Thread.sleep(50L);
            keepTrying = NotificationServlet.counter != 6;
        }
        NotificationTestCase.assertEquals((int)6, (int)NotificationServlet.counter);
        NotificationTestCase.assertEquals((int)0, (int)NotificationServlet.failureCounter);
    }

    private String launchWordCount(JobConf conf, String input, int numMaps, int numReduces) throws IOException {
        Path inDir = new Path("testing/wc/input");
        Path outDir = new Path("testing/wc/output");
        if (this.isLocalFS()) {
            String localPathRoot = System.getProperty("test.build.data", "/tmp").toString().replace(' ', '+');
            inDir = new Path(localPathRoot, inDir);
            outDir = new Path(localPathRoot, outDir);
        }
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outDir, true);
        if (!fs.mkdirs(inDir)) {
            throw new IOException("Mkdirs failed to create " + inDir.toString());
        }
        FSDataOutputStream file = fs.create(new Path(inDir, "part-0"));
        file.writeBytes(input);
        ((FilterOutputStream)file).close();
        conf.setJobName("wordcount");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WordCount.MapClass.class);
        conf.setCombinerClass(WordCount.Reduce.class);
        conf.setReducerClass(WordCount.Reduce.class);
        FileInputFormat.setInputPaths(conf, inDir);
        FileOutputFormat.setOutputPath(conf, outDir);
        conf.setNumMapTasks(numMaps);
        conf.setNumReduceTasks(numReduces);
        JobClient.runJob(conf);
        return MapReduceTestUtil.readOutput(outDir, conf);
    }

    public static class NotificationServlet
    extends HttpServlet {
        public static volatile int counter = 0;
        public static volatile int failureCounter = 0;
        private static final long serialVersionUID = 1L;

        protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
            String queryString = req.getQueryString();
            switch (counter) {
                case 0: {
                    this.verifyQuery(queryString, "SUCCEEDED");
                    break;
                }
                case 2: {
                    this.verifyQuery(queryString, "KILLED");
                    break;
                }
                case 4: {
                    this.verifyQuery(queryString, "FAILED");
                }
            }
            if (counter % 2 == 0) {
                res.sendError(400, "forcing error");
            } else {
                res.setStatus(200);
            }
            ++counter;
        }

        protected void verifyQuery(String query, String expected) throws IOException {
            if (query.contains(expected)) {
                return;
            }
            ++failureCounter;
            Assert.assertTrue((String)("The request (" + query + ") does not contain " + expected), (boolean)false);
        }
    }
}

