package org.apache.hama.bsp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler.class */
public class SimpleTaskScheduler extends TaskScheduler {
    private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
    public static final String WAIT_QUEUE = "waitQueue";
    public static final String PROCESSING_QUEUE = "processingQueue";
    public static final String FINISHED_QUEUE = "finishedQueue";
    private QueueManager queueManager;
    private volatile boolean initialized;
    private JobListener jobListener = new JobListener();
    private JobProcessor jobProcessor = new JobProcessor();

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$JobListener.class */
    private class JobListener extends JobInProgressListener {
        private JobListener() {
        }

        @Override // org.apache.hama.bsp.JobInProgressListener
        public void jobAdded(JobInProgress jobInProgress) throws IOException {
            SimpleTaskScheduler.this.queueManager.initJob(jobInProgress);
            SimpleTaskScheduler.this.queueManager.addJob(SimpleTaskScheduler.WAIT_QUEUE, jobInProgress);
        }

        @Override // org.apache.hama.bsp.JobInProgressListener
        public void jobRemoved(JobInProgress jobInProgress) throws IOException {
            SimpleTaskScheduler.this.queueManager.moveJob(SimpleTaskScheduler.PROCESSING_QUEUE, SimpleTaskScheduler.FINISHED_QUEUE, jobInProgress);
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$JobProcessor.class */
    private class JobProcessor extends Thread implements Schedulable {
        JobProcessor() {
            super("JobProcess");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (false == SimpleTaskScheduler.this.initialized) {
                throw new IllegalStateException("SimpleTaskScheduler initialization is not yet finished!");
            }
            while (SimpleTaskScheduler.this.initialized) {
                Queue<JobInProgress> findQueue = SimpleTaskScheduler.this.queueManager.findQueue(SimpleTaskScheduler.WAIT_QUEUE);
                if (null == findQueue) {
                    SimpleTaskScheduler.LOG.error("waitQueue does not exist.");
                    throw new NullPointerException("waitQueue does not exist.");
                }
                JobInProgress removeJob = findQueue.removeJob();
                SimpleTaskScheduler.this.queueManager.addJob(SimpleTaskScheduler.PROCESSING_QUEUE, removeJob);
                Collection<GroomServerStatus> groomServerStatusKeySet = SimpleTaskScheduler.this.groomServerManager.groomServerStatusKeySet();
                schedule(removeJob, (GroomServerStatus[]) groomServerStatusKeySet.toArray(new GroomServerStatus[groomServerStatusKeySet.size()]));
            }
        }

        @Override // org.apache.hama.bsp.Schedulable
        public void schedule(JobInProgress jobInProgress, GroomServerStatus... groomServerStatusArr) {
            int groomServers = SimpleTaskScheduler.this.groomServerManager.getClusterStatus(false).getGroomServers();
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(groomServerStatusArr.length + 5);
            for (GroomServerStatus groomServerStatus : groomServerStatusArr) {
                newScheduledThreadPool.schedule(new TaskWorker(groomServerStatus, groomServers, jobInProgress), 0L, TimeUnit.SECONDS);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/bsp/SimpleTaskScheduler$TaskWorker.class */
    public class TaskWorker implements Runnable {
        private final GroomServerStatus stus;
        private final int groomNum;
        private final JobInProgress jip;

        TaskWorker(GroomServerStatus groomServerStatus, int i, JobInProgress jobInProgress) {
            this.stus = groomServerStatus;
            this.groomNum = i;
            this.jip = jobInProgress;
            if (null == this.stus) {
                throw new NullPointerException("Target groom server is not specified.");
            }
            if (-1 == this.groomNum) {
                throw new IllegalArgumentException("Groom number is not specified.");
            }
            if (null == this.jip) {
                throw new NullPointerException("No job is specified.");
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            ArrayList arrayList = new ArrayList();
            int i = 0;
            do {
                Task obtainNewTask = this.jip.obtainNewTask(this.stus, this.groomNum);
                if (obtainNewTask == null) {
                    break;
                }
                arrayList.add(new LaunchTaskAction(obtainNewTask));
                i++;
            } while (i <= this.stus.getMaxTasks() - 1);
            if (this.jip.getStatus().getRunState() != 1) {
                SimpleTaskScheduler.LOG.warn("Currently master only shcedules job in running state. This may be refined in the future. JobId:" + this.jip.getJobID());
                return;
            }
            try {
                SimpleTaskScheduler.this.groomServerManager.findGroomServer(this.stus).dispatch(new DispatchTasksDirective((GroomServerAction[]) arrayList.toArray(new GroomServerAction[0])));
            } catch (IOException e) {
                SimpleTaskScheduler.LOG.error("Fail to dispatch tasks to GroomServer " + this.stus.getGroomName(), e);
            }
        }
    }

    @Override // org.apache.hama.bsp.TaskScheduler
    public void start() {
        this.queueManager = new QueueManager(getConf());
        this.queueManager.createFCFSQueue(WAIT_QUEUE);
        this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
        this.queueManager.createFCFSQueue(FINISHED_QUEUE);
        this.groomServerManager.addJobInProgressListener(this.jobListener);
        this.initialized = true;
        this.jobProcessor.start();
    }

    @Override // org.apache.hama.bsp.TaskScheduler
    public void terminate() {
        this.initialized = false;
        if (null != this.jobListener) {
            this.groomServerManager.removeJobInProgressListener(this.jobListener);
        }
    }

    @Override // org.apache.hama.bsp.TaskScheduler
    public Collection<JobInProgress> getJobs(String str) {
        return this.queueManager.findQueue(str).jobs();
    }
}
