package com.alibaba.jstorm.task.execute;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import shade.storm.com.lmax.disruptor.dsl.ProducerType;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BaseExecutors.class */
/**
 * Shared state and set-up for the executor thread of a single task.
 *
 * <p>The constructor copies the collaborators it needs out of the owning {@link Task},
 * creates the execute queue and the control queue, registers both queues with the
 * metrics and health-check registries, and creates the rotating-map and task-heartbeat
 * triggers. {@link #init()} and {@link #run()} throw unconditionally in this class, so a
 * usable executor has to override both of them.
 */
public class BaseExecutors extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(BaseExecutors.class);

    protected final String topologyId;
    protected final String componentId;
    protected final int taskId;
    /** Display name built from the component id and the task id; used in logs and queue names. */
    protected final String idStr;
    protected Map storm_conf;
    protected TopologyContext userTopologyCtx;
    protected TopologyContext sysTopologyCtx;
    protected TaskBaseMetric task_stats;
    protected volatile TaskStatus taskStatus;
    protected int message_timeout_secs;
    protected ITaskReportErr report_error;
    protected DisruptorQueue exeQueue;
    protected DisruptorQueue controlQueue;
    /** Task id to execute queue; this executor adds its own queue on construction. */
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    protected Task task;
    /** Wall-clock time (ms) captured at the end of the constructor. */
    protected long assignmentTs;
    protected TaskTransfer taskTransfer;
    protected JStormMetricsReporter metricsReporter;
    protected RotatingMapTrigger rotatingMapTrigger;
    protected TaskHeartbeatTrigger taskHbTrigger;
    protected boolean isBatchMode;
    /** First failure caught by {@link #initWrapper()}, or {@code null} if none was caught. */
    protected Throwable error = null;
    /** Set to {@code true} by {@link #initWrapper()} once {@link #init()} completed normally. */
    protected boolean isFinishInit = false;
    protected TaskStatus executorStatus = new TaskStatus();

    /**
     * Builds the executor for the given task.
     *
     * <p>Note that {@link #registerInnerTransfer(DisruptorQueue)} is invoked from here, i.e.
     * before any subclass constructor body has run.
     *
     * @param task the task this executor belongs to; its configuration, contexts, stats,
     *             status, error reporter and transfer objects are read here
     * @throws RuntimeException if a different execute queue is already registered for this
     *                          task id (see {@link #registerInnerTransfer(DisruptorQueue)})
     */
    public BaseExecutors(Task task) {
        this.task = task;
        this.storm_conf = task.getStormConf();
        this.userTopologyCtx = task.getUserContext();
        this.sysTopologyCtx = task.getTopologyContext();
        this.task_stats = task.getTaskStats();
        this.taskId = this.sysTopologyCtx.getThisTaskId();
        this.innerTaskTransfer = task.getInnerTaskTransfer();
        this.topologyId = this.sysTopologyCtx.getTopologyId();
        this.componentId = this.sysTopologyCtx.getThisComponentId();
        this.idStr = JStormServerUtils.getName(this.componentId, this.taskId);
        this.taskStatus = task.getTaskStatus();
        this.report_error = task.getReportErrorDie();
        this.taskTransfer = task.getTaskTransfer();
        this.metricsReporter = task.getWorkerData().getMetricsReporter();

        // Falls back to 30 seconds when the topology configuration has no usable value.
        this.message_timeout_secs =
                JStormUtils.parseInt(this.storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);

        int exeQueueSize =
                Utils.getInt(this.storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
        long waitTimeoutMs =
                JStormUtils.parseLong(this.storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10L);
        // One wait strategy instance is shared by the execute queue and the control queue.
        TimeoutBlockingWaitStrategy waitStrategy =
                new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);

        boolean queueBatchMode = ConfigExtension.isDisruptorQueueBatchMode(this.storm_conf);
        this.exeQueue = DisruptorQueue.mkInstance(
                this.idStr,
                ProducerType.MULTI,
                exeQueueSize,
                waitStrategy,
                queueBatchMode,
                ConfigExtension.getDisruptorBufferSize(this.storm_conf),
                ConfigExtension.getDisruptorBufferFlushMs(this.storm_conf));

        int controlQueueSize =
                Utils.getInt(this.storm_conf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 32);
        // The control queue is created with batch mode off and zero buffer size / flush interval.
        this.controlQueue = DisruptorQueue.mkInstance(
                this.idStr + " for control message",
                ProducerType.MULTI,
                controlQueueSize,
                waitStrategy,
                false,
                0,
                0L);

        registerInnerTransfer(this.exeQueue);
        this.task.getControlQueues().put(this.taskId, this.controlQueue);

        registerQueueGauge(this.exeQueue, MetricDef.EXECUTE_QUEUE);
        registerQueueGauge(this.controlQueue, MetricDef.CONTROL_QUEUE);

        this.rotatingMapTrigger =
                new RotatingMapTrigger(this.storm_conf, this.idStr + "_rotating", this.controlQueue);
        this.rotatingMapTrigger.register();
        // The heartbeat trigger is only created here; this constructor does not register it.
        this.taskHbTrigger = new TaskHeartbeatTrigger(
                this.storm_conf,
                this.idStr + "_taskHeartbeat",
                this.controlQueue,
                this.taskId,
                this.componentId,
                this.sysTopologyCtx,
                this.report_error,
                this.executorStatus);

        this.assignmentTs = System.currentTimeMillis();
        this.isBatchMode = ConfigExtension.isTaskBatchTuple(this.storm_conf);
    }

    /** Registers one gauge for {@code queue} as both a task metric and a task health check. */
    private void registerQueueGauge(DisruptorQueue queue, String metricName) {
        QueueGauge gauge = new QueueGauge(queue, this.idStr, metricName);
        String fullName = MetricUtils.taskMetricName(
                this.topologyId, this.componentId, this.taskId, metricName, MetricType.GAUGE);
        JStormMetrics.registerTaskMetric(fullName, new AsmGauge(gauge));
        JStormHealthCheck.registerTaskHealthCheck(this.taskId, metricName, gauge);
    }

    /**
     * Initialization hook. This base implementation always throws; subclasses must override it.
     *
     * @throws Exception declared for overriding implementations
     * @throws RuntimeException always, in this base implementation
     */
    public void init() throws Exception {
        throw new RuntimeException("Should implement this function");
    }

    /**
     * Runs {@link #init()} with logging and error capture.
     *
     * <p>On success, the executor whose task id is the smallest one in this worker passes its
     * output collector to the metrics reporter, and {@link #isFinishInit} is set. Any
     * {@link Throwable} raised on the way is stored in {@link #error}, logged and handed to
     * {@link #report_error}; it is not rethrown. The "initialization finished" line is logged
     * on every path, including the failure path.
     */
    public void initWrapper() {
        try {
            LOG.info("{} begin to init", this.idStr);
            init();
            if (this.taskId == getMinTaskIdOfWorker()) {
                this.metricsReporter.setOutputCollector(getOutputCollector());
            }
            this.isFinishInit = true;
        } catch (Throwable th) {
            this.error = th;
            LOG.error("Init error ", th);
            this.report_error.report(th);
        } finally {
            LOG.info("{} initialization finished", this.idStr);
        }
    }

    /** Switches the thread context class loader via {@link WorkerClassLoader} before running. */
    @Override
    public void preRun() {
        WorkerClassLoader.switchThreadContext();
    }

    /** Restores the thread context class loader via {@link WorkerClassLoader} after running. */
    @Override
    public void postRun() {
        WorkerClassLoader.restoreThreadContext();
    }

    /**
     * Executor loop body. This base implementation always throws; subclasses must override it.
     *
     * @throws RuntimeException always, in this base implementation
     */
    @Override
    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    /**
     * Returns the captured failure, if any.
     *
     * @return {@code null} when {@link #error} is unset, otherwise a new {@link Exception}
     *         whose cause is the stored {@link Throwable}
     */
    @Override
    public Exception error() {
        return this.error == null ? null : new Exception(this.error);
    }

    /**
     * Removes this executor's queue from the inner task transfer map.
     *
     * <p>Being called while the task status is not "shutdown" is logged as an error, but the
     * unregistration is performed regardless.
     */
    @Override
    public void shutdown() {
        LOG.info("Shutdown executing thread of {}", this.idStr);
        if (!this.taskStatus.isShutdown()) {
            LOG.error("Taskstatus isn't shutdown, but enter shutdown method, Occur exception");
        }
        unregistorInnerTransfer();
    }

    /**
     * Publishes {@code disruptorQueue} as the inner transfer queue of this task.
     *
     * @param disruptorQueue the queue to associate with this task id
     * @throws RuntimeException if a different queue instance is already registered for this
     *                          task id; re-registering the same instance is allowed
     */
    protected void registerInnerTransfer(DisruptorQueue disruptorQueue) {
        LOG.info("Registor inner transfer for executor thread of {}", this.idStr);
        DisruptorQueue registered = this.innerTaskTransfer.get(this.taskId);
        if (registered != null) {
            LOG.info("Exist inner task transfer for executing thread of {}", this.idStr);
            // Identity comparison on purpose: only the very same queue object may be re-registered.
            if (registered != disruptorQueue) {
                throw new RuntimeException(
                        "Inner task transfer must be only one in executing thread of " + this.idStr);
            }
        }
        this.innerTaskTransfer.put(this.taskId, disruptorQueue);
    }

    /** Removes this task id's entry from the inner task transfer map. */
    protected void unregistorInnerTransfer() {
        LOG.info("Unregistor inner transfer for executor thread of {}", this.idStr);
        this.innerTaskTransfer.remove(this.taskId);
    }

    /**
     * Returns the smallest task id among the tasks of this worker.
     *
     * @return the minimum of {@code sysTopologyCtx.getThisWorkerTasks()}
     * @throws java.util.NoSuchElementException if the worker task collection is empty
     */
    protected int getMinTaskIdOfWorker() {
        return new TreeSet<Integer>(this.sysTopologyCtx.getThisWorkerTasks()).first();
    }

    /**
     * Returns the output collector of this executor.
     *
     * @return always {@code null} in this base implementation
     */
    public Object getOutputCollector() {
        return null;
    }

    /** @return the task heartbeat trigger created by the constructor */
    public TaskHeartbeatTrigger getTaskHbTrigger() {
        return this.taskHbTrigger;
    }

    /**
     * Looks up the connection for a task by going through the task's worker slot.
     *
     * @param i the task id to look up
     * @return the connection mapped to that task's worker slot, or {@code null} when the task
     *         has no known worker slot or the slot has no connection
     */
    public IConnection getConnection(int i) {
        WorkerSlot slot = this.task.getTaskNodeport().get(i);
        if (slot == null) {
            return null;
        }
        return this.task.getNodeportSocket().get(slot);
    }

    /**
     * Merges the given entries into {@link #storm_conf}, overwriting existing keys.
     *
     * @param map the entries to merge; a {@code null} or empty map leaves the configuration
     *            untouched
     */
    @SuppressWarnings("unchecked") // storm_conf is a raw Map by design of the surrounding API
    public void update(Map map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        this.storm_conf.putAll(map);
    }
}
