/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute;

import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseExecutors
extends RunnableCallback {
    private static Logger LOG = LoggerFactory.getLogger(BaseExecutors.class);
    protected final String topologyId;
    protected final String componentId;
    protected final int taskId;
    protected final String idStr;
    protected Map storm_conf;
    protected TopologyContext userTopologyCtx;
    protected TopologyContext sysTopologyCtx;
    protected TaskBaseMetric task_stats;
    protected volatile TaskStatus taskStatus;
    protected int message_timeout_secs = 30;
    protected Throwable error = null;
    protected ITaskReportErr report_error;
    protected DisruptorQueue exeQueue;
    protected DisruptorQueue controlQueue;
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    protected Task task;
    protected long assignmentTs;
    protected TaskTransfer taskTransfer;
    protected JStormMetricsReporter metricsReporter;
    protected boolean isFinishInit = false;
    protected RotatingMapTrigger rotatingMapTrigger;
    protected TaskHeartbeatTrigger taskHbTrigger;

    public BaseExecutors(Task task) {
        this.task = task;
        this.storm_conf = task.getStormConf();
        this.userTopologyCtx = task.getUserContext();
        this.sysTopologyCtx = task.getTopologyContext();
        this.task_stats = task.getTaskStats();
        this.taskId = this.sysTopologyCtx.getThisTaskId();
        this.innerTaskTransfer = task.getInnerTaskTransfer();
        this.topologyId = this.sysTopologyCtx.getTopologyId();
        this.componentId = this.sysTopologyCtx.getThisComponentId();
        this.idStr = JStormServerUtils.getName(this.componentId, this.taskId);
        this.taskStatus = task.getTaskStatus();
        this.report_error = task.getReportErrorDie();
        this.taskTransfer = task.getTaskTransfer();
        this.metricsReporter = task.getWorkerData().getMetricsReporter();
        this.message_timeout_secs = JStormUtils.parseInt(this.storm_conf.get("topology.message.timeout.secs"), 30);
        int queue_size = Utils.getInt(this.storm_conf.get("topology.executor.receive.buffer.size"), 256);
        WaitStrategy waitStrategy = (WaitStrategy)JStormUtils.createDisruptorWaitStrategy(this.storm_conf);
        this.exeQueue = DisruptorQueue.mkInstance(this.idStr, ProducerType.MULTI, queue_size, waitStrategy);
        queue_size = Utils.getInt(this.storm_conf.get("topology.ctrl.buffer.size"), 32);
        this.controlQueue = DisruptorQueue.mkInstance(this.idStr + " for control message", ProducerType.MULTI, queue_size, waitStrategy);
        this.registerInnerTransfer(this.exeQueue);
        this.task.getControlQueues().put(this.taskId, this.controlQueue);
        QueueGauge exeQueueGauge = new QueueGauge(this.exeQueue, this.idStr, "ExecutorQueue");
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, "ExecutorQueue", MetricType.GAUGE), new AsmGauge(exeQueueGauge));
        JStormHealthCheck.registerTaskHealthCheck(this.taskId, "ExecutorQueue", exeQueueGauge);
        QueueGauge controlQueueGauge = new QueueGauge(this.controlQueue, this.idStr, "CtrlQueue");
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topologyId, this.componentId, this.taskId, "CtrlQueue", MetricType.GAUGE), new AsmGauge(controlQueueGauge));
        JStormHealthCheck.registerTaskHealthCheck(this.taskId, "CtrlQueue", controlQueueGauge);
        this.rotatingMapTrigger = new RotatingMapTrigger(this.storm_conf, this.idStr + "_rotating", this.controlQueue);
        this.rotatingMapTrigger.register();
        this.taskHbTrigger = new TaskHeartbeatTrigger(this.storm_conf, this.idStr + "_taskHeartbeat", this.controlQueue, this.taskId, this.componentId, this.sysTopologyCtx, this.report_error);
        this.assignmentTs = System.currentTimeMillis();
    }

    public void init() throws Exception {
        throw new RuntimeException("Should implement this function");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void initWrapper() {
        try {
            LOG.info("{} begin to init", (Object)this.idStr);
            this.init();
            if (this.taskId == this.getMinTaskIdOfWorker()) {
                this.metricsReporter.setOutputCollector(this.getOutputCollector());
            }
            this.isFinishInit = true;
        }
        catch (Throwable e) {
            this.error = e;
            LOG.error("Init error ", e);
            this.report_error.report(e);
        }
        finally {
            LOG.info("{} initialization finished", (Object)this.idStr);
        }
    }

    @Override
    public void preRun() {
        WorkerClassLoader.switchThreadContext();
    }

    @Override
    public void postRun() {
        WorkerClassLoader.restoreThreadContext();
    }

    @Override
    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    @Override
    public Exception error() {
        if (this.error == null) {
            return null;
        }
        return new Exception(this.error);
    }

    @Override
    public void shutdown() {
        LOG.info("Shutdown executing thread of " + this.idStr);
        if (!this.taskStatus.isShutdown()) {
            LOG.info("Taskstatus isn't shutdown, but enter shutdown method, Occur exception");
        }
        this.unregistorInnerTransfer();
    }

    protected void registerInnerTransfer(DisruptorQueue disruptorQueue) {
        LOG.info("Registor inner transfer for executor thread of " + this.idStr);
        DisruptorQueue existInnerTransfer = this.innerTaskTransfer.get(this.taskId);
        if (existInnerTransfer != null) {
            LOG.info("Exist inner task transfer for executing thread of " + this.idStr);
            if (existInnerTransfer != disruptorQueue) {
                throw new RuntimeException("Inner task transfer must be only one in executing thread of " + this.idStr);
            }
        }
        this.innerTaskTransfer.put(this.taskId, disruptorQueue);
    }

    protected void unregistorInnerTransfer() {
        LOG.info("Unregistor inner transfer for executor thread of " + this.idStr);
        this.innerTaskTransfer.remove(this.taskId);
    }

    protected int getMinTaskIdOfWorker() {
        TreeSet<Integer> tasks = new TreeSet<Integer>(this.sysTopologyCtx.getThisWorkerTasks());
        return (Integer)tasks.first();
    }

    public Object getOutputCollector() {
        return null;
    }
}

