package org.apache.hama.bsp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.monitor.Federator;
import org.apache.hama.monitor.Metric;
import org.apache.hama.monitor.MetricsRecord;
import org.apache.hama.monitor.Monitor;
import org.apache.hama.monitor.ZKCollector;
import org.apache.hama.monitor.fd.NodeEventListener;
import org.apache.hama.monitor.fd.NodeStatus;
import org.apache.zookeeper.ZooKeeper;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler.class */
public class SimpleTaskScheduler extends TaskScheduler {
    /** Logger shared by the scheduler and its nested worker classes. */
    private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);

    /** Name of the queue that newly added jobs are placed in until the job processor picks them up. */
    public static final String WAIT_QUEUE = "waitQueue";

    /** Name of the queue that jobs are placed in once the job processor starts scheduling them. */
    public static final String PROCESSING_QUEUE = "processingQueue";

    /** Name of the queue that jobs are moved to (from the processing queue) when they are removed. */
    public static final String FINISHED_QUEUE = "finishedQueue";

    /** Queue manager; populated by {@link #start()}, so it holds {@code null} until then. */
    private final AtomicReference<QueueManager> queueManager = new AtomicReference<>();

    /** {@code true} between a successful {@link #start()} and the next {@link #terminate()}. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Metrics federator; populated by {@link #start()}, so it holds {@code null} until then. */
    private final AtomicReference<Federator> federator = new AtomicReference<>();

    /** Most recently collected JVM metrics record, keyed by groom name. */
    private final ConcurrentMap<String, MetricsRecord> repository = new ConcurrentHashMap<>();

    /** Listener registered with the groom server manager to track added and removed jobs. */
    private final JobListener jobListener = new JobListener();

    /** Thread that drains the wait queue and schedules each job. */
    private final JobProcessor jobProcessor = new JobProcessor();

    /** Executor that periodically runs the JVM metrics collector when the federator is enabled. */
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$JobListener.class */
    private class JobListener extends JobInProgressListener {
        private JobListener() {
        }

        /**
         * Initializes the new job through the queue manager and then places it
         * in the wait queue.
         *
         * @param jobInProgress the job that was added
         * @throws IOException propagated from the queue manager
         */
        @Override // org.apache.hama.bsp.JobInProgressListener
        public void jobAdded(JobInProgress jobInProgress) throws IOException {
            final QueueManager queues = SimpleTaskScheduler.this.queueManager.get();
            queues.initJob(jobInProgress);
            queues.addJob(WAIT_QUEUE, jobInProgress);
        }

        /**
         * Moves the removed job from the processing queue to the finished queue.
         *
         * @param jobInProgress the job that was removed
         * @throws IOException propagated from the queue manager
         */
        @Override // org.apache.hama.bsp.JobInProgressListener
        public void jobRemoved(JobInProgress jobInProgress) throws IOException {
            final QueueManager queues = SimpleTaskScheduler.this.queueManager.get();
            queues.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, jobInProgress);
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$JobProcessor.class */
    private class JobProcessor extends Thread implements Schedulable {
        final ExecutorService sched;

        JobProcessor() {
            super("JobProcessor");
            this.sched = Executors.newCachedThreadPool();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (!SimpleTaskScheduler.this.initialized.get()) {
                throw new IllegalStateException("SimpleTaskScheduler initialization is not yet finished!");
            }
            while (SimpleTaskScheduler.this.initialized.get()) {
                Queue<JobInProgress> findQueue = ((QueueManager) SimpleTaskScheduler.this.queueManager.get()).findQueue(SimpleTaskScheduler.WAIT_QUEUE);
                if (null == findQueue) {
                    SimpleTaskScheduler.LOG.error("waitQueue does not exist.");
                    throw new NullPointerException("waitQueue does not exist.");
                }
                JobInProgress removeJob = findQueue.removeJob();
                ((QueueManager) SimpleTaskScheduler.this.queueManager.get()).addJob(SimpleTaskScheduler.PROCESSING_QUEUE, removeJob);
                Collection<GroomServerStatus> groomServerStatusKeySet = SimpleTaskScheduler.this.groomServerManager.get().groomServerStatusKeySet();
                schedule(removeJob, (GroomServerStatus[]) groomServerStatusKeySet.toArray(new GroomServerStatus[groomServerStatusKeySet.size()]));
            }
        }

        @Override // org.apache.hama.bsp.Schedulable
        public void schedule(JobInProgress jobInProgress, GroomServerStatus... groomServerStatusArr) {
            Boolean bool;
            Future submit = this.sched.submit(new TaskWorker(groomServerStatusArr, SimpleTaskScheduler.this.groomServerManager.get().getClusterStatus(false).getGroomServers(), jobInProgress));
            Boolean bool2 = Boolean.FALSE;
            try {
                bool = (Boolean) submit.get();
            } catch (InterruptedException e) {
                bool = Boolean.FALSE;
                SimpleTaskScheduler.LOG.error("Error submitting job", e);
            } catch (ExecutionException e2) {
                bool = Boolean.FALSE;
                SimpleTaskScheduler.LOG.error("Error submitting job", e2);
            }
            if (Boolean.FALSE.equals(bool)) {
                SimpleTaskScheduler.LOG.error(new StringBuffer(512).append("Scheduling of job ").append(jobInProgress.getJobName()).append(" could not be done successfully. Killing it!").toString());
                jobInProgress.kill();
            }
        }

        @Override // java.lang.Thread
        public void interrupt() {
            super.interrupt();
            this.sched.shutdown();
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$JvmCollector.class */
    /**
     * Periodic task that registers, for every known groom server, a ZooKeeper
     * backed collector of that groom's JVM metrics with the federator.
     */
    private class JvmCollector implements Runnable {
        final Federator federator;
        final ZooKeeper zk;

        JvmCollector(Federator federator, ZooKeeper zooKeeper) {
            this.federator = federator;
            this.zk = zooKeeper;
        }

        /** Registers one collect-and-store action per groom server. */
        @Override // java.lang.Runnable
        public void run() {
            for (GroomServerStatus status : SimpleTaskScheduler.this.groomServerManager.get().groomServerStatusKeySet()) {
                final String groomName = status.getGroomName();
                final String metricsPath = Monitor.MONITOR_ROOT_PATH + groomName + "/metrics/jvm";
                final ZKCollector collector = new ZKCollector(this.zk, "jvm", "Jvm metrics.", metricsPath);
                this.federator.register(new Federator.Act(collector, storingHandler(groomName)));
            }
        }

        /**
         * Builds the handler that stores a collected record in the scheduler's
         * metrics repository under the given groom name.
         */
        private Federator.CollectorHandler storingHandler(final String groomName) {
            return new Federator.CollectorHandler() {
                @Override // org.apache.hama.monitor.Federator.CollectorHandler
                public void handle(Future future) {
                    final MetricsRecord record;
                    try {
                        record = (MetricsRecord) future.get();
                    } catch (InterruptedException interrupted) {
                        SimpleTaskScheduler.LOG.warn(interrupted);
                        Thread.currentThread().interrupt();
                        return;
                    } catch (ExecutionException failed) {
                        SimpleTaskScheduler.LOG.warn(failed.getCause());
                        return;
                    }
                    if (null == record) {
                        // Nothing was collected; keep whatever record is already stored.
                        return;
                    }
                    if (SimpleTaskScheduler.LOG.isDebugEnabled()) {
                        for (Metric<?> metric : record.metrics()) {
                            SimpleTaskScheduler.LOG.debug("Metric name:" + metric.name() + " metric value:" + metric.value());
                        }
                    }
                    SimpleTaskScheduler.this.repository.put(groomName, record);
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$NodeWatcher.class */
    /**
     * Node event listener that subscribes to {@link NodeStatus#Dead} events.
     * The notification callback currently does nothing.
     */
    final class NodeWatcher implements NodeEventListener {
        final GroomServerManager groomManager;
        final TaskScheduler _sched;

        NodeWatcher(GroomServerManager groomServerManager, TaskScheduler taskScheduler) {
            this._sched = taskScheduler;
            this.groomManager = groomServerManager;
        }

        /** @return the statuses this listener wants to hear about: only {@code Dead} */
        @Override // org.apache.hama.monitor.fd.NodeEventListener
        public NodeStatus[] interest() {
            final NodeStatus[] interested = {NodeStatus.Dead};
            return interested;
        }

        /** @return a display name built from the scheduler's and this class's simple names */
        @Override // org.apache.hama.monitor.fd.NodeEventListener
        public String name() {
            final String owner = SimpleTaskScheduler.class.getSimpleName();
            final String self = NodeWatcher.class.getSimpleName();
            return owner + "'s " + self;
        }

        /** Intentionally empty: no action is taken on node events. */
        @Override // org.apache.hama.monitor.fd.NodeEventListener
        public void notify(NodeStatus nodeStatus, String str) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$TaskWorker.class */
    public class TaskWorker implements Callable<Boolean> {
        private final Map<String, GroomServerStatus> groomStatuses;
        private final int groomNum;
        private final JobInProgress jip;

        TaskWorker(GroomServerStatus[] groomServerStatusArr, int i, JobInProgress jobInProgress) {
            this.groomStatuses = new HashMap(2 * i);
            for (GroomServerStatus groomServerStatus : groomServerStatusArr) {
                this.groomStatuses.put(groomServerStatus.hostName, groomServerStatus);
            }
            this.groomNum = i;
            this.jip = jobInProgress;
            if (null == this.groomStatuses) {
                throw new NullPointerException("Target groom server is not specified.");
            }
            if (-1 == this.groomNum) {
                throw new IllegalArgumentException("Groom number is not specified.");
            }
            if (null == this.jip) {
                throw new NullPointerException("No job is specified.");
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            HashMap hashMap = new HashMap(2 * this.groomStatuses.size());
            HashSet<Task> hashSet = new HashSet(2 * this.jip.tasks.length);
            int i = 0;
            do {
                Task obtainNewTask = this.jip.obtainNewTask(this.groomStatuses);
                if (obtainNewTask == null) {
                    break;
                }
                hashSet.add(obtainNewTask);
                i++;
            } while (i != this.jip.tasks.length);
            if (i != this.jip.tasks.length) {
                return Boolean.FALSE;
            }
            for (Task task : hashSet) {
                GroomServerStatus groomStatusForTask = this.jip.getGroomStatusForTask(task);
                List list = (List) hashMap.get(groomStatusForTask);
                if (list == null) {
                    list = new ArrayList(groomStatusForTask.getMaxTasks());
                }
                list.add(new LaunchTaskAction(task));
                hashMap.put(groomStatusForTask, list);
            }
            Iterator it = hashMap.keySet().iterator();
            while (this.jip.getStatus().getRunState() == 1 && it.hasNext()) {
                GroomServerStatus groomServerStatus = (GroomServerStatus) it.next();
                List list2 = (List) hashMap.get(groomServerStatus);
                GroomProtocol findGroomServer = SimpleTaskScheduler.this.groomServerManager.get().findGroomServer(groomServerStatus);
                try {
                    GroomServerAction[] groomServerActionArr = new GroomServerAction[list2.size()];
                    list2.toArray(groomServerActionArr);
                    findGroomServer.dispatch(new DispatchTasksDirective(groomServerActionArr));
                } catch (IOException e) {
                    SimpleTaskScheduler.LOG.error("Fail to dispatch tasks to GroomServer " + groomServerStatus.getGroomName(), e);
                }
            }
            if (it.hasNext() && this.jip.getStatus().getRunState() != 1) {
                SimpleTaskScheduler.LOG.warn("Currently master only shcedules job in running state. This may be refined in the future. JobId:" + this.jip.getJobID());
            }
            return Boolean.TRUE;
        }
    }

    /**
     * Starts the scheduler: creates the queue manager, the federator and the
     * three job queues, registers the job listener, starts the job processor
     * and, when the federator is enabled in the configuration, starts it and
     * schedules JVM metrics collection every 5 seconds. Finally registers a
     * node watcher with the monitor supervisor if one is available.
     *
     * @throws IllegalStateException if the scheduler is already started
     */
    @Override // org.apache.hama.bsp.TaskScheduler
    public void start() {
        final boolean firstStart = this.initialized.compareAndSet(false, true);
        if (!firstStart) {
            throw new IllegalStateException(SimpleTaskScheduler.class.getSimpleName() + " is started.");
        }

        final QueueManager queues = new QueueManager(getConf());
        this.queueManager.set(queues);
        this.federator.set(new Federator((HamaConfiguration) getConf()));
        for (String queueName : new String[] {WAIT_QUEUE, PROCESSING_QUEUE, FINISHED_QUEUE}) {
            queues.createFCFSQueue(queueName);
        }
        this.groomServerManager.get().addJobInProgressListener(this.jobListener);

        if (isFederatorEnabled()) {
            this.federator.get().start();
        }
        this.jobProcessor.start();
        if (isFederatorEnabled()) {
            final Federator activeFederator = this.federator.get();
            final ZooKeeper zooKeeper = ((BSPMaster) this.groomServerManager.get()).zk;
            this.scheduler.scheduleAtFixedRate(new JvmCollector(activeFederator, zooKeeper), 5L, 5L, TimeUnit.SECONDS);
        }

        if (null != this.monitorManager.get() && null != this.monitorManager.get().supervisor()) {
            this.monitorManager.get().supervisor().register(new NodeWatcher(this.groomServerManager.get(), this));
        }
    }

    /** Reads the {@code bsp.federator.enabled} flag; {@code false} when no configuration is set. */
    private boolean isFederatorEnabled() {
        return null != getConf() && getConf().getBoolean("bsp.federator.enabled", false);
    }

    /**
     * Stops the scheduler: clears the initialized flag, unregisters the job
     * listener, interrupts the job processor, shuts down the periodic metrics
     * executor and interrupts the federator if one was created.
     */
    @Override // org.apache.hama.bsp.TaskScheduler
    public void terminate() {
        this.initialized.set(false);
        // jobListener is a final, always-initialized field, so no null check is needed.
        this.groomServerManager.get().removeJobInProgressListener(this.jobListener);
        this.jobProcessor.interrupt();
        // Stop the periodic JVM metrics collection; otherwise the executor thread
        // outlives the scheduler and keeps registering collectors.
        this.scheduler.shutdownNow();
        // The federator is only created by start(); tolerate terminate() without start().
        final Federator activeFederator = this.federator.get();
        if (null != activeFederator) {
            activeFederator.interrupt();
        }
    }

    /**
     * Returns the jobs currently held in the named queue.
     *
     * @param str name of the queue, e.g. {@link #WAIT_QUEUE}
     * @return the queue's jobs, or an empty collection when the scheduler has
     *         not been started yet or no queue with that name exists
     */
    @Override // org.apache.hama.bsp.TaskScheduler
    public Collection<JobInProgress> getJobs(String str) {
        final QueueManager queues = this.queueManager.get();
        if (null == queues) {
            // start() has not populated the queue manager yet.
            return new ArrayList<JobInProgress>();
        }
        final Queue<JobInProgress> queue = queues.findQueue(str);
        if (null == queue) {
            // findQueue yields null for an unknown queue name.
            return new ArrayList<JobInProgress>();
        }
        return queue.jobs();
    }

    /**
     * Looks up a job in the processing queue by its id.
     *
     * @param bSPJobID id of the job to find
     * @return the first job in the processing queue whose id equals the given
     *         one, or {@code null} if there is none
     */
    @Override // org.apache.hama.bsp.TaskScheduler
    public JobInProgress findJobById(BSPJobID bSPJobID) {
        JobInProgress match = null;
        final Iterator<JobInProgress> candidates = getJobs(PROCESSING_QUEUE).iterator();
        while (null == match && candidates.hasNext()) {
            final JobInProgress candidate = candidates.next();
            if (candidate.getJobID().equals(bSPJobID)) {
                match = candidate;
            }
        }
        return match;
    }
}
