package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.hooks.ITaskHook;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import clojure.lang.Atom;
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.TaskReportError;
import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.thrift.protocol.TMultiplexedProtocol;

/* loaded from: input_file:com/alibaba/jstorm/task/Task.class */
/**
 * Runtime wrapper around a single topology task hosted by a worker.
 *
 * <p>The constructor derives the task's contexts, configuration and shared queues from the owning
 * {@link WorkerData} and loads the task object (the executor factory distinguishes {@link IBolt}
 * and {@link ISpout} instances). {@link #execute()} then creates the transfer, receiver and
 * executor threads and returns the {@link TaskShutdownDameon} that aggregates them.
 */
public class Task implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private Map<Object, Object> stormConf;
    private TopologyContext topologyContext;
    private TopologyContext userContext;
    private IContext context;
    private TaskTransfer taskTransfer;
    private TaskReceiver taskReceiver;
    private Map<Integer, DisruptorQueue> innerTaskTransfer;
    private Map<Integer, DisruptorQueue> deserializeQueues;
    private Map<Integer, DisruptorQueue> controlQueues;
    private AsyncLoopDefaultKill workHalt;
    private String topologyId;
    private Integer taskId;
    private String componentId;
    private StormClusterState zkCluster;
    private Object taskObj;
    private TaskBaseMetric taskStats;
    private WorkerData workerData;
    private TaskSendTargets taskSendTargets;
    private TaskReportErrorAndDie reportErrorDie;
    private boolean isTaskBatchTuple;
    private TaskShutdownDameon taskShutdownDameon;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    private BaseExecutors baseExecutors;
    private UptimeComputer uptime = new UptimeComputer();
    private Atom openOrPrepareWasCalled = new Atom(false);
    private volatile TaskStatus taskStatus = new TaskStatus();

    /**
     * Builds the task state for task {@code i} from the worker-wide data and loads its task object.
     *
     * <p>A failure while loading the task object is handed to the task's error reporter; it is only
     * rethrown when no reporter exists.
     *
     * @param workerData shared state of the worker that hosts this task
     * @param i id of the task
     * @throws Exception if any of the setup steps fails
     */
    public Task(WorkerData workerData, int i) throws Exception {
        this.workerData = workerData;
        this.taskId = Integer.valueOf(i);

        // Two contexts are kept: one built on the system topology, one on the raw topology.
        this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), this.taskId, this.openOrPrepareWasCalled);
        this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), this.taskId, this.openOrPrepareWasCalled);
        this.componentId = this.topologyContext.getThisComponentId();
        this.stormConf = Common.component_conf(workerData.getStormConf(), this.topologyContext, this.componentId);

        // References to structures owned by the worker.
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.deserializeQueues = workerData.getDeserializeQueues();
        this.controlQueues = workerData.getControlQueues();
        this.topologyId = workerData.getTopologyId();
        this.context = workerData.getContext();
        this.workHalt = workerData.getWorkHalt();
        this.zkCluster = workerData.getZkCluster();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();

        this.reportErrorDie = new TaskReportErrorAndDie(new TaskReportError(this.zkCluster, this.topologyId, i), this.workHalt);
        this.taskStats = new TaskBaseMetric(this.topologyId, this.componentId, i);

        // Instantiate every configured auto task hook and attach it to the user context.
        for (Object hookClassName : Config.getTopologyAutoTaskHooks(this.stormConf)) {
            this.userContext.addTaskHook((ITaskHook) Utils.newInstance((String) hookClassName));
        }

        LOG.info("Begin to deserialize taskObj " + this.componentId + TMultiplexedProtocol.SEPARATOR + this.taskId);
        try {
            WorkerClassLoader.switchThreadContext();
            try {
                this.taskObj = Common.get_task_object(this.topologyContext.getRawTopology(), this.componentId, WorkerClassLoader.getInstance());
            } finally {
                // Always undo the context switch, even when loading the task object fails;
                // otherwise the calling thread would keep running in the switched context.
                WorkerClassLoader.restoreThreadContext();
            }
        } catch (Exception e) {
            if (this.reportErrorDie == null) {
                throw e;
            }
            this.reportErrorDie.report(e);
        }

        this.isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(this.stormConf).booleanValue();
        LOG.info("Transfer/receive in batch mode :" + this.isTaskBatchTuple);
        LOG.info("Loading task " + this.componentId + TMultiplexedProtocol.SEPARATOR + this.taskId);
    }

    /** Creates the send-target table for this component from its current outbound components. */
    private TaskSendTargets makeSendTargets() {
        String thisComponentId = this.topologyContext.getThisComponentId();
        return new TaskSendTargets(this.stormConf, thisComponentId, Common.outbound_components(this.topologyContext, this.workerData), this.topologyContext, this.taskStats);
    }

    /** Refreshes the existing send targets from the outbound components; logs an error if none exist yet. */
    private void updateSendTargets() {
        if (this.taskSendTargets == null) {
            LOG.error("taskSendTargets is null when trying to update it.");
            return;
        }
        this.taskSendTargets.updateStreamCompGrouper(Common.outbound_components(this.topologyContext, this.workerData));
    }

    /**
     * Builds fresh send targets and sends a single {@code "startup"} value on the {@code "__system"}
     * stream through them.
     *
     * <p>NOTE(review): when reached from {@link #execute()}, {@code taskTransfer} has not been
     * assigned yet, so {@code null} is passed to {@code UnanchoredSend.send}; confirm that the
     * callee tolerates this before relying on it.
     *
     * @return the newly created send targets
     */
    public TaskSendTargets echoToSystemBolt() {
        List<Object> startupMessage = new ArrayList<Object>();
        startupMessage.add("startup");
        TaskSendTargets sendTargets = makeSendTargets();
        UnanchoredSend.send(this.topologyContext, sendTargets, this.taskTransfer, "__system", startupMessage);
        return sendTargets;
    }

    /**
     * Tells whether a spout should use the single-threaded executor.
     *
     * @param map configuration to inspect
     * @return {@code true} if the configuration is "one pending" or explicitly asks for a
     *         single-threaded spout
     */
    public boolean isSingleThread(Map map) {
        return JStormServerUtils.isOnePending(map) || ConfigExtension.isSpoutSingleThread(map);
    }

    /**
     * Creates the executor matching the type of the task object.
     *
     * @return a bolt executor for an {@link IBolt}, a single- or multi-threaded spout executor for
     *         an {@link ISpout}, or {@code null} when the task object is neither
     */
    public BaseExecutors mkExecutor() {
        if (this.taskObj instanceof IBolt) {
            return new BoltExecutors(this);
        }
        if (this.taskObj instanceof ISpout) {
            if (isSingleThread(this.stormConf)) {
                return new SingleThreadSpoutExecutors(this);
            }
            return new MultipleThreadSpoutExecutors(this);
        }
        return null;
    }

    /** Returns the executor from {@link #mkExecutor()} typed as the callback it is run as. */
    private RunnableCallback prepareExecutor() {
        return mkExecutor();
    }

    /**
     * Creates this task's receiver, stores it, and registers its deserialize queue under the task id.
     *
     * @return the newly created receiver
     */
    public TaskReceiver mkTaskReceiver() {
        String taskName = JStormServerUtils.getName(this.componentId, this.taskId.intValue());
        this.taskReceiver = new TaskReceiver(this, this.taskId.intValue(), this.stormConf, this.topologyContext, this.innerTaskTransfer, this.taskStatus, taskName);
        this.deserializeQueues.put(this.taskId, this.taskReceiver.getDeserializeQueue());
        return this.taskReceiver;
    }

    /**
     * Wires up the task: send targets, transfer, executor thread and receiver, in that order.
     *
     * @return the shutdown handle that aggregates all threads created for this task
     * @throws IllegalStateException if no executor can be created for the task object (it is
     *         neither an {@link IBolt} nor an {@link ISpout})
     * @throws Exception if any of the wiring steps fails
     */
    public TaskShutdownDameon execute() throws Exception {
        this.taskSendTargets = echoToSystemBolt();
        this.taskTransfer = mkTaskSending(this.workerData);

        RunnableCallback executor = prepareExecutor();
        if (executor == null) {
            // Fail fast with a clear message instead of a bare NullPointerException later in
            // getShutdown(), after threads and queues would already have been created.
            throw new IllegalStateException("No executor available for task " + this.componentId + TMultiplexedProtocol.SEPARATOR + this.taskId
                    + ", unsupported task object: " + (this.taskObj == null ? "null" : this.taskObj.getClass().getName()));
        }
        setBaseExecutors((BaseExecutors) executor);

        // NOTE(review): 10 equals Thread.MAX_PRIORITY and is presumably the thread priority —
        // confirm against the AsyncLoopThread constructor.
        AsyncLoopThread executorThread = new AsyncLoopThread(executor, false, 10, true);
        this.taskReceiver = mkTaskReceiver();

        List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>();
        allThreads.add(executorThread);
        LOG.info("Finished loading task " + this.componentId + TMultiplexedProtocol.SEPARATOR + this.taskId);

        this.taskShutdownDameon = getShutdown(allThreads, executor);
        return this.taskShutdownDameon;
    }

    /** Creates the transfer object for this task, with a Kryo tuple serializer built on the raw topology. */
    private TaskTransfer mkTaskSending(WorkerData workerData) {
        String taskName = JStormServerUtils.getName(this.componentId, this.taskId.intValue());
        KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), this.topologyContext.getRawTopology());
        return new TaskTransfer(this, taskName, serializer, this.taskStatus, workerData, this.topologyContext);
    }

    /**
     * Collects every thread belonging to this task into {@code list} and wraps them in a shutdown
     * handle.
     *
     * <p>Besides the threads already in {@code list}, this adds the spout acker thread (when the
     * callback is a {@link SpoutExecutors} that has one), the receiver's deserialize threads and the
     * transfer's serialize threads. Requires {@link #execute()} to have created the receiver and
     * transfer already.
     *
     * @param list mutable list of task threads; extended in place
     * @param runnableCallback the task's executor; must be a non-null {@link BaseExecutors}
     * @return the shutdown handle for this task
     */
    public TaskShutdownDameon getShutdown(List<AsyncLoopThread> list, RunnableCallback runnableCallback) {
        if (runnableCallback instanceof SpoutExecutors) {
            AsyncLoopThread ackerThread = ((SpoutExecutors) runnableCallback).getAckerRunnableThread();
            if (ackerThread != null) {
                list.add(ackerThread);
            }
        }
        for (AsyncLoopThread deserializeThread : this.taskReceiver.getDeserializeThread()) {
            list.add(deserializeThread);
        }
        list.addAll(this.taskTransfer.getSerializeThreads());
        return new TaskShutdownDameon(this.taskStatus, this.topologyId, this.taskId, list, this.zkCluster, this.taskObj, this, ((BaseExecutors) runnableCallback).getTaskHbTrigger());
    }

    /** Returns the shutdown handle created by {@link #execute()}, or {@code null} before that. */
    public TaskShutdownDameon getTaskShutdownDameon() {
        return this.taskShutdownDameon;
    }

    /**
     * Runs {@link #execute()} and routes any failure to the error reporter; the failure is rethrown
     * wrapped in a {@link RuntimeException} only when no reporter exists.
     */
    @Override
    public void run() {
        try {
            this.taskShutdownDameon = execute();
        } catch (Throwable th) {
            LOG.error("init task take error", th);
            if (this.reportErrorDie == null) {
                throw new RuntimeException(th);
            }
            this.reportErrorDie.report(th);
        }
    }

    /**
     * Convenience factory: constructs a task and immediately executes it.
     *
     * @param workerData shared state of the hosting worker
     * @param i id of the task
     * @return the shutdown handle of the started task
     * @throws Exception if construction or execution fails
     */
    public static TaskShutdownDameon mk_task(WorkerData workerData, int i) throws Exception {
        return new Task(workerData, i).execute();
    }

    /** Pushes the worker's current task ids into both contexts and refreshes the send targets. */
    public void updateTaskData() {
        List<Integer> workerTasks = JStormUtils.mk_list((Set) this.workerData.getTaskids());
        this.topologyContext.setThisWorkerTasks(workerTasks);
        this.userContext.setThisWorkerTasks(workerTasks);
        updateSendTargets();
    }

    /** Returns the assignment timestamp reported by the worker data. */
    public long getWorkerAssignmentTs() {
        return this.workerData.getAssignmentTs().longValue();
    }

    /** Returns the assignment type reported by the worker data. */
    public Assignment.AssignmentType getWorkerAssignmentType() {
        return this.workerData.getAssignmentType();
    }

    /** Removes this task's entry from the shared deserialize-queue map. */
    public void unregisterDeserializeQueue() {
        this.deserializeQueues.remove(this.taskId);
    }

    /** Returns the id of the component this task belongs to. */
    public String getComponentId() {
        return this.componentId;
    }

    /** Returns this task's id. */
    public Integer getTaskId() {
        return this.taskId;
    }

    /** Returns this task's entry in the inner task transfer map, or {@code null} if absent. */
    public DisruptorQueue getExecuteQueue() {
        return this.innerTaskTransfer.get(this.taskId);
    }

    /** Returns this task's entry in the deserialize-queue map, or {@code null} if absent. */
    public DisruptorQueue getDeserializeQueue() {
        return this.deserializeQueues.get(this.taskId);
    }

    /** Returns the component-level configuration computed in the constructor. */
    public Map<Object, Object> getStormConf() {
        return this.stormConf;
    }

    /** Returns the context built on the system topology. */
    public TopologyContext getTopologyContext() {
        return this.topologyContext;
    }

    /** Returns the context built on the raw topology; task hooks are attached to this one. */
    public TopologyContext getUserContext() {
        return this.userContext;
    }

    /** Returns the transfer object, or {@code null} before {@link #execute()} has created it. */
    public TaskTransfer getTaskTransfer() {
        return this.taskTransfer;
    }

    /** Returns the receiver, or {@code null} before it has been created. */
    public TaskReceiver getTaskReceiver() {
        return this.taskReceiver;
    }

    /** Returns the worker's inner task transfer map. */
    public Map<Integer, DisruptorQueue> getInnerTaskTransfer() {
        return this.innerTaskTransfer;
    }

    /** Returns the worker's deserialize-queue map. */
    public Map<Integer, DisruptorQueue> getDeserializeQueues() {
        return this.deserializeQueues;
    }

    /** Returns the worker's control-queue map. */
    public Map<Integer, DisruptorQueue> getControlQueues() {
        return this.controlQueues;
    }

    /** Returns the id of the topology this task belongs to. */
    public String getTopologyId() {
        return this.topologyId;
    }

    /** Returns the status object of this task. */
    public TaskStatus getTaskStatus() {
        return this.taskStatus;
    }

    /** Returns the cluster state handle obtained from the worker data. */
    public StormClusterState getZkCluster() {
        return this.zkCluster;
    }

    /** Returns the loaded task object; may be {@code null} if loading failed and was only reported. */
    public Object getTaskObj() {
        return this.taskObj;
    }

    /** Returns the metric holder of this task. */
    public TaskBaseMetric getTaskStats() {
        return this.taskStats;
    }

    /** Returns the worker data this task was created from. */
    public WorkerData getWorkerData() {
        return this.workerData;
    }

    /** Returns the send targets, or {@code null} before {@link #execute()} has created them. */
    public TaskSendTargets getTaskSendTargets() {
        return this.taskSendTargets;
    }

    /** Returns the error reporter created in the constructor. */
    public TaskReportErrorAndDie getReportErrorDie() {
        return this.reportErrorDie;
    }

    /** Returns whether batch tuple mode is enabled in this task's configuration. */
    public boolean isTaskBatchTuple() {
        return this.isTaskBatchTuple;
    }

    /** Returns the worker's slot-to-connection map. */
    public ConcurrentHashMap<WorkerSlot, IConnection> getNodeportSocket() {
        return this.nodeportSocket;
    }

    /** Returns the worker's task-to-slot map. */
    public ConcurrentHashMap<Integer, WorkerSlot> getTaskNodeport() {
        return this.taskNodeport;
    }

    /** Returns the executor set by {@link #execute()} or {@link #setBaseExecutors(BaseExecutors)}. */
    public BaseExecutors getBaseExecutors() {
        return this.baseExecutors;
    }

    /** Replaces the executor reference held by this task. */
    public void setBaseExecutors(BaseExecutors baseExecutors) {
        this.baseExecutors = baseExecutors;
    }
}
