/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;

/**
 * Checks that a TaskManager running in its own JVM terminates that JVM once the
 * TaskManager actor dies.
 *
 * <p>The test starts the JobManager actors inside the test JVM, launches
 * {@link TaskManagerTestEntryPoint} in a child JVM, sends the TaskManager actor a
 * {@code PoisonPill}, and then expects the child process to exit with
 * {@code TaskManager.RUNTIME_FAILURE_RETURN_CODE()}.
 */
public class TaskManagerProcessReapingTest {

    /** Maximum number of attempts to resolve the remote TaskManager actor. */
    private static final int LOOKUP_ATTEMPTS = 40;

    /** Timeout, in seconds, of a single remote actor lookup. */
    private static final long LOOKUP_TIMEOUT_SECONDS = 25L;

    /** Pause, in milliseconds, between two failed lookup attempts. */
    private static final long LOOKUP_RETRY_PAUSE_MILLIS = 500L;

    /** How long, in milliseconds, the child JVM may take to exit after the actor was killed. */
    private static final long TERMINATION_TIMEOUT_MILLIS = 10000L;

    /** Polling interval, in milliseconds, while waiting for the child JVM to exit. */
    private static final long TERMINATION_POLL_MILLIS = 100L;

    /**
     * Launches a TaskManager in a child JVM, kills its actor, and asserts that the
     * child JVM exits with the runtime-failure return code.
     *
     * <p>The test is skipped (with a console message) when no java executable can be found.
     * On any failure the captured output of the child process is printed to stdout.
     */
    // Raw Scala types (Tuple2/Some/Option) are used on purpose: the generic signature
    // expected by AkkaUtils.createActorSystem is not visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void testReapProcessOnFailure() {
        Process taskManagerProcess = null;
        ActorSystem jmActorSystem = null;
        final StringWriter processOutput = new StringWriter();
        try {
            String javaCommand = CommonTestUtils.getJavaCommandPath();
            if (javaCommand == null) {
                System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----");
                return;
            }

            // Logging configuration handed to the child JVM.
            File tempLogFile = File.createTempFile("testlogconfig", "properties");
            tempLogFile.deleteOnExit();
            CommonTestUtils.printLog4jDebugConfig(tempLogFile);

            // Start the JobManager actors inside this JVM.
            InetAddress localhost = InetAddress.getByName("localhost");
            int jobManagerPort = NetUtils.getAvailablePort();
            Tuple2 localAddress = new Tuple2(localhost.getHostAddress(), jobManagerPort);
            Option listeningAddress = new Some(localAddress);
            jmActorSystem = AkkaUtils.createActorSystem(new Configuration(), listeningAddress);
            JobManager.startJobManagerActors(new Configuration(), jmActorSystem, JobManager.class, MemoryArchivist.class);

            // Start the TaskManager in a separate JVM.
            int taskManagerPort = NetUtils.getAvailablePort();
            String[] command = new String[]{javaCommand, "-Dlog.level=DEBUG", "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms256m", "-Xmx256m", "-classpath", CommonTestUtils.getCurrentClasspath(), TaskManagerTestEntryPoint.class.getName(), String.valueOf(jobManagerPort), String.valueOf(taskManagerPort)};
            ProcessBuilder bld = new ProcessBuilder(command);
            // Merge stderr into stdout so that a single forwarder drains everything the
            // child writes; an undrained pipe could otherwise block the child JVM.
            bld.redirectErrorStream(true);
            taskManagerProcess = bld.start();
            new PipeForwarder(taskManagerProcess.getInputStream(), processOutput);

            String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s", NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort), TaskManager.TASK_MANAGER_NAME());

            // Retry the lookup while the child is still starting up. Stop as soon as the
            // child process is gone: further attempts could never succeed.
            ActorRef taskManagerRef = null;
            Throwable lastError = null;
            for (int attempt = 0; attempt < LOOKUP_ATTEMPTS && CommonTestUtils.isProcessAlive(taskManagerProcess); ++attempt) {
                try {
                    taskManagerRef = TaskManager.getTaskManagerRemoteReference(taskManagerActorName, jmActorSystem, new FiniteDuration(LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS));
                    break;
                }
                catch (Throwable t) {
                    lastError = t;
                    Thread.sleep(LOOKUP_RETRY_PAUSE_MILLIS);
                }
            }

            Assert.assertTrue("TaskManager process died", CommonTestUtils.isProcessAlive(taskManagerProcess));
            if (taskManagerRef == null) {
                if (lastError != null) {
                    lastError.printStackTrace();
                }
                Assert.fail("TaskManager process did not launch the TaskManager properly. Failed to look up " + taskManagerActorName);
            }

            // Kill the TaskManager actor; the child JVM is expected to exit in response.
            taskManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());

