/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.scheduler;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelScheduler
implements Scheduler {
    List<Job> queue = new LinkedList<Job>();
    List<Job> running = new LinkedList<Job>();
    private ExecutorService executor;
    private SchedulerListener listener;
    boolean terminate = false;
    private String name;
    private int maxConcurrency;
    static Logger LOGGER = LoggerFactory.getLogger(ParallelScheduler.class);

    public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener, int maxConcurrency) {
        this.name = name;
        this.executor = executor;
        this.listener = listener;
        this.maxConcurrency = maxConcurrency;
    }

    @Override
    public String getName() {
        return this.name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Collection<Job> getJobsWaiting() {
        LinkedList<Job> ret = new LinkedList<Job>();
        List<Job> list = this.queue;
        synchronized (list) {
            for (Job job : this.queue) {
                ret.add(job);
            }
        }
        return ret;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Job removeFromWaitingQueue(String jobId) {
        List<Job> list = this.queue;
        synchronized (list) {
            Iterator<Job> it = this.queue.iterator();
            while (it.hasNext()) {
                Job job = it.next();
                if (!job.getId().equals(jobId)) continue;
                it.remove();
                return job;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Collection<Job> getJobsRunning() {
        LinkedList<Job> ret = new LinkedList<Job>();
        List<Job> list = this.queue;
        synchronized (list) {
            for (Job job : this.running) {
                ret.add(job);
            }
        }
        return ret;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submit(Job job) {
        job.setStatus(Job.Status.PENDING);
        List<Job> list = this.queue;
        synchronized (list) {
            this.queue.add(job);
            this.queue.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        while (!this.terminate) {
            Job job = null;
            List<Job> list = this.queue;
            synchronized (list) {
                if (this.running.size() >= this.maxConcurrency || this.queue.isEmpty()) {
                    try {
                        this.queue.wait(500L);
                    }
                    catch (InterruptedException e) {
                        LOGGER.error("Exception in MockInterpreterAngular while interpret queue.wait", e);
                    }
                    continue;
                }
                job = this.queue.remove(0);
                this.running.add(job);
            }
            ParallelScheduler scheduler = this;
            this.executor.execute(new JobRunner(scheduler, job));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setMaxConcurrency(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
        List<Job> list = this.queue;
        synchronized (list) {
            this.queue.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        this.terminate = true;
        List<Job> list = this.queue;
        synchronized (list) {
            this.queue.notify();
        }
    }

    private class JobRunner
    implements Runnable {
        private Scheduler scheduler;
        private Job job;

        public JobRunner(Scheduler scheduler, Job job) {
            this.scheduler = scheduler;
            this.job = job;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (this.job.isAborted()) {
                this.job.setStatus(Job.Status.ABORT);
                this.job.aborted = false;
                List<Job> list = ParallelScheduler.this.queue;
                synchronized (list) {
                    ParallelScheduler.this.running.remove(this.job);
                    ParallelScheduler.this.queue.notify();
                }
                return;
            }
            this.job.setStatus(Job.Status.RUNNING);
            if (ParallelScheduler.this.listener != null) {
                ParallelScheduler.this.listener.jobStarted(this.scheduler, this.job);
            }
            this.job.run();
            if (this.job.isAborted()) {
                this.job.setStatus(Job.Status.ABORT);
            } else if (this.job.getException() != null) {
                this.job.setStatus(Job.Status.ERROR);
            } else {
                this.job.setStatus(Job.Status.FINISHED);
            }
            if (ParallelScheduler.this.listener != null) {
                ParallelScheduler.this.listener.jobFinished(this.scheduler, this.job);
            }
            this.job.aborted = false;
            List<Job> list = ParallelScheduler.this.queue;
            synchronized (list) {
                ParallelScheduler.this.running.remove(this.job);
                ParallelScheduler.this.queue.notify();
            }
        }
    }
}

