/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.config.WorkerConfig;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

public class WorkerTaskMonitor {
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
    private final ObjectMapper jsonMapper;
    private final PathChildrenCache pathChildrenCache;
    private final CuratorFramework cf;
    private final WorkerCuratorCoordinator workerCuratorCoordinator;
    private final TaskRunner taskRunner;
    private final ExecutorService exec;
    private final List<Task> running = new CopyOnWriteArrayList<Task>();

    @Inject
    public WorkerTaskMonitor(ObjectMapper jsonMapper, CuratorFramework cf, WorkerCuratorCoordinator workerCuratorCoordinator, TaskRunner taskRunner, WorkerConfig workerConfig) {
        this.jsonMapper = jsonMapper;
        this.pathChildrenCache = new PathChildrenCache(cf, workerCuratorCoordinator.getTaskPathForWorker(), false, true, Execs.makeThreadFactory((String)"TaskMonitorCache-%s"));
        this.cf = cf;
        this.workerCuratorCoordinator = workerCuratorCoordinator;
        this.taskRunner = taskRunner;
        this.exec = Execs.multiThreaded((int)workerConfig.getCapacity(), (String)"WorkerTaskMonitor-%d");
    }

    @LifecycleStart
    public void start() {
        try {
            this.pathChildrenCache.getListenable().addListener((Object)new PathChildrenCacheListener(){

                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    if (pathChildrenCacheEvent.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                        final Task task = (Task)WorkerTaskMonitor.this.jsonMapper.readValue((byte[])WorkerTaskMonitor.this.cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), Task.class);
                        if (WorkerTaskMonitor.this.isTaskRunning(task)) {
                            log.warn("I can't build it. There's something in the way. Got task %s that I am already running...", new Object[]{task.getId()});
                            WorkerTaskMonitor.this.workerCuratorCoordinator.unannounceTask(task.getId());
                            return;
                        }
                        log.info("Submitting runnable for task[%s]", new Object[]{task.getId()});
                        WorkerTaskMonitor.this.exec.submit(new Runnable(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void run() {
                                TaskStatus taskStatus;
                                long startTime = System.currentTimeMillis();
                                log.info("Affirmative. Running task [%s]", new Object[]{task.getId()});
                                WorkerTaskMonitor.this.running.add(task);
                                try {
                                    WorkerTaskMonitor.this.workerCuratorCoordinator.unannounceTask(task.getId());
                                    WorkerTaskMonitor.this.workerCuratorCoordinator.announceTastAnnouncement(TaskAnnouncement.create(task, TaskStatus.running(task.getId())));
                                    taskStatus = (TaskStatus)WorkerTaskMonitor.this.taskRunner.run(task).get();
                                }
                                catch (Exception e) {
                                    log.makeAlert((Throwable)e, "I can't build there. Failed to run task", new Object[0]).addData("task", (Object)task.getId()).emit();
                                    taskStatus = TaskStatus.failure(task.getId());
                                }
                                finally {
                                    WorkerTaskMonitor.this.running.remove(task);
                                }
                                taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
                                try {
                                    WorkerTaskMonitor.this.workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
                                    log.info("Job's finished. Completed [%s] with status [%s]", new Object[]{task.getId(), taskStatus.getStatusCode()});
                                }
                                catch (Exception e) {
                                    log.makeAlert((Throwable)e, "Failed to update task status", new Object[0]).addData("task", (Object)task.getId()).emit();
                                }
                            }
                        });
                    }
                }
            });
            this.pathChildrenCache.start();
        }
        catch (Exception e) {
            log.makeAlert((Throwable)e, "Exception starting WorkerTaskMonitor", new Object[0]).addData("exception", (Object)e.toString()).emit();
        }
    }

    private boolean isTaskRunning(Task task) {
        for (Task runningTask : this.running) {
            if (!runningTask.getId().equals(task.getId())) continue;
            return true;
        }
        return false;
    }

    @LifecycleStop
    public void stop() {
        try {
            this.pathChildrenCache.close();
            this.exec.shutdown();
        }
        catch (Exception e) {
            log.makeAlert((Throwable)e, "Exception stopping WorkerTaskMonitor", new Object[0]).addData("exception", (Object)e.toString()).emit();
        }
    }
}

