/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.core.tracker.processor;

import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;

public class ProcessorTracker {
    private static final Logger log = LoggerFactory.getLogger(ProcessorTracker.class);
    private long startTime;
    private WorkerRuntime workerRuntime;
    private InstanceInfo instanceInfo;
    private Long instanceId;
    private ProcessorBean processorBean;
    private OmsLogger omsLogger;
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    private long lastIdleTime;
    private long lastCompletedTaskCount;
    private String taskTrackerAddress;
    private ThreadPoolExecutor threadPool;
    private ScheduledExecutorService timingPool;
    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
    private static final long MAX_IDLE_TIME = 120000L;
    private boolean lethal = false;
    private String lethalReason;

    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
        try {
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceInfo = request.getInstanceInfo();
            this.instanceId = request.getInstanceInfo().getInstanceId();
            this.taskTrackerAddress = request.getTaskTrackerAddress();
            this.omsLogger = OmsLoggerFactory.build(this.instanceId, request.getLogConfig(), workerRuntime);
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;
            this.initThreadPool();
            this.initTimingJob();
            this.processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(this.instanceInfo.getProcessorType()).setProcessorInfo(this.instanceInfo.getProcessorInfo()));
            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", (Object)this.instanceId);
        }
        catch (Throwable t) {
            log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", (Object)this.instanceId, (Object)t);
            this.lethal = true;
            this.lethalReason = ExceptionUtils.getMessage((Throwable)t);
        }
    }

    public void submitTask(TaskDO newTask) {
        if (this.lethal) {
            ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq().setInstanceId(this.instanceId).setSubInstanceId(newTask.getSubInstanceId()).setTaskId(newTask.getTaskId()).setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()).setResult(this.lethalReason).setReportTime(System.currentTimeMillis());
            TransportUtils.ptReportTask(report, this.taskTrackerAddress, this.workerRuntime);
            return;
        }
        boolean success = false;
        newTask.setInstanceId(this.instanceInfo.getInstanceId());
        newTask.setAddress(this.taskTrackerAddress);
        HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(this.instanceInfo, this.taskTrackerAddress, newTask, this.processorBean, this.omsLogger, this.statusReportRetryQueue, this.workerRuntime);
        try {
            this.threadPool.submit(heavyProcessorRunnable);
            success = true;
        }
        catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", new Object[]{this.instanceId, newTask.getTaskId(), newTask.getTaskName()});
        }
        catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", new Object[]{this.instanceId, newTask.getTaskId(), newTask.getTaskName(), e});
        }
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(this.instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());
            TransportUtils.ptReportTask(reportReq, this.taskTrackerAddress, this.workerRuntime);
            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", new Object[]{this.instanceId, newTask.getTaskId(), newTask.getTaskName(), this.threadPool.getQueue().size()});
        }
    }

    public void destroy() {
        CommonUtils.executeIgnoreException(() -> {
            List<Runnable> tasks = this.threadPool.shutdownNow();
            if (!CollectionUtils.isEmpty(tasks)) {
                log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", (Object)this.instanceId, (Object)tasks.size());
            }
        });
        this.statusReportRetryQueue.clear();
        ProcessorTrackerManager.removeProcessorTracker(this.instanceId);
        log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", (Object)this.instanceId);
        CommonUtils.executeIgnoreException(() -> this.timingPool.shutdownNow());
    }

    private void initThreadPool() {
        int poolSize = this.calThreadPoolSize();
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(128);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
        ThreadPoolExecutor.AbortPolicy rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
        this.threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
        this.threadPool.allowCoreThreadTimeOut(true);
    }

    private void initTimingJob() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
        this.timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.timingPool.scheduleAtFixedRate((Runnable)((Object)new CheckerAndReporter()), 0L, 10L, TimeUnit.SECONDS);
    }

    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf((String)this.instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf((String)this.instanceInfo.getProcessorType());
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }
        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return this.instanceInfo.getThreadConcurrency();
        }
        if (TimeExpressionType.FREQUENT_TYPES.contains(this.instanceInfo.getTimeExpressionType())) {
            return this.instanceInfo.getThreadConcurrency();
        }
        return 2;
    }

    private class CheckerAndReporter
    extends SafeRunnable {
        private CheckerAndReporter() {
        }

        public void run0() {
            long interval = System.currentTimeMillis() - ProcessorTracker.this.startTime;
            if (!TimeExpressionType.FREQUENT_TYPES.contains(ProcessorTracker.this.instanceInfo.getTimeExpressionType()) && interval > ProcessorTracker.this.instanceInfo.getInstanceTimeoutMS()) {
                log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", (Object)ProcessorTracker.this.instanceId);
                ProcessorTracker.this.destroy();
                return;
            }
            if (ProcessorTracker.this.threadPool.getActiveCount() > 0 || ProcessorTracker.this.threadPool.getCompletedTaskCount() > ProcessorTracker.this.lastCompletedTaskCount) {
                ProcessorTracker.this.lastIdleTime = -1L;
                ProcessorTracker.this.lastCompletedTaskCount = ProcessorTracker.this.threadPool.getCompletedTaskCount();
            } else if (ProcessorTracker.this.lastIdleTime == -1L) {
                ProcessorTracker.this.lastIdleTime = System.currentTimeMillis();
            } else {
                long idleTime = System.currentTimeMillis() - ProcessorTracker.this.lastIdleTime;
                if (idleTime > 120000L) {
                    log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", (Object)ProcessorTracker.this.instanceId, (Object)idleTime);
                    ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(ProcessorTracker.this.instanceId);
                    statusReportReq.setAddress(ProcessorTracker.this.workerRuntime.getWorkerAddress());
                    TransportUtils.ptReportSelfStatus(statusReportReq, ProcessorTracker.this.taskTrackerAddress, ProcessorTracker.this.workerRuntime);
                    ProcessorTracker.this.destroy();
                    return;
                }
            }
            while (!ProcessorTracker.this.statusReportRetryQueue.isEmpty()) {
                ProcessorReportTaskStatusReq req = (ProcessorReportTaskStatusReq)ProcessorTracker.this.statusReportRetryQueue.poll();
                if (req == null) continue;
                req.setReportTime(System.currentTimeMillis());
                if (TransportUtils.reliablePtReportTask(req, ProcessorTracker.this.taskTrackerAddress, ProcessorTracker.this.workerRuntime)) continue;
                ProcessorTracker.this.statusReportRetryQueue.add(req);
                log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", (Object)ProcessorTracker.this.instanceId, (Object)req);
                return;
            }
            long waitingNum = ProcessorTracker.this.threadPool.getQueue().size();
            ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(ProcessorTracker.this.instanceId, waitingNum);
            statusReportReq.setAddress(ProcessorTracker.this.workerRuntime.getWorkerAddress());
            TransportUtils.ptReportSelfStatus(statusReportReq, ProcessorTracker.this.taskTrackerAddress, ProcessorTracker.this.workerRuntime);
            log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", (Object)ProcessorTracker.this.instanceId, (Object)waitingNum);
        }
    }
}

