package org.apache.hama.bsp;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.MasterProtocol;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.log4j.LogManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:org/apache/hama/bsp/GroomServer.class */
public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol, Watcher {
    static final String SUBDIR = "groomServer";
    final Configuration conf;
    String groomServerName;
    String localHostname;
    String groomHostName;
    InetSocketAddress bspMasterAddr;
    private Instructor instructor;
    private int failures;
    private int maxCurrentTasks;
    private String rpcServer;
    private Server workerServer;
    MasterProtocol masterClient;
    InetSocketAddress taskReportAddress;
    public static final Log LOG = LogFactory.getLog(GroomServer.class);
    private static volatile int REPORT_INTERVAL = 1000;
    private static ZooKeeper zk = null;
    volatile boolean initialized = false;
    volatile boolean running = true;
    volatile boolean shuttingDown = false;
    boolean justInited = true;
    GroomServerStatus status = null;
    Path systemDirectory = null;
    FileSystem systemFS = null;
    Map<TaskAttemptID, TaskInProgress> tasks = new HashMap();
    Map<TaskAttemptID, TaskInProgress> runningTasks = null;
    Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
    Map<TaskAttemptID, Integer> assignedPeerNames = null;
    Map<BSPJobID, RunningJob> runningJobs = null;
    Server taskReportServer = null;

    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$BSPPeerChild.class */
    public static final class BSPPeerChild {
        public static void main(String[] strArr) throws Throwable {
            if (GroomServer.LOG.isDebugEnabled()) {
                GroomServer.LOG.debug("BSPPeerChild starting");
            }
            HamaConfiguration hamaConfiguration = new HamaConfiguration();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(strArr[0], Integer.parseInt(strArr[1]));
            TaskAttemptID forName = TaskAttemptID.forName(strArr[2]);
            BSPPeerProtocol bSPPeerProtocol = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class, 1L, inetSocketAddress, hamaConfiguration);
            BSPTask bSPTask = (BSPTask) bSPPeerProtocol.getTask(forName);
            int assignedPortNum = bSPPeerProtocol.getAssignedPortNum(forName);
            hamaConfiguration.addResource(new Path(bSPTask.getJobFile()));
            BSPJob bSPJob = new BSPJob(bSPTask.getJobID(), bSPTask.getJobFile());
            hamaConfiguration.set(Constants.PEER_HOST, strArr[3]);
            if (null != strArr && 5 == strArr.length) {
                hamaConfiguration.setInt("bsp.checkpoint.port", Integer.parseInt(strArr[4]));
            }
            hamaConfiguration.setInt(Constants.PEER_PORT, assignedPortNum);
            try {
                try {
                    try {
                        FileSystem.get(bSPJob.getConf()).setWorkingDirectory(bSPJob.getWorkingDirectory());
                        bSPTask.run(bSPJob, new BSPPeerImpl<>(bSPJob, hamaConfiguration, forName, bSPPeerProtocol, bSPTask.partition, bSPTask.splitClass, bSPTask.split, bSPTask.getCounters()), bSPPeerProtocol);
                        RPC.stopProxy(bSPPeerProtocol);
                        LogManager.shutdown();
                    } catch (SyncException e) {
                        GroomServer.LOG.fatal("SyncError from child", e);
                        bSPPeerProtocol.fatalError(forName, e.toString());
                        e.printStackTrace(new PrintStream(new ByteArrayOutputStream()));
                        RPC.stopProxy(bSPPeerProtocol);
                        LogManager.shutdown();
                    }
                } catch (FSError e2) {
                    GroomServer.LOG.fatal("FSError from child", e2);
                    bSPPeerProtocol.fsError(forName, e2.getMessage());
                    RPC.stopProxy(bSPPeerProtocol);
                    LogManager.shutdown();
                } catch (Throwable th) {
                    GroomServer.LOG.warn("Error running child", th);
                    th.printStackTrace(new PrintStream(new ByteArrayOutputStream()));
                    RPC.stopProxy(bSPPeerProtocol);
                    LogManager.shutdown();
                }
            } catch (Throwable th2) {
                RPC.stopProxy(bSPPeerProtocol);
                LogManager.shutdown();
                throw th2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$DispatchTasksHandler.class */
    public class DispatchTasksHandler implements DirectiveHandler {
        private DispatchTasksHandler() {
        }

        @Override // org.apache.hama.bsp.DirectiveHandler
        public void handle(Directive directive) throws DirectiveException {
            GroomServerAction[] actions = ((DispatchTasksDirective) directive).getActions();
            if (GroomServer.LOG.isDebugEnabled()) {
                GroomServer.LOG.debug("Got Response from BSPMaster with " + (actions != null ? actions.length : 0) + " actions");
            }
            if (actions != null) {
                GroomServer.this.assignedPeerNames = new HashMap();
                int i = 61000;
                for (GroomServerAction groomServerAction : actions) {
                    if (groomServerAction instanceof LaunchTaskAction) {
                        Task task = ((LaunchTaskAction) groomServerAction).getTask();
                        i = BSPNetUtils.getNextAvailable(i);
                        GroomServer.this.assignedPeerNames.put(task.getTaskID(), Integer.valueOf(i));
                        GroomServer.LOG.info("Launch " + actions.length + " tasks.");
                        GroomServer.this.startNewTask((LaunchTaskAction) groomServerAction);
                    } else {
                        GroomServer.LOG.info("Kill " + actions.length + " tasks.");
                        KillTaskAction killTaskAction = (KillTaskAction) groomServerAction;
                        if (GroomServer.this.tasks.containsKey(killTaskAction.getTaskID())) {
                            TaskInProgress taskInProgress = GroomServer.this.tasks.get(killTaskAction.getTaskID());
                            taskInProgress.taskStatus.setRunState(TaskStatus.State.FAILED);
                            try {
                                taskInProgress.killAndCleanup(false);
                            } catch (IOException e) {
                                throw new DirectiveException("Error when killing a TaskInProgress.", e);
                            }
                        } else {
                            continue;
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$Instructor.class */
    public class Instructor extends Thread {
        final BlockingQueue<Directive> buffer;
        final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers;

        private Instructor() {
            this.buffer = new LinkedBlockingQueue();
            this.handlers = new ConcurrentHashMap();
        }

        public void bind(Class<? extends Directive> cls, DirectiveHandler directiveHandler) {
            this.handlers.putIfAbsent(cls, directiveHandler);
        }

        public void put(Directive directive) {
            try {
                this.buffer.put(directive);
            } catch (InterruptedException e) {
                GroomServer.LOG.error("Unable to put directive into queue.", e);
                Thread.currentThread().interrupt();
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Directive take = this.buffer.take();
                    this.handlers.get(take.getClass()).handle(take);
                } catch (InterruptedException e) {
                    GroomServer.LOG.error("Unable to retrieve directive from the queue.", e);
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    GroomServer.LOG.error("Fail to execute directive.", e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$RunningJob.class */
    public static class RunningJob {
        private BSPJobID jobid;
        private Path jobFile;
        boolean localized = false;
        Set<TaskInProgress> tasks = new HashSet();
        boolean keepJobFiles = false;

        RunningJob(BSPJobID bSPJobID, Path path) {
            this.jobid = bSPJobID;
            this.jobFile = path;
        }

        Path getJobFile() {
            return this.jobFile;
        }

        BSPJobID getJobId() {
            return this.jobid;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$State.class */
    public enum State {
        NORMAL,
        COMPUTE,
        SYNC,
        BARRIER,
        STALE,
        INTERRUPTED,
        DENIED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/bsp/GroomServer$TaskInProgress.class */
    public class TaskInProgress {
        Task task;
        BSPJob jobConf;
        BSPTaskRunner runner;
        private TaskStatus taskStatus;
        volatile boolean done = false;
        volatile boolean wasKilled = false;
        BSPJob localJobConf = null;

        public TaskInProgress(Task task, BSPJob bSPJob, String str) {
            this.task = task;
            this.jobConf = bSPJob;
            this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0.0f, TaskStatus.State.UNASSIGNED, "init", str, TaskStatus.Phase.STARTING, task.getCounters());
        }

        private void localizeTask(Task task) throws IOException {
            Path localPath = this.jobConf.getLocalPath("groomServer/" + task.getTaskID() + "/job.xml");
            Path localPath2 = this.jobConf.getLocalPath("groomServer/" + task.getTaskID() + "/job.jar");
            GroomServer.this.systemFS.copyToLocalFile(new Path(task.getJobFile()), localPath);
            task.setJobFile(localPath.toString());
            this.localJobConf = new BSPJob(task.getJobID(), localPath.toString());
            this.localJobConf.set("bsp.task.id", task.getTaskID().toString());
            String jar = this.localJobConf.getJar();
            if (jar != null) {
                GroomServer.this.systemFS.copyToLocalFile(new Path(jar), localPath2);
                this.localJobConf.setJar(localPath2.toString());
            }
            task.setConf(this.localJobConf);
        }

        public synchronized void setJobConf(BSPJob bSPJob) {
            this.jobConf = bSPJob;
        }

        public synchronized BSPJob getJobConf() {
            return this.localJobConf;
        }

        public void launchTask() throws IOException {
            localizeTask(this.task);
            this.taskStatus.setRunState(TaskStatus.State.RUNNING);
            this.runner = this.task.createRunner(GroomServer.this);
            this.runner.start();
            GroomServer.LOG.info("Task '" + this.task.getTaskID().toString() + "' has started.");
        }

        public synchronized void killAndCleanup(boolean z) throws IOException {
            if (z) {
                GroomServer.access$512(GroomServer.this, 1);
                this.taskStatus.setRunState(TaskStatus.State.FAILED);
            } else {
                this.taskStatus.setRunState(TaskStatus.State.KILLED);
            }
            if (this.runner != null) {
                this.runner.killBsp();
            }
        }

        public Task getTask() {
            return this.task;
        }

        public synchronized TaskStatus getStatus() {
            return this.taskStatus;
        }

        public TaskStatus.State getRunState() {
            return this.taskStatus.getRunState();
        }

        public boolean wasKilled() {
            return this.wasKilled;
        }

        public boolean equals(Object obj) {
            return (obj instanceof TaskInProgress) && this.task.getTaskID().equals(((TaskInProgress) obj).getTask().getTaskID());
        }

        public int hashCode() {
            return this.task.getTaskID().hashCode();
        }

        public void reportProgress(TaskStatus taskStatus) {
            if (this.done) {
                GroomServer.LOG.info(this.task.getTaskID() + " Ignoring status-update since " + (this.done ? "task is 'done'" : "runState: " + this.taskStatus.getRunState()));
            } else {
                this.taskStatus.statusUpdate(taskStatus);
            }
        }

        public void reportDone() {
            if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
                this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
            }
            this.taskStatus.setFinishTime(System.currentTimeMillis());
            this.done = true;
            this.runner.killBsp();
            GroomServer.LOG.info("Task " + this.task.getTaskID() + " is done.");
        }

        public void jobHasFinished(boolean z) throws IOException {
            synchronized (this) {
                if (getRunState() == TaskStatus.State.RUNNING || getRunState() == TaskStatus.State.UNASSIGNED || getRunState() == TaskStatus.State.COMMIT_PENDING) {
                    killAndCleanup(z);
                }
            }
        }
    }

    public GroomServer(Configuration configuration) throws IOException {
        LOG.info("groom start");
        this.conf = configuration;
        this.bspMasterAddr = BSPMaster.getAddress(configuration);
        if (this.bspMasterAddr == null) {
            System.out.println(BSPMaster.localModeMessage);
            LOG.info(BSPMaster.localModeMessage);
            System.exit(0);
        }
        try {
            zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(configuration), configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
        } catch (IOException e) {
            LOG.error("Exception during reinitialization!", e);
        }
    }

    public synchronized void initialize() throws IOException {
        if (this.conf.get(Constants.PEER_HOST) != null) {
            this.localHostname = this.conf.get(Constants.PEER_HOST);
        }
        if (this.localHostname == null) {
            this.localHostname = DNS.getDefaultHost(this.conf.get("bsp.dns.interface", "default"), this.conf.get("bsp.dns.nameserver", "default"));
        }
        checkLocalDirs(getLocalDirs());
        deleteLocalFiles(SUBDIR);
        this.tasks.clear();
        this.runningJobs = new TreeMap();
        this.runningTasks = new ConcurrentHashMap();
        this.finishedTasks = new LinkedHashMap();
        this.conf.set(Constants.PEER_HOST, this.localHostname);
        this.conf.set(Constants.GROOM_RPC_HOST, this.localHostname);
        this.maxCurrentTasks = this.conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
        int i = -1;
        String str = null;
        if (false == this.initialized) {
            str = this.conf.get(Constants.GROOM_RPC_HOST, "0.0.0.0");
            i = this.conf.getInt(Constants.GROOM_RPC_PORT, Constants.DEFAULT_GROOM_RPC_PORT);
            if (-1 == i || null == str) {
                throw new IllegalArgumentException("Error rpc address " + str + " port" + i);
            }
            this.workerServer = RPC.getServer(this, str, i, this.conf);
            this.workerServer.start();
            this.rpcServer = str + ":" + i;
            LOG.info("Worker rpc server --> " + this.rpcServer);
        }
        InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(NetUtils.getServerAddress(this.conf, "bsp.groom.report.bindAddress", "bsp.groom.report.port", "bsp.groom.report.address"));
        this.taskReportServer = RPC.getServer(this, createSocketAddr.getHostName(), createSocketAddr.getPort(), 10, false, this.conf);
        this.taskReportServer.start();
        this.taskReportAddress = this.taskReportServer.getListenerAddress();
        this.conf.set("bsp.groom.report.address", this.taskReportAddress.getHostName() + ":" + this.taskReportAddress.getPort());
        LOG.info("GroomServer up at: " + this.taskReportAddress);
        this.groomHostName = str;
        this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
        LOG.info("Starting groom: " + this.rpcServer);
        this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class, 1L, this.bspMasterAddr, this.conf);
        if (-1 == i || null == str) {
            throw new IllegalArgumentException("Error rpc address " + str + " port" + i);
        }
        if (!this.masterClient.register(new GroomServerStatus(this.groomServerName, cloneAndResetRunningTaskStatuses(), this.failures, this.maxCurrentTasks, this.rpcServer, this.groomHostName))) {
            LOG.error("There is a problem in establishing communication link with BSPMaster");
            throw new IOException("There is a problem in establishing communication link with BSPMaster.");
        }
        this.instructor = new Instructor();
        this.instructor.bind(DispatchTasksDirective.class, new DispatchTasksHandler());
        this.instructor.start();
        this.running = true;
        this.initialized = true;
    }

    public synchronized InetSocketAddress getTaskTrackerReportAddress() {
        return this.taskReportAddress;
    }

    @Override // org.apache.hama.ipc.GroomProtocol
    public void dispatch(Directive directive) throws IOException {
        if (!this.instructor.isAlive()) {
            throw new IOException();
        }
        this.instructor.put(directive);
    }

    private static void checkLocalDirs(String[] strArr) throws DiskChecker.DiskErrorException {
        boolean z = false;
        LOG.debug(strArr);
        if (strArr != null) {
            for (int i = 0; i < strArr.length; i++) {
                try {
                    LOG.info(strArr[i]);
                    DiskChecker.checkDir(new File(strArr[i]));
                    z = true;
                } catch (DiskChecker.DiskErrorException e) {
                    LOG.warn("BSP Processor local " + e.getMessage());
                }
            }
        }
        if (!z) {
            throw new DiskChecker.DiskErrorException("all local directories are not writable");
        }
    }

    public String[] getLocalDirs() {
        return this.conf.getStrings("bsp.local.dir");
    }

    public void deleteLocalFiles() throws IOException {
        for (String str : getLocalDirs()) {
            FileSystem.getLocal(this.conf).delete(new Path(str), true);
        }
    }

    public void deleteLocalFiles(String str) throws IOException {
        try {
            for (String str2 : getLocalDirs()) {
                FileSystem.getLocal(this.conf).delete(new Path(str2, str), true);
            }
        } catch (NullPointerException e) {
            LOG.info(e);
        }
    }

    public void cleanupStorage() throws IOException {
        deleteLocalFiles();
    }

    private void startCleanupThreads() throws IOException {
    }

    public State offerService() throws Exception {
        while (this.running && !this.shuttingDown) {
            try {
                ArrayList arrayList = new ArrayList();
                Iterator<Map.Entry<TaskAttemptID, TaskInProgress>> it = this.runningTasks.entrySet().iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().getValue().getStatus());
                }
                doReport(arrayList);
                Thread.sleep(REPORT_INTERVAL);
            } catch (InterruptedException e) {
            }
            try {
                if (this.justInited) {
                    String systemDir = this.masterClient.getSystemDir();
                    if (systemDir == null) {
                        LOG.error("Fail to get system directory.");
                        throw new IOException("Fail to get system directory.");
                        break;
                    }
                    this.systemDirectory = new Path(systemDir);
                    this.systemFS = this.systemDirectory.getFileSystem(this.conf);
                }
                this.justInited = false;
            } catch (RemoteException e2) {
                return State.DENIED;
            } catch (DiskChecker.DiskErrorException e3) {
                LOG.error("Exiting groom server for disk error:\n" + StringUtils.stringifyException(e3));
                return State.STALE;
            } catch (Exception e4) {
                LOG.error("Caught exception: " + StringUtils.stringifyException(e4));
            }
            Thread.sleep(REPORT_INTERVAL);
        }
        return State.NORMAL;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startNewTask(LaunchTaskAction launchTaskAction) {
        Task task = launchTaskAction.getTask();
        BSPJob bSPJob = null;
        try {
            bSPJob = new BSPJob(task.getJobID(), task.getJobFile());
        } catch (IOException e) {
            LOG.error(e);
        }
        TaskInProgress taskInProgress = new TaskInProgress(task, bSPJob, this.groomServerName);
        synchronized (this) {
            this.tasks.put(task.getTaskID(), taskInProgress);
            this.runningTasks.put(task.getTaskID(), taskInProgress);
        }
        try {
            localizeJob(taskInProgress);
        } catch (Throwable th) {
            LOG.warn("Error initializing " + taskInProgress.getTask().getTaskID() + ":\n" + StringUtils.stringifyException(th));
            try {
                taskInProgress.killAndCleanup(true);
            } catch (IOException e2) {
                LOG.info("Error cleaning up " + taskInProgress.getTask().getTaskID() + ":\n" + StringUtils.stringifyException(e2));
            }
        }
    }

    public void doReport(List<TaskStatus> list) {
        GroomServerStatus groomServerStatus = new GroomServerStatus(this.groomServerName, updateTaskStatuses(list), this.failures, this.maxCurrentTasks, this.rpcServer, this.groomHostName);
        try {
            if (!this.masterClient.report(new ReportGroomStatusDirective(groomServerStatus))) {
                LOG.warn("Fail to renew BSPMaster's GroomServerStatus.  groom name: " + groomServerStatus.getGroomName() + " rpc server:" + this.rpcServer);
            }
        } catch (IOException e) {
            LOG.error("Fail to communicate with BSPMaster for reporting.", e);
        }
    }

    public List<TaskStatus> updateTaskStatuses(List<TaskStatus> list) {
        ArrayList arrayList = new ArrayList();
        for (TaskStatus taskStatus : list) {
            if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED || taskStatus.getRunState() == TaskStatus.State.FAILED) {
                synchronized (this.finishedTasks) {
                    TaskInProgress remove = this.runningTasks.remove(taskStatus.getTaskId());
                    arrayList.add((TaskStatus) taskStatus.clone());
                    this.finishedTasks.put(taskStatus.getTaskId(), remove);
                }
            } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
                arrayList.add((TaskStatus) taskStatus.clone());
            }
        }
        return arrayList;
    }

    private void localizeJob(TaskInProgress taskInProgress) throws IOException {
        BSPJob bSPJob;
        Task task = taskInProgress.getTask();
        this.conf.addResource(task.getJobFile());
        BSPJob bSPJob2 = new BSPJob((HamaConfiguration) this.conf);
        Path localPath = bSPJob2.getLocalPath("groomServer/" + task.getTaskID() + "/job.xml");
        RunningJob addTaskToJob = addTaskToJob(task.getJobID(), localPath, taskInProgress);
        synchronized (addTaskToJob) {
            if (addTaskToJob.localized) {
                HamaConfiguration hamaConfiguration = new HamaConfiguration();
                hamaConfiguration.addResource(addTaskToJob.getJobFile());
                bSPJob = new BSPJob(hamaConfiguration, addTaskToJob.getJobId().toString());
            } else {
                LocalFileSystem local = FileSystem.getLocal(this.conf);
                Path parent = localPath.getParent();
                if (local.exists(parent)) {
                    local.delete(parent, true);
                    if (!local.mkdirs(parent)) {
                        throw new IOException("Not able to create job directory " + parent.toString());
                    }
                }
                Path localPath2 = bSPJob2.getLocalPath("groomServer/" + task.getTaskID() + "/job.jar");
                this.systemFS.copyToLocalFile(new Path(task.getJobFile()), localPath);
                HamaConfiguration hamaConfiguration2 = new HamaConfiguration();
                hamaConfiguration2.addResource(localPath);
                bSPJob = new BSPJob(hamaConfiguration2, task.getJobID().toString());
                Path path = null;
                if (bSPJob.getJar() != null) {
                    path = new Path(bSPJob.getJar());
                } else {
                    LOG.warn("No jar file for job " + task.getJobID() + " has been defined!");
                }
                bSPJob.setJar(localPath2.toString());
                if (path != null) {
                    this.systemFS.copyToLocalFile(path, localPath2);
                    File file = new File(new File(localPath.toString()).getParent(), "work");
                    if (!file.mkdirs() && !file.isDirectory()) {
                        throw new IOException("Mkdirs failed to create " + file.toString());
                    }
                    RunJar.unJar(new File(localPath2.toString()), file);
                }
                addTaskToJob.localized = true;
            }
        }
        launchTaskForJob(taskInProgress, bSPJob);
    }

    private void launchTaskForJob(TaskInProgress taskInProgress, BSPJob bSPJob) {
        try {
            taskInProgress.setJobConf(bSPJob);
            taskInProgress.launchTask();
        } catch (Throwable th) {
            taskInProgress.taskStatus.setRunState(TaskStatus.State.FAILED);
            LOG.info(StringUtils.stringifyException(th));
        }
    }

    private RunningJob addTaskToJob(BSPJobID bSPJobID, Path path, TaskInProgress taskInProgress) {
        RunningJob runningJob;
        RunningJob runningJob2;
        synchronized (this.runningJobs) {
            if (this.runningJobs.containsKey(bSPJobID)) {
                runningJob = this.runningJobs.get(bSPJobID);
            } else {
                runningJob = new RunningJob(bSPJobID, path);
                runningJob.localized = false;
                runningJob.tasks = new HashSet();
                runningJob.jobFile = path;
                this.runningJobs.put(bSPJobID, runningJob);
            }
            runningJob.tasks.add(taskInProgress);
            runningJob2 = runningJob;
        }
        return runningJob2;
    }

    private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
        ArrayList arrayList = new ArrayList(this.runningTasks.size());
        Iterator<TaskInProgress> it = this.runningTasks.values().iterator();
        while (it.hasNext()) {
            arrayList.add((TaskStatus) it.next().getStatus().clone());
        }
        return arrayList;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            initialize();
            startCleanupThreads();
            boolean z = false;
            while (this.running && !this.shuttingDown && !z) {
                boolean z2 = false;
                while (this.running && !z2 && !this.shuttingDown && !z) {
                    try {
                        State offerService = offerService();
                        if (offerService == State.STALE) {
                            z2 = true;
                        } else if (offerService == State.DENIED) {
                            z = true;
                        }
                    } catch (Exception e) {
                        if (!this.shuttingDown) {
                            LOG.info("Lost connection to BSP Master [" + this.bspMasterAddr + "].  Retrying...", e);
                            try {
                                Thread.sleep(5000L);
                            } catch (InterruptedException e2) {
                            }
                        }
                    }
                }
                if (this.shuttingDown) {
                    return;
                }
                LOG.warn("Reinitializing local state");
                initialize();
            }
        } catch (IOException e3) {
            LOG.error("Got fatal exception while reinitializing GroomServer: " + StringUtils.stringifyException(e3));
        }
    }

    public synchronized void shutdown() throws IOException {
        this.shuttingDown = true;
        close();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.running = false;
        this.initialized = false;
        cleanupStorage();
        this.workerServer.stop();
        RPC.stopProxy(this.masterClient);
        if (this.taskReportServer != null) {
            this.taskReportServer.stop();
            this.taskReportServer = null;
        }
    }

    public static Thread startGroomServer(GroomServer groomServer) {
        return startGroomServer(groomServer, "regionserver" + groomServer.groomServerName);
    }

    public static Thread startGroomServer(GroomServer groomServer, String str) {
        Thread thread = new Thread(groomServer);
        thread.setName(str);
        thread.start();
        return thread;
    }

    public boolean isRunning() {
        return this.running;
    }

    public static GroomServer constructGroomServer(Class<? extends GroomServer> cls, Configuration configuration) {
        try {
            return cls.getConstructor(Configuration.class).newInstance(configuration);
        } catch (Exception e) {
            throw new RuntimeException("Failed construction of Master: " + cls.toString(), e);
        }
    }

    public long getProtocolVersion(String str, long j) throws IOException {
        if (str.equals(GroomProtocol.class.getName()) || str.equals(BSPPeerProtocol.class.getName())) {
            return 1L;
        }
        throw new IOException("Unknown protocol to GroomServer: " + str);
    }

    private void purgeTask(TaskInProgress taskInProgress, boolean z) throws IOException {
        if (taskInProgress != null) {
            LOG.info("About to purge task: " + taskInProgress.getTask().getTaskID());
            removeTaskFromJob(taskInProgress.getTask().getJobID(), taskInProgress);
            taskInProgress.jobHasFinished(z);
        }
    }

    private void removeTaskFromJob(BSPJobID bSPJobID, TaskInProgress taskInProgress) {
        synchronized (this.runningJobs) {
            RunningJob runningJob = this.runningJobs.get(bSPJobID);
            if (runningJob == null) {
                LOG.warn("Unknown job " + bSPJobID + " being deleted.");
            } else {
                synchronized (runningJob) {
                    runningJob.tasks.remove(taskInProgress);
                }
            }
        }
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public Task getTask(TaskAttemptID taskAttemptID) throws IOException {
        TaskInProgress taskInProgress = this.tasks.get(taskAttemptID);
        if (taskInProgress != null) {
            return taskInProgress.getTask();
        }
        return null;
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
        return false;
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public void fatalError(TaskAttemptID taskAttemptID, String str) throws IOException {
        LOG.fatal("Task: " + taskAttemptID + " - Killed : " + str);
        purgeTask(this.runningTasks.get(taskAttemptID), true);
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public void fsError(TaskAttemptID taskAttemptID, String str) throws IOException {
        LOG.fatal("Task: " + taskAttemptID + " - Killed due to FSError: " + str);
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public boolean statusUpdate(TaskAttemptID taskAttemptID, TaskStatus taskStatus) throws IOException, InterruptedException {
        TaskInProgress taskInProgress = this.tasks.get(taskAttemptID);
        if (taskInProgress != null) {
            taskInProgress.reportProgress(taskStatus);
            return true;
        }
        LOG.warn("Progress from unknown child task: " + taskAttemptID);
        return false;
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public void done(TaskAttemptID taskAttemptID) throws IOException {
        TaskInProgress taskInProgress = this.tasks.get(taskAttemptID);
        if (taskInProgress != null) {
            taskInProgress.reportDone();
        } else {
            LOG.warn("Unknown child task done: " + taskAttemptID + ". Ignored.");
        }
    }

    @Override // org.apache.hama.ipc.BSPPeerProtocol
    public int getAssignedPortNum(TaskAttemptID taskAttemptID) {
        return this.assignedPeerNames.get(taskAttemptID).intValue();
    }

    public void process(WatchedEvent watchedEvent) {
    }

    static /* synthetic */ int access$512(GroomServer groomServer, int i) {
        int i2 = groomServer.failures + i;
        groomServer.failures = i2;
        return i2;
    }
}