            // Monotonic clock, so wall-clock adjustments cannot shorten or stretch the wait.
            long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(TERMINATION_TIMEOUT_MILLIS);
            while (System.nanoTime() - deadlineNanos < 0L && CommonTestUtils.isProcessAlive(taskManagerProcess)) {
                Thread.sleep(TERMINATION_POLL_MILLIS);
            }

            Assert.assertFalse("TaskManager process did not terminate upon actor death", CommonTestUtils.isProcessAlive(taskManagerProcess));
            int returnCode = taskManagerProcess.exitValue();
            Assert.assertEquals("TaskManager died, but not because of the process reaper", TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to whoever runs this test.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            TaskManagerProcessReapingTest.printProcessLog(processOutput.toString());
            // Include the exception type: getMessage() alone may be null.
            Assert.fail(e.getClass().getName() + ": " + e.getMessage());
        }
        catch (Error e) {
            // Covers assertion failures as well; dump the child's log, then propagate.
            e.printStackTrace();
            TaskManagerProcessReapingTest.printProcessLog(processOutput.toString());
            throw e;
        }
        finally {
            if (taskManagerProcess != null) {
                taskManagerProcess.destroy();
            }
            if (jmActorSystem != null) {
                jmActorSystem.shutdown();
            }
        }
    }

    /**
     * Prints the captured output of the spawned process to stdout, framed by banner lines.
     *
     * @param log the text captured from the spawned process
     */
    private static void printProcessLog(String log) {
        System.out.println("-----------------------------------------");
        System.out.println("       BEGIN SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
        System.out.println(log);
        System.out.println("-----------------------------------------");
        System.out.println("        END SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }

    /**
     * Daemon thread that copies everything readable from an input stream into a
     * {@link StringWriter}. The thread starts itself on construction.
     */
    private static final class PipeForwarder
    extends Thread {
        private final StringWriter target;
        private final InputStream source;

        /**
         * Creates and immediately starts the forwarder.
         *
         * @param source stream to drain until end-of-stream or an I/O error
         * @param target writer that receives the decoded text
         */
        public PipeForwarder(InputStream source, StringWriter target) {
            super("Pipe Forwarder");
            this.setDaemon(true);
            this.source = source;
            this.target = target;
            // Fields are assigned before start(); the class is final, so no subclass
            // state can be observed half-initialized by the new thread.
            this.start();
        }

        @Override
        public void run() {
            try {
                // Decode bytes with the platform default charset instead of treating every
                // byte as a char, and read in chunks rather than one byte at a time.
                InputStreamReader reader = new InputStreamReader(this.source, Charset.defaultCharset());
                char[] buffer = new char[1024];
                int count;
                while ((count = reader.read(buffer)) != -1) {
                    this.target.write(buffer, 0, count);
                }
            }
            catch (IOException ignored) {
                // Best effort: the stream is closed when the process is destroyed, and
                // whatever was captured up to that point is all the test needs.
            }
        }
    }

    /**
     * Entry point executed in the child JVM. Expects two arguments: the JobManager
     * port and the TaskManager port.
     */
    public static class TaskManagerTestEntryPoint {

        /**
         * Configures and runs a TaskManager that connects to a JobManager on localhost,
         * then keeps the main thread parked.
         *
         * <p>Any failure is printed to stderr and the JVM exits with code 1.
         *
         * @param args {@code args[0]} is the JobManager port, {@code args[1]} the TaskManager port
         */
        public static void main(String[] args) {
            try {
                int jobManagerPort = Integer.parseInt(args[0]);
                int taskManagerPort = Integer.parseInt(args[1]);

                Configuration cfg = new Configuration();
                cfg.setString("jobmanager.rpc.address", "localhost");
                cfg.setInteger("jobmanager.rpc.port", jobManagerPort);
                cfg.setInteger("taskmanager.memory.size", 4);
                cfg.setInteger("taskmanager.network.numberOfBuffers", 256);

                TaskManager.runTaskManager("localhost", taskManagerPort, cfg);

                // Park the main thread for good. Nothing ever notifies this lock; the loop
                // only guards against spurious wakeups letting main() return.
                final Object lock = new Object();
                synchronized (lock) {
                    while (true) {
                        lock.wait();
                    }
                }
            }
            catch (Throwable t) {
                // Report the cause so it shows up in the parent's captured process log.
                t.printStackTrace();
                System.exit(1);
            }
        }
    }
}

