package org.apache.hama.bsp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.sync.MasterSyncClient;
import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.http.HttpServer;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.hama.ipc.MasterProtocol;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.Server;
import org.apache.hama.monitor.fd.FDProvider;
import org.apache.hama.monitor.fd.Supervisor;
import org.apache.hama.monitor.fd.UDPSupervisor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/* loaded from: input_file:org/apache/hama/bsp/BSPMaster.class */
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, GroomServerManager, Watcher, MonitorManager {
    /** Message printed and logged by the constructor when {@link #getAddress} returns {@code null}. */
    public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
    // Presumably milliseconds: the constructor sleeps for this same amount (10000 ms)
    // between attempts to prepare the system directory.
    private static final int FS_ACCESS_RETRY_PERIOD = 10000;
    private HamaConfiguration conf;
    // Created by initZK(); stays null until startMaster() has run.
    MasterSyncClient syncClient;
    // INITIALIZING until offerService() switches it to RUNNING.
    final AtomicReference<State> state;
    // Identifier passed to every BSPJobID created by getNewJobId().
    String masterIdentifier;
    // RPC server exposing this object; not created in local mode.
    private Server masterServer;
    private String host;
    private int port;
    // System.currentTimeMillis() captured in the constructor.
    private long startTime;
    private HttpServer infoServer;
    private int infoPort;
    // Sub-directory removed under every local dir by the constructor.
    static final String SUBDIR = "bspMaster";
    FileSystem fs;
    // Qualified form of "bsp.system.dir"; wiped and recreated by the constructor.
    Path systemDir;
    // Numeric part of the next BSPJobID handed out by getNewJobId().
    private Integer nextJobId;
    // Incremented by submitJob() and recoverTask().
    private int totalSubmissions;
    // Adjusted through the synthetic accessors access$020 / access$012.
    private int totalTasks;
    // Recomputed on every getClusterStatus() call.
    private int totalTaskCapacity;
    // Job registry; written only by addJob().
    private Map<BSPJobID, JobInProgress> jobs;
    private TaskScheduler taskScheduler;
    // Registered groom servers mapped to the RPC proxy used to reach each of them.
    protected ConcurrentMap<GroomServerStatus, GroomProtocol> groomServers;
    private final List<GroomServerStatus> blackList;
    // Worker thread created in offerService(); null before that.
    private Instructor instructor;
    private final List<JobInProgressListener> jobInProgressListeners;
    private final List<GroomStatusListener> groomStatusListeners;
    // Populated only when "bsp.monitor.fd.enabled" is true and the master is not in local mode.
    private final AtomicReference<Supervisor> supervisor;
    // Returned by getTaskCompletionEvents() while a job's tasks are not yet initialised.
    TaskCompletionEvent[] EMPTY_EVENTS;
    public static final Log LOG = LogFactory.getLog(BSPMaster.class);
    static long JOBINIT_SLEEP_INTERVAL = 2000;
    // 475 decimal == 0733 octal.
    static final FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable(475);
    // 448 decimal == 0700 octal.
    static final FsPermission SYSTEM_FILE_PERMISSION = FsPermission.createImmutable(448);

    /* loaded from: input_file:org/apache/hama/bsp/BSPMaster$Instructor.class */
    private class Instructor extends Thread {
        private final BlockingQueue<Directive> buffer;
        private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers;

        private Instructor() {
            this.buffer = new LinkedBlockingQueue();
            this.handlers = new ConcurrentHashMap();
        }

        public void bind(Class<? extends Directive> cls, DirectiveHandler directiveHandler) {
            this.handlers.putIfAbsent(cls, directiveHandler);
        }

        public void put(Directive directive) {
            try {
                this.buffer.put(directive);
            } catch (InterruptedException e) {
                BSPMaster.LOG.error("Fail to put directive into queue.", e);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Directive take = this.buffer.take();
                    this.handlers.get(take.getClass()).handle(take);
                } catch (InterruptedException e) {
                    BSPMaster.LOG.error("Unable to retrieve directive from the queue.", e);
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    BSPMaster.LOG.error("Fail to execute directive command.", e2);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/BSPMaster$ReportGroomStatusHandler.class */
    /**
     * Handles a {@link ReportGroomStatusDirective}: swaps in the groom's new status as
     * the key in {@code groomServers}, adjusts the master's task total, and processes
     * every task report the status carries.
     */
    private class ReportGroomStatusHandler implements DirectiveHandler {
        private ReportGroomStatusHandler() {
        }

        /**
         * Applies one groom status report.
         *
         * @param directive expected to be a {@link ReportGroomStatusDirective} (cast unchecked)
         * @throws DirectiveException if dispatching a kill-task action fails with an IOException
         * @throws RuntimeException   if the reporting groom is not present in {@code groomServers}
         */
        @Override // org.apache.hama.bsp.DirectiveHandler
        public void handle(Directive directive) throws DirectiveException {
            GroomServerStatus status = ((ReportGroomStatusDirective) directive).getStatus();
            if (!BSPMaster.this.groomServers.containsKey(status)) {
                throw new RuntimeException("GroomServer not found." + status.getGroomName());
            }
            GroomServerStatus groomServerStatus = null;
            // Find the stored key that equals the reported status so its task count can
            // be subtracted before the key is replaced by the fresh status object.
            Iterator<GroomServerStatus> it = BSPMaster.this.groomServers.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                GroomServerStatus next = it.next();
                if (next.equals(status)) {
                    // totalTasks -= task count of the previously stored status
                    BSPMaster.access$020(BSPMaster.this, next.countTasks());
                    groomServerStatus = status;
                    BSPMaster.this.updateGroomServersKey(next, groomServerStatus);
                    break;
                }
            }
            if (null == groomServerStatus) {
                throw new RuntimeException("BSPMaster contains GroomServerSatus, but fail to retrieve it.");
            }
            // totalTasks += task count of the newly reported status
            BSPMaster.access$012(BSPMaster.this, groomServerStatus.countTasks());
            for (TaskStatus taskStatus : groomServerStatus.getTaskReports()) {
                // NOTE(review): findJobById / findTaskInProgress results are used without a null check.
                JobInProgress findJobById = BSPMaster.this.taskScheduler.findJobById(taskStatus.getJobId());
                TaskInProgress findTaskInProgress = findJobById.findTaskInProgress(taskStatus.getTaskId().getTaskID());
                // First pass: react to the state of the individual task.
                if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
                    findJobById.completedTask(findTaskInProgress, taskStatus);
                    findJobById.getCounters().incrAllCounters(taskStatus.getCounters());
                    Iterator it2 = BSPMaster.this.groomStatusListeners.iterator();
                    while (it2.hasNext()) {
                        ((GroomStatusListener) it2.next()).taskComplete(status, findTaskInProgress);
                    }
                } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
                    findJobById.getStatus().setProgress(taskStatus.getSuperstepCount());
                    findJobById.getStatus().setSuperstepCount(taskStatus.getSuperstepCount());
                } else if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
                    if (findJobById.handleFailure(findTaskInProgress)) {
                        BSPMaster.this.recoverTask(findJobById);
                    } else {
                        // Run state 3 is presumably the job-level FAILED constant — TODO confirm against JobStatus.
                        findJobById.status.setRunState(3);
                        findJobById.failedTask(findTaskInProgress, taskStatus);
                        Iterator it3 = BSPMaster.this.jobInProgressListeners.iterator();
                        while (it3.hasNext()) {
                            try {
                                ((JobInProgressListener) it3.next()).jobRemoved(findJobById);
                            } catch (IOException e) {
                                BSPMaster.LOG.error("Fail to alter scheduler a job is moved.", e);
                            }
                        }
                    }
                }
                // Second pass: react to the resulting state of the whole job.
                // Run states 2 / 1 / 5 are presumably SUCCEEDED / RUNNING / KILLED — TODO confirm against JobStatus.
                if (findJobById.getStatus().getRunState() == 2) {
                    Iterator it4 = BSPMaster.this.jobInProgressListeners.iterator();
                    while (it4.hasNext()) {
                        try {
                            ((JobInProgressListener) it4.next()).jobRemoved(findJobById);
                        } catch (IOException e2) {
                            BSPMaster.LOG.error("Fail to alter scheduler a job is moved.", e2);
                        }
                    }
                } else if (findJobById.getStatus().getRunState() == 1) {
                    findJobById.getStatus().setProgress(taskStatus.getSuperstepCount());
                    findJobById.getStatus().setSuperstepCount(taskStatus.getSuperstepCount());
                } else if (findJobById.getStatus().getRunState() == 5) {
                    try {
                        // Ask the reporting groom to kill this task, then notify listeners that the job is removed.
                        BSPMaster.this.findGroomServer(groomServerStatus).dispatch(new DispatchTasksDirective(new GroomServerAction[]{new KillTaskAction(taskStatus.getTaskId())}));
                        Iterator it5 = BSPMaster.this.jobInProgressListeners.iterator();
                        while (it5.hasNext()) {
                            try {
                                ((JobInProgressListener) it5.next()).jobRemoved(findJobById);
                            } catch (IOException e3) {
                                BSPMaster.LOG.error("Fail to alter scheduler a job is moved.", e3);
                            }
                        }
                    } catch (IOException e4) {
                        throw new DirectiveException("Error when dispatching kill task action.", e4);
                    }
                } else {
                    continue;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/BSPMaster$State.class */
    /** Lifecycle state of the master as reported by {@code currentState()} and {@code getClusterStatus}. */
    public enum State {
        // Initial value set by the constructor.
        INITIALIZING,
        // Set by offerService() once the RPC server has been started.
        RUNNING
    }

    /**
     * Creates a master whose identifier is generated from the current time
     * (see {@code generateNewIdentifier()}).
     *
     * @param hamaConfiguration configuration used for all master settings
     * @throws IOException          propagated from the two-argument constructor
     * @throws InterruptedException propagated from the two-argument constructor
     */
    public BSPMaster(HamaConfiguration hamaConfiguration) throws IOException, InterruptedException {
        this(hamaConfiguration, generateNewIdentifier());
    }

    /**
     * Creates a master with an explicit identifier.
     *
     * <p>In local mode ({@link #getAddress} returns {@code null}) only the in-memory
     * state and the task scheduler are created. Otherwise the RPC server object and the
     * HTTP info server are created (the latter is started), the optional failure-detection
     * supervisor is instantiated, and the system directory is wiped and recreated,
     * retrying every {@code FS_ACCESS_RETRY_PERIOD} ms until it succeeds or the thread
     * is interrupted.
     *
     * @param hamaConfiguration configuration used for all master settings
     * @param str               master identifier embedded in every job id
     * @throws IOException          on a permission failure while cleaning the system
     *                              directory, or when creating it fails with an I/O error
     * @throws InterruptedException if the thread is interrupted while waiting for the file system
     */
    BSPMaster(HamaConfiguration hamaConfiguration, String str) throws IOException, InterruptedException {
        this.syncClient = null;
        this.state = new AtomicReference<>(State.INITIALIZING);
        this.fs = null;
        this.systemDir = null;
        this.nextJobId = 1;
        this.totalSubmissions = 0;
        this.totalTasks = 0;
        this.jobs = new TreeMap<>();
        this.groomServers = new ConcurrentHashMap<>();
        this.blackList = new CopyOnWriteArrayList<>();
        this.jobInProgressListeners = new CopyOnWriteArrayList<>();
        this.groomStatusListeners = new CopyOnWriteArrayList<>();
        this.supervisor = new AtomicReference<>();
        this.EMPTY_EVENTS = new TaskCompletionEvent[0];
        this.conf = hamaConfiguration;
        this.masterIdentifier = str;
        this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(hamaConfiguration.getClass("bsp.master.taskscheduler", SimpleTaskScheduler.class, TaskScheduler.class), hamaConfiguration);
        InetSocketAddress address = getAddress(hamaConfiguration);
        if (address == null) {
            // Local mode: no daemon, no servers, no file-system setup.
            System.out.println(localModeMessage);
            LOG.info(localModeMessage);
            return;
        }
        this.host = address.getHostName();
        this.port = address.getPort();
        LOG.info("RPC BSPMaster: host " + this.host + " port " + this.port);
        this.startTime = System.currentTimeMillis();
        this.masterServer = RPC.getServer(this, this.host, this.port, hamaConfiguration);
        this.infoPort = hamaConfiguration.getInt("bsp.http.infoserver.port", 40013);
        this.infoServer = new HttpServer("bspmaster", this.host, this.infoPort, true, hamaConfiguration);
        this.infoServer.setAttribute("bsp.master", this);
        this.infoServer.start();
        if (hamaConfiguration.getBoolean("bsp.monitor.fd.enabled", false)) {
            this.supervisor.set(FDProvider.createSupervisor(hamaConfiguration.getClass("bsp.monitor.fd.supervisor.class", UDPSupervisor.class, Supervisor.class), hamaConfiguration));
        }
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.fs == null) {
                    this.fs = FileSystem.get(hamaConfiguration);
                }
            } catch (IOException e) {
                LOG.error("Can't get connection to Hadoop Namenode!", e);
            }
            if (this.fs == null) {
                // Without a file system, getSystemDir() below would throw an NPE;
                // wait and retry the connection instead.
                Thread.sleep(FS_ACCESS_RETRY_PERIOD);
                continue;
            }
            try {
                if (this.systemDir == null) {
                    this.systemDir = new Path(getSystemDir());
                }
                LOG.info("Cleaning up the system directory");
                LOG.info(this.systemDir);
                this.fs.delete(this.systemDir, true);
            } catch (AccessControlException e2) {
                LOG.warn("Failed to operate on bsp.system.dir (" + this.systemDir + ") because of permissions.");
                LOG.warn("Manually delete the bsp.system.dir (" + this.systemDir + ") and then start the BSPMaster.");
                LOG.warn("Bailing out ... ");
                throw e2;
            } catch (IOException e3) {
                LOG.info("problem cleaning system directory: " + this.systemDir, e3);
            }
            if (FileSystem.mkdirs(this.fs, this.systemDir, new FsPermission(SYSTEM_DIR_PERMISSION))) {
                break;
            }
            LOG.error("Mkdirs failed to create " + this.systemDir);
            LOG.info(SUBDIR);
            Thread.sleep(FS_ACCESS_RETRY_PERIOD);
        }
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException();
        }
        deleteLocalFiles(SUBDIR);
    }

    /**
     * Registers a groom server: opens an RPC proxy to the address it advertises, stores
     * the proxy under its status, and notifies all {@link GroomStatusListener}s.
     *
     * @param groomServerStatus status of the groom asking to register
     * @return {@code true} if the groom was added; {@code false} if no proxy could be created
     * @throws NullPointerException if {@code groomServerStatus} is {@code null}
     * @throws IOException          declared by the protocol; proxy failures are logged and
     *                              reported through the return value instead
     */
    @Override // org.apache.hama.ipc.MasterProtocol
    public boolean register(GroomServerStatus groomServerStatus) throws IOException {
        if (null == groomServerStatus) {
            LOG.error("No groom server status.");
            throw new NullPointerException("No groom server status.");
        }
        final GroomProtocol groomProtocol;
        try {
            groomProtocol = (GroomProtocol) RPC.waitForProxy(GroomProtocol.class, 1L, resolveWorkerAddress(groomServerStatus.getRpcServer()), this.conf);
        } catch (Exception e) {
            // Any failure while resolving the address or creating the proxy means the
            // groom cannot be reached; log the cause so it is not lost.
            LOG.error("Fail to register GroomServer " + groomServerStatus.getGroomName(), e);
            return false;
        }
        if (null == groomProtocol) {
            LOG.warn("Fail to create Worker client at host");
            return false;
        }
        this.groomServers.putIfAbsent(groomServerStatus, groomProtocol);
        for (GroomStatusListener listener : this.groomStatusListeners) {
            listener.groomServerRegistered(groomServerStatus);
        }
        LOG.info(groomServerStatus.getGroomName() + " is added.");
        return true;
    }

    /**
     * Parses a {@code host:port} string into a socket address.
     *
     * @param str address in {@code host:port} form
     * @return socket address built from the first two colon-separated fields
     * @throws NullPointerException     if {@code str} is {@code null}
     * @throws IllegalArgumentException if the port is missing, is not a number
     *                                  ({@link NumberFormatException}), or is out of range
     */
    public static InetSocketAddress resolveWorkerAddress(String str) {
        if (str == null) {
            throw new NullPointerException("Worker address must not be null.");
        }
        // Split once; only the first two fields are used.
        final String[] parts = str.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Worker address must be in host:port form: " + str);
        }
        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Replaces the key under which a groom's RPC proxy is stored, keeping the same proxy.
     *
     * @param groomServerStatus  key currently stored in {@code groomServers}
     * @param groomServerStatus2 key that should replace it
     */
    public void updateGroomServersKey(GroomServerStatus groomServerStatus, GroomServerStatus groomServerStatus2) {
        synchronized (this.groomServers) {
            final GroomProtocol proxy = this.groomServers.remove(groomServerStatus);
            if (proxy == null) {
                // moveToBlackList() removes entries without holding this lock, so the
                // groom may be gone already; ConcurrentHashMap.put rejects null values.
                LOG.warn("Cannot update status of " + groomServerStatus.getGroomName() + ": groom server is no longer registered.");
                return;
            }
            this.groomServers.put(groomServerStatus2, proxy);
        }
    }

    /**
     * Queues a directive sent by a groom server for asynchronous handling.
     *
     * @param directive directive to hand to the instructor thread
     * @return always {@code true}
     */
    @Override // org.apache.hama.ipc.MasterProtocol
    public boolean report(Directive directive) throws IOException {
        final Instructor worker = this.instructor;
        worker.put(directive);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the path of a job's directory below the system directory.
     *
     * @param bSPJobID job whose directory is wanted
     * @return {@code <system dir>/<job id>}
     */
    public Path getSystemDirectoryForJob(BSPJobID bSPJobID) {
        final String parent = getSystemDir();
        final String child = bSPJobID.toString();
        return new Path(parent, child);
    }

    /**
     * Reads the configured local directories.
     *
     * @return the values of {@code bsp.local.dir} as returned by the configuration
     */
    String[] getLocalDirs() throws IOException {
        final String[] localDirs = conf.getStrings("bsp.local.dir");
        return localDirs;
    }

    /**
     * Recursively deletes every directory listed in {@code bsp.local.dir} on the local
     * file system. Does nothing when no local directory is configured.
     *
     * @throws IOException if the local file system cannot be obtained or a delete fails
     */
    void deleteLocalFiles() throws IOException {
        final String[] localDirs = getLocalDirs();
        if (localDirs == null || localDirs.length == 0) {
            // Nothing configured, nothing to delete.
            return;
        }
        final FileSystem localFs = FileSystem.getLocal(this.conf);
        for (String dir : localDirs) {
            localFs.delete(new Path(dir), true);
        }
    }

    /**
     * Recursively deletes the sub-directory {@code str} below every directory listed in
     * {@code bsp.local.dir}. Does nothing (apart from an info log) when no local
     * directory is configured.
     *
     * @param str sub-directory name to remove under each local directory
     * @throws IOException if the local file system cannot be obtained or a delete fails
     */
    void deleteLocalFiles(String str) throws IOException {
        final String[] localDirs = getLocalDirs();
        if (localDirs == null || localDirs.length == 0) {
            // Explicit check instead of catching the NullPointerException the loop would raise.
            LOG.info("No bsp.local.dir configured; nothing to delete for " + str);
            return;
        }
        final FileSystem localFs = FileSystem.getLocal(this.conf);
        for (String dir : localDirs) {
            localFs.delete(new Path(dir, str), true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resolves a path below one of the configured {@code bsp.local.dir} directories.
     *
     * @param str relative path to resolve
     * @return the local path chosen by the configuration
     */
    public Path getLocalPath(String str) throws IOException {
        final Path resolved = conf.getLocalPath("bsp.local.dir", str);
        return resolved;
    }

    /**
     * Starts a master using a freshly generated, time-based identifier.
     *
     * @param hamaConfiguration configuration for the new master
     * @return the started master
     */
    public static BSPMaster startMaster(HamaConfiguration hamaConfiguration) throws IOException, InterruptedException {
        final String identifier = generateNewIdentifier();
        return startMaster(hamaConfiguration, identifier);
    }

    /**
     * Creates a master, initialises its sync client, wires it into the task scheduler,
     * starts the failure-detection supervisor when one was created, and starts the
     * scheduler.
     *
     * @param hamaConfiguration configuration for the new master
     * @param str               master identifier
     * @return the started master
     * @throws IOException          propagated from the constructor
     * @throws InterruptedException propagated from the constructor
     */
    public static BSPMaster startMaster(HamaConfiguration hamaConfiguration, String str) throws IOException, InterruptedException {
        BSPMaster bSPMaster = new BSPMaster(hamaConfiguration, str);
        bSPMaster.initZK(hamaConfiguration);
        bSPMaster.taskScheduler.setGroomServerManager(bSPMaster);
        bSPMaster.taskScheduler.setMonitorManager(bSPMaster);
        if (hamaConfiguration.getBoolean("bsp.monitor.fd.enabled", false)) {
            // The constructor skips supervisor creation in local mode, so the
            // reference may be empty even though failure detection is enabled.
            final Supervisor fdSupervisor = bSPMaster.supervisor.get();
            if (fdSupervisor != null) {
                fdSupervisor.start();
            } else {
                LOG.warn("Failure detection is enabled but no supervisor was created; skipping its start.");
            }
        }
        bSPMaster.taskScheduler.start();
        return bSPMaster;
    }

    /**
     * Creates the ZooKeeper-backed sync client and initialises it with the given
     * configuration.
     *
     * @param hamaConfiguration configuration passed to the client's {@code init}
     */
    private void initZK(HamaConfiguration hamaConfiguration) {
        final MasterSyncClient client = new ZKSyncBSPMasterClient();
        // Publish the client before initialising it, as the field is assigned first.
        this.syncClient = client;
        client.init(hamaConfiguration);
    }

    /**
     * @return the sync client created by {@code initZK}, or {@code null} if it has not run
     */
    public MasterSyncClient getSyncClient() {
        return syncClient;
    }

    /**
     * Determines the master's RPC address from {@code bsp.master.address}
     * (default {@code "localhost"}) and {@code bsp.master.port} (default 40000).
     *
     * @param configuration configuration to read
     * @return the socket address, or {@code null} when the address is the literal
     *         {@code "local"} (local mode)
     */
    public static InetSocketAddress getAddress(Configuration configuration) {
        final String masterAddress = configuration.get("bsp.master.address", "localhost");
        final boolean localMode = masterAddress.equals("local");
        if (localMode) {
            return null;
        }
        final int defaultPort = configuration.getInt("bsp.master.port", 40000);
        return NetUtils.createSocketAddr(masterAddress, defaultPort);
    }

    /**
     * Produces a master identifier from the current time, formatted as
     * {@code yyyyMMddHHmm}.
     *
     * @return the formatted timestamp
     */
    private static String generateNewIdentifier() {
        final SimpleDateFormat minuteStamp = new SimpleDateFormat("yyyyMMddHHmm");
        final Date now = new Date();
        return minuteStamp.format(now);
    }

    /**
     * Starts serving: launches the directive-processing thread, starts the RPC server,
     * marks the master as {@link State#RUNNING}, and blocks until the RPC server stops.
     *
     * @throws InterruptedException if interrupted while waiting for the RPC server
     * @throws IOException          if the RPC server fails to start
     */
    public void offerService() throws InterruptedException, IOException {
        // The instructor must exist before the RPC server accepts calls; otherwise an
        // early report() would dereference a null instructor.
        this.instructor = new Instructor();
        this.instructor.bind(ReportGroomStatusDirective.class, new ReportGroomStatusHandler());
        this.instructor.start();
        this.masterServer.start();
        this.state.set(State.RUNNING);
        LOG.info("Starting RUNNING");
        this.masterServer.join();
        LOG.info("Stopped RPC Master server.");
    }

    /**
     * Reports the protocol version for the two protocols this master serves.
     *
     * @param str fully qualified protocol class name
     * @param j   client-side version (not used)
     * @return {@code 1} for {@code MasterProtocol} and {@code JobSubmissionProtocol}
     * @throws IOException for any other protocol name
     */
    @Override // org.apache.hama.ipc.VersionedProtocol
    public long getProtocolVersion(String str, long j) throws IOException {
        final boolean isMasterProtocol = str.equals(MasterProtocol.class.getName());
        if (!isMasterProtocol && !str.equals(JobSubmissionProtocol.class.getName())) {
            throw new IOException("Unknown protocol to BSPMaster: " + str);
        }
        return 1L;
    }

    /**
     * Allocates the next job id for this master.
     *
     * @return a job id made of the master identifier and the next sequence number
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public BSPJobID getNewJobId() throws IOException {
        final int id;
        // Lock on the master itself: nextJobId is a boxed Integer that is replaced on
        // every call, so locking on it would let two callers hold different monitors
        // and receive the same id.
        synchronized (this) {
            id = this.nextJobId.intValue();
            this.nextJobId = Integer.valueOf(id + 1);
        }
        return new BSPJobID(this.masterIdentifier, id);
    }

    /**
     * Submits a job. A job id that is already known is not submitted again; its
     * current status is returned instead.
     *
     * @param bSPJobID id of the job being submitted
     * @param str      path of the job file
     * @return the status of the (new or already known) job
     * @throws IOException if the job cannot be created
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus submitJob(BSPJobID bSPJobID, String str) throws IOException {
        final JobInProgress existing;
        // addJob() writes the map while holding this lock; the lock is released before
        // addJob() is called below so the lock order (this -> jobs) is never inverted.
        synchronized (this.jobs) {
            existing = this.jobs.get(bSPJobID);
        }
        if (existing != null) {
            LOG.info("The job (" + bSPJobID + ") was already submitted");
            return existing.getStatus();
        }
        JobInProgress jobInProgress = new JobInProgress(bSPJobID, new Path(str), this, this.conf);
        final int submissionNumber;
        // recoverTask() increments the same counter under the master's monitor.
        synchronized (this) {
            submissionNumber = ++this.totalSubmissions;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitting job number = " + submissionNumber + " id = " + jobInProgress.getJobID());
        }
        return addJob(bSPJobID, jobInProgress);
    }

    /**
     * Builds a snapshot of the cluster. As a side effect the cached total task capacity
     * is recomputed from {@code Constants.MAX_TASKS}, falling back to
     * {@code Constants.MAX_TASKS_PER_GROOM} (default 3) times the number of grooms.
     *
     * @param z {@code true} to include a per-groom map, {@code false} to include only the groom count
     * @return the cluster status
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol, org.apache.hama.bsp.GroomServerManager
    public ClusterStatus getClusterStatus(boolean z) {
        final int groomCount = this.groomServers.size();
        // Raw type kept as in the original declaration.
        HashMap groomsByAddress = null;
        if (z) {
            groomsByAddress = new HashMap();
            for (GroomServerStatus groom : this.groomServers.keySet()) {
                groomsByAddress.put(groom.getGroomHostName() + ":" + Constants.DEFAULT_GROOM_INFO_SERVER, groom);
            }
        }
        final int capacityFromGrooms = this.conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3) * groomCount;
        this.totalTaskCapacity = this.conf.getInt(Constants.MAX_TASKS, capacityFromGrooms);
        if (z) {
            return new ClusterStatus(groomsByAddress, this.totalTasks, this.totalTaskCapacity, this.state.get());
        }
        return new ClusterStatus(groomCount, this.totalTasks, this.totalTaskCapacity, this.state.get());
    }

    /**
     * Looks up the RPC proxy registered for a groom.
     *
     * @param groomServerStatus groom to look up
     * @return its proxy, or {@code null} if the groom is not registered
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public GroomProtocol findGroomServer(GroomServerStatus groomServerStatus) {
        final GroomProtocol proxy = groomServers.get(groomServerStatus);
        return proxy;
    }

    /**
     * @return the live collection of RPC proxies of all registered grooms
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public Collection<GroomProtocol> findGroomServers() {
        return groomServers.values();
    }

    /**
     * @return the live key set of {@code groomServers}, i.e. the statuses of all registered grooms
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public Collection<GroomServerStatus> groomServerStatusKeySet() {
        return groomServers.keySet();
    }

    /**
     * Subscribes a listener to job events.
     *
     * @param jobInProgressListener listener to add
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void addJobInProgressListener(JobInProgressListener jobInProgressListener) {
        jobInProgressListeners.add(jobInProgressListener);
    }

    /**
     * Unsubscribes a listener from job events.
     *
     * @param jobInProgressListener listener to remove
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void removeJobInProgressListener(JobInProgressListener jobInProgressListener) {
        jobInProgressListeners.remove(jobInProgressListener);
    }

    /**
     * Subscribes a listener to groom status events.
     *
     * @param groomStatusListener listener to add
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void addGroomStatusListener(GroomStatusListener groomStatusListener) {
        groomStatusListeners.add(groomStatusListener);
    }

    /**
     * Unsubscribes a listener from groom status events.
     *
     * @param groomStatusListener listener to remove
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void removeGroomStatusListener(GroomStatusListener groomStatusListener) {
        groomStatusListeners.remove(groomStatusListener);
    }

    /**
     * Moves every registered groom running on the given host from the active map to
     * the black list. The groom is black-listed even if removing it from the active
     * map fails; that failure is only logged.
     *
     * @param str host name to black-list
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void moveToBlackList(String str) {
        LOG.info("[moveToBlackList()]Host to be moved to black list: " + str);
        for (GroomServerStatus candidate : groomServerStatusKeySet()) {
            LOG.info("[moveToBlackList()]GroomServerStatus's host name:" + candidate.getGroomHostName() + " host:" + str);
            if (!candidate.getGroomHostName().equals(str)) {
                continue;
            }
            final boolean removed = this.groomServers.remove(candidate, findGroomServer(candidate));
            if (!removed) {
                LOG.error("Fail to remove " + str + " out of groom server cache!");
            }
            this.blackList.add(candidate);
            LOG.info("[moveToBlackList()] " + str + " is successfully moved to black list.");
        }
    }

    /**
     * Removes every black-listed groom running on the given host from the black list,
     * logging the outcome of each removal.
     *
     * @param str host name to take off the black list
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public void removeOutOfBlackList(String str) {
        for (GroomServerStatus listed : this.blackList) {
            if (!listed.getGroomHostName().equals(str)) {
                continue;
            }
            final boolean removed = this.blackList.remove(listed);
            if (removed) {
                LOG.info("Successfully remove " + str + " out of black list.");
            } else {
                LOG.error("Fail to remove " + str + " out of black list.");
            }
        }
    }

    /**
     * @return the statuses of all registered grooms (same collection as
     *         {@code groomServerStatusKeySet()})
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public Collection<GroomServerStatus> alive() {
        final Collection<GroomServerStatus> registered = groomServerStatusKeySet();
        return registered;
    }

    /**
     * @return the black list itself (not a copy)
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public Collection<GroomServerStatus> blackList() {
        return blackList;
    }

    /**
     * Finds the first black-listed groom running on the given host.
     *
     * @param str host name to search for
     * @return the matching status, or {@code null} when the host is not black-listed
     */
    @Override // org.apache.hama.bsp.GroomServerManager
    public GroomServerStatus findInBlackList(String str) {
        GroomServerStatus match = null;
        final Iterator<GroomServerStatus> listed = this.blackList.iterator();
        while (match == null && listed.hasNext()) {
            final GroomServerStatus candidate = listed.next();
            if (str.equals(candidate.getGroomHostName())) {
                match = candidate;
            }
        }
        return match;
    }

    /**
     * @return the master's RPC endpoint as {@code host:port}
     */
    public String getBSPMasterName() {
        final StringBuilder name = new StringBuilder();
        name.append(this.host).append(":").append(this.port);
        return name.toString();
    }

    /**
     * @return the {@code System.currentTimeMillis()} value recorded by the constructor
     */
    public long getStartTime() {
        return startTime;
    }

    /**
     * @return the identifier this master was created with
     */
    public String getBSPMasterIdentifier() {
        return masterIdentifier;
    }

    /**
     * @return the port configured for the HTTP info server
     */
    public int getHttpPort() {
        return infoPort;
    }

    /**
     * Stores a job in the registry under the id from its profile and notifies every
     * {@link JobInProgressListener}. A listener that fails with an IOException is
     * logged and does not stop the remaining notifications.
     *
     * @param bSPJobID      id of the job (not used; the id is read from the job's profile)
     * @param jobInProgress job to register
     * @return the job's status
     */
    private synchronized JobStatus addJob(BSPJobID bSPJobID, JobInProgress jobInProgress) {
        synchronized (this.jobs) {
            final BSPJobID registryKey = jobInProgress.getProfile().getJobID();
            this.jobs.put(registryKey, jobInProgress);
            for (JobInProgressListener listener : this.jobInProgressListeners) {
                try {
                    listener.jobAdded(jobInProgress);
                } catch (IOException e) {
                    LOG.error("Fail to alter Scheduler a job is added.", e);
                }
            }
        }
        return jobInProgress.getStatus();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks every {@link JobInProgressListener} to recover a task of the given job and
     * counts the request as a submission. A listener that fails with an IOException is
     * logged and does not stop the remaining notifications.
     *
     * @param jobInProgress job that owns the task to recover
     */
    public synchronized void recoverTask(JobInProgress jobInProgress) {
        this.totalSubmissions += 1;
        for (JobInProgressListener listener : this.jobInProgressListeners) {
            try {
                listener.recoverTaskInJob(jobInProgress);
            } catch (IOException e) {
                LOG.error("Fail to alter Scheduler a job is added.", e);
            }
        }
    }

    /**
     * Lists the jobs that still have to complete, as selected by
     * {@code getJobStatus(Collection, boolean)} with the filter enabled.
     *
     * @return statuses of the selected jobs
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] jobsToComplete() throws IOException {
        final List<JobInProgress> snapshot;
        // Copy under the lock addJob() holds while writing, so a concurrent submission
        // cannot modify the TreeMap while it is being iterated.
        synchronized (this.jobs) {
            snapshot = new ArrayList<>(this.jobs.values());
        }
        return getJobStatus(snapshot, true);
    }

    /**
     * Lists every job known to this master.
     *
     * @return statuses of all registered jobs
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] getAllJobs() throws IOException {
        final List<JobInProgress> snapshot;
        // Copy under the lock addJob() holds while writing, so a concurrent submission
        // cannot modify the TreeMap while it is being iterated.
        synchronized (this.jobs) {
            snapshot = new ArrayList<>(this.jobs.values());
        }
        LOG.debug("returns all jobs: " + snapshot.size());
        return getJobStatus(snapshot, false);
    }

    /**
     * Refreshes and collects the status of each job in the collection. Every status is
     * updated with the job's start time, task count, user and name before it is
     * considered for the result.
     *
     * @param collection jobs to inspect; {@code null} yields an empty array
     * @param z          {@code true} to keep only jobs whose run state is 1 or 4,
     *                   {@code false} to keep all
     * @return the selected statuses, in iteration order
     */
    private static synchronized JobStatus[] getJobStatus(Collection<JobInProgress> collection, boolean z) {
        if (collection == null) {
            return new JobStatus[0];
        }
        final List<JobStatus> selected = new ArrayList<JobStatus>();
        for (JobInProgress job : collection) {
            final JobStatus status = job.getStatus();
            status.setStartTime(job.getStartTime());
            status.setNumOfTasks(job.getNumOfTasks());
            status.setUsername(job.getProfile().getUser());
            status.setName(job.getJobName());
            // Run states 1 and 4 are presumably RUNNING and PREP — TODO confirm against JobStatus.
            final boolean wanted = !z || status.getRunState() == 1 || status.getRunState() == 4;
            if (wanted) {
                selected.add(status);
            }
        }
        return selected.toArray(new JobStatus[selected.size()]);
    }

    /**
     * @return the URI of the master's file system as a string
     * @throws IllegalStateException if the file system has not been obtained yet
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public synchronized String getFilesystemName() throws IOException {
        if (this.fs != null) {
            return this.fs.getUri().toString();
        }
        throw new IllegalStateException("FileSystem object not available yet");
    }

    /**
     * Resolves {@code bsp.system.dir} (default {@code /tmp/hadoop/bsp/system}) against
     * the master's file system.
     *
     * @return the qualified system directory as a string
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol, org.apache.hama.ipc.MasterProtocol
    public String getSystemDir() {
        final String configured = this.conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system");
        final Path qualified = this.fs.makeQualified(new Path(configured));
        return qualified.toString();
    }

    /**
     * Looks up the profile of a job.
     *
     * @param bSPJobID id of the job
     * @return the job's profile, or {@code null} if the job is unknown
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobProfile getJobProfile(BSPJobID bSPJobID) throws IOException {
        synchronized (this) {
            final JobInProgress job = this.jobs.get(bSPJobID);
            return job != null ? job.getProfile() : null;
        }
    }

    /**
     * Looks up the status of a job.
     *
     * @param bSPJobID id of the job
     * @return the job's status, or {@code null} if the job is unknown
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus getJobStatus(BSPJobID bSPJobID) throws IOException {
        synchronized (this) {
            final JobInProgress job = this.jobs.get(bSPJobID);
            return job != null ? job.getStatus() : null;
        }
    }

    /**
     * Kills the job with the given id. An unknown id is logged and otherwise ignored.
     *
     * @param bSPJobID id of the job to kill
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public void killJob(BSPJobID bSPJobID) throws IOException {
        final JobInProgress jobInProgress;
        // Read under the lock addJob() holds while writing the TreeMap.
        synchronized (this.jobs) {
            jobInProgress = this.jobs.get(bSPJobID);
        }
        if (null == jobInProgress) {
            LOG.info("killJob(): JobId " + bSPJobID.toString() + " is not a valid job");
        } else {
            killJob(jobInProgress);
        }
    }

    /**
     * Logs the kill request and kills the job.
     *
     * @param jobInProgress job to kill
     */
    private static synchronized void killJob(JobInProgress jobInProgress) {
        final String message = "Killing job " + jobInProgress.getJobID();
        LOG.info(message);
        jobInProgress.kill();
    }

    /**
     * Not implemented: no task is killed.
     *
     * @param taskAttemptID task attempt to kill (ignored)
     * @param z             ignored
     * @return always {@code false}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public boolean killTask(TaskAttemptID taskAttemptID, boolean z) throws IOException {
        return false;
    }

    /**
     * Instantiates a master class reflectively through its public constructor taking a
     * single {@code Configuration}.
     *
     * @param cls           master class to instantiate
     * @param configuration argument passed to the constructor
     * @return the new master
     * @throws RuntimeException wrapping any failure; the message carries the class and,
     *                          when present, the message of the failure's cause
     */
    public static BSPMaster constructMaster(Class<? extends BSPMaster> cls, Configuration configuration) {
        try {
            return cls.getConstructor(Configuration.class).newInstance(configuration);
        } catch (Exception e) {
            final Throwable cause = e.getCause();
            final String causeMessage = cause != null ? cause.getMessage() : "";
            throw new RuntimeException("Failed construction of Master: " + cls.toString() + causeMessage, e);
        }
    }

    /**
     * Stops the master: closes the sync client, stops the failure-detection supervisor
     * and stops the RPC server. Components that were never created (local mode, or a
     * master that was constructed but not started through {@code startMaster}) are skipped.
     */
    public void shutdown() {
        if (null != this.syncClient) {
            try {
                this.syncClient.close();
            } catch (IOException e) {
                LOG.error("Error closing the sync client", e);
            }
        }
        final Supervisor fdSupervisor = this.supervisor.get();
        if (null != fdSupervisor) {
            fdSupervisor.stop();
        }
        if (null != this.masterServer) {
            this.masterServer.stop();
        }
    }

    /**
     * @return the master's current lifecycle state
     */
    public State currentState() {
        final State current = state.get();
        return current;
    }

    /**
     * Watcher callback; events are intentionally ignored.
     *
     * @param watchedEvent the event delivered to this watcher (unused)
     */
    @Override // org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        // Nothing to do.
    }

    /**
     * @return the failure-detection supervisor, or {@code null} if none was created
     */
    @Override // org.apache.hama.bsp.MonitorManager
    public Supervisor supervisor() {
        final Supervisor current = supervisor.get();
        return current;
    }

    /**
     * Fetches task completion events of a job.
     *
     * @param bSPJobID id of the job
     * @param i        first argument forwarded to the job's {@code getTaskCompletionEvents}
     * @param i2       second argument forwarded to the job's {@code getTaskCompletionEvents}
     * @return {@code null} if the job is unknown, the shared empty array while the job's
     *         tasks are not initialised, otherwise the events reported by the job
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID bSPJobID, int i, int i2) {
        synchronized (this) {
            final JobInProgress job = this.jobs.get(bSPJobID);
            if (job == null) {
                return null;
            }
            if (!job.areTasksInited()) {
                return this.EMPTY_EVENTS;
            }
            return job.getTaskCompletionEvents(i, i2);
        }
    }

    /**
     * Compiler-generated accessor used by the inner handler class: subtracts {@code i}
     * from the master's {@code totalTasks}.
     *
     * @param bSPMaster master whose counter is changed
     * @param i         amount to subtract
     * @return the counter's new value
     */
    static /* synthetic */ int access$020(BSPMaster bSPMaster, int i) {
        bSPMaster.totalTasks -= i;
        return bSPMaster.totalTasks;
    }

    /**
     * Compiler-generated accessor used by the inner handler class: adds {@code i} to
     * the master's {@code totalTasks}.
     *
     * @param bSPMaster master whose counter is changed
     * @param i         amount to add
     * @return the counter's new value
     */
    static /* synthetic */ int access$012(BSPMaster bSPMaster, int i) {
        bSPMaster.totalTasks += i;
        return bSPMaster.totalTasks;
    }
}
