/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.hooks.ITaskHook;
import backtype.storm.messaging.IContext;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import clojure.lang.Atom;
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskBatchReceiver;
import com.alibaba.jstorm.task.TaskBatchTransfer;
import com.alibaba.jstorm.task.TaskReceiver;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.UptimeComputer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.TaskReportError;
import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.task.group.MkGrouper;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runtime representation of one task (a spout or bolt instance) inside a worker.
 *
 * <p>The constructor gathers everything the task needs from {@link WorkerData}
 * (configuration, queues, cluster state, contexts) and loads the component object.
 * {@link #execute()} then wires up the sending and receiving sides, creates the
 * executor thread and returns a {@link TaskShutdownDameon} that owns the threads
 * started for this task.
 */
public class Task
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private Map<Object, Object> stormConf;
    private TopologyContext topologyContext;
    private TopologyContext userContext;
    private IContext context;
    private TaskTransfer taskTransfer;
    private TaskReceiver taskReceiver;
    private Map<Integer, DisruptorQueue> innerTaskTransfer;
    private Map<Integer, DisruptorQueue> deserializeQueues;
    private Map<Integer, DisruptorQueue> controlQueues;
    private AsyncLoopDefaultKill workHalt;
    private String topologyId;
    private Integer taskId;
    private String componentId;
    private volatile TaskStatus taskStatus;
    private Atom openOrPrepareWasCalled;
    private UptimeComputer uptime = new UptimeComputer();
    private StormClusterState zkCluster;
    private Object taskObj;
    private TaskBaseMetric taskStats;
    private WorkerData workerData;
    private TaskSendTargets taskSendTargets;
    private TaskReportErrorAndDie reportErrorDie;
    private boolean isTaskBatchTuple;
    private TaskShutdownDameon taskShutdownDameon;

    /**
     * Builds the task state from the worker's shared data and loads the component object.
     *
     * @param workerData shared worker state the task reads its configuration, queues and
     *                   cluster handles from
     * @param taskId     id of the task to create
     * @throws Exception if any of the collaborators fails while the task is being set up
     */
    public Task(WorkerData workerData, int taskId) throws Exception {
        this.openOrPrepareWasCalled = new Atom(Boolean.FALSE);
        this.workerData = workerData;
        // Two contexts share the same "open/prepare was called" flag: one built from the
        // system topology, one from the raw topology.
        this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), taskId, this.openOrPrepareWasCalled);
        this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), taskId, this.openOrPrepareWasCalled);
        this.taskId = taskId;
        this.componentId = this.topologyContext.getThisComponentId();
        this.stormConf = Common.component_conf(workerData.getStormConf(), this.topologyContext, this.componentId);
        this.taskStatus = new TaskStatus();
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.deserializeQueues = workerData.getDeserializeQueues();
        this.controlQueues = workerData.getControlQueues();
        this.topologyId = workerData.getTopologyId();
        this.context = workerData.getContext();
        this.workHalt = workerData.getWorkHalt();
        this.zkCluster = workerData.getZkCluster();
        this.taskStats = new TaskBaseMetric(this.topologyId, this.componentId, taskId);

        // Instantiate every configured auto task hook and attach it to the user context.
        // NOTE(review): the null guard assumes the config lookup may return null when no
        // hooks are configured; it is harmless if the lookup always returns a list.
        List<String> listHooks = Config.getTopologyAutoTaskHooks(this.stormConf);
        if (listHooks != null) {
            for (String hook : listHooks) {
                ITaskHook iTaskHook = (ITaskHook)Utils.newInstance(hook);
                this.userContext.addTaskHook(iTaskHook);
            }
        }

        LOG.info("Begin to deserialize taskObj {}:{}", this.componentId, this.taskId);
        WorkerClassLoader.switchThreadContext();
        try {
            this.taskObj = Common.get_task_object(this.topologyContext.getRawTopology(), this.componentId, WorkerClassLoader.getInstance());
        } finally {
            // Always undo the context switch, even when loading the task object fails,
            // so the calling thread is not left with the switched context.
            WorkerClassLoader.restoreThreadContext();
        }
        this.isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(this.stormConf);
        LOG.info("Transfer/receive in batch mode :{}", this.isTaskBatchTuple);
        LOG.info("Loading task {}:{}", this.componentId, this.taskId);
    }

    /**
     * Creates a fresh {@link TaskSendTargets} from the current outbound component groupers.
     *
     * @return the newly built send targets
     */
    private TaskSendTargets makeSendTargets() {
        String component = this.topologyContext.getThisComponentId();
        Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(this.topologyContext, this.workerData);
        return new TaskSendTargets(this.stormConf, component, streamComponentGrouper, this.topologyContext, this.taskStats);
    }

    /**
     * Recomputes the outbound groupers and pushes them into the existing send targets.
     * Logs an error and does nothing else when the send targets have not been created yet.
     */
    private void updateSendTargets() {
        if (this.taskSendTargets == null) {
            LOG.error("taskSendTargets is null when trying to update it.");
            return;
        }
        Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(this.topologyContext, this.workerData);
        this.taskSendTargets.updateStreamCompGrouper(streamComponentGrouper);
    }

    /**
     * Builds the send targets and emits a single-value {@code "startup"} message on the
     * {@code "__system"} stream, using the batch or non-batch send helper depending on
     * the batch-tuple setting.
     *
     * @return the send targets that were created for the message
     */
    public TaskSendTargets echoToSystemBolt() {
        List<Object> msg = new ArrayList<Object>();
        msg.add("startup");
        TaskSendTargets sendTargets = this.makeSendTargets();
        if (this.isTaskBatchTuple) {
            UnanchoredSend.sendBatch(this.topologyContext, sendTargets, this.taskTransfer, "__system", msg);
        } else {
            UnanchoredSend.send(this.topologyContext, sendTargets, this.taskTransfer, "__system", msg);
        }
        return sendTargets;
    }

    /**
     * Tells whether a spout should run with the single-thread executor.
     *
     * @param conf configuration to inspect
     * @return {@code true} if the "one pending" check passes or the spout is configured
     *         as single-threaded
     */
    public boolean isSingleThread(Map conf) {
        return JStormServerUtils.isOnePending(conf) || ConfigExtension.isSpoutSingleThread(conf);
    }

    /**
     * Picks the executor implementation that matches the loaded task object.
     *
     * @return a bolt executor for an {@link IBolt}, a single- or multi-thread spout
     *         executor for an {@link ISpout}, or {@code null} when the task object is
     *         neither
     */
    public BaseExecutors mkExecutor() {
        if (this.taskObj instanceof IBolt) {
            return new BoltExecutors(this);
        }
        if (this.taskObj instanceof ISpout) {
            if (this.isSingleThread(this.stormConf)) {
                return new SingleThreadSpoutExecutors(this);
            }
            return new MultipleThreadSpoutExecutors(this);
        }
        // Unknown component type: callers receive null, exactly as before.
        return null;
    }

    /**
     * Installs the error reporter used by this task and creates its executor.
     *
     * @return the executor produced by {@link #mkExecutor()} (may be {@code null})
     */
    private RunnableCallback prepareExecutor() {
        TaskReportError reportError = new TaskReportError(this.zkCluster, this.topologyId, this.taskId);
        this.reportErrorDie = new TaskReportErrorAndDie(reportError, this.workHalt);
        return this.mkExecutor();
    }

    /**
     * Creates the receiver (batch or plain, per configuration), stores it on this task and
     * registers its deserialize queue under this task's id.
     *
     * @return the receiver that was created
     */
    public TaskReceiver mkTaskReceiver() {
        String taskName = JStormServerUtils.getName(this.componentId, this.taskId);
        if (this.isTaskBatchTuple) {
            this.taskReceiver = new TaskBatchReceiver(this, this.taskId, this.stormConf, this.topologyContext, this.innerTaskTransfer, this.taskStatus, taskName);
        } else {
            this.taskReceiver = new TaskReceiver(this, this.taskId, this.stormConf, this.topologyContext, this.innerTaskTransfer, this.taskStatus, taskName);
        }
        this.deserializeQueues.put(this.taskId, this.taskReceiver.getDeserializeQueue());
        return this.taskReceiver;
    }

    /**
     * Starts the task: sends the startup message, creates the transfer, the executor
     * thread and the receiver, and assembles the shutdown handle.
     *
     * @return the shutdown handle covering the threads created for this task
     * @throws Exception if any step of the start-up fails
     */
    public TaskShutdownDameon execute() throws Exception {
        // NOTE(review): taskTransfer is still null while echoToSystemBolt() runs, so the
        // send helpers receive a null transfer. Order kept as-is; confirm that the helpers
        // tolerate this (e.g. because "__system" has no targets at this point).
        this.taskSendTargets = this.echoToSystemBolt();
        this.taskTransfer = this.mkTaskSending(this.workerData);
        RunnableCallback baseExecutor = this.prepareExecutor();
        // NOTE(review): the literal 10 is presumably the thread priority and the booleans
        // daemon/start flags - confirm against AsyncLoopThread's constructor.
        AsyncLoopThread executorThread = new AsyncLoopThread(baseExecutor, false, 10, true);
        this.taskReceiver = this.mkTaskReceiver();
        List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>();
        allThreads.add(executorThread);
        LOG.info("Finished loading task {}:{}", this.componentId, this.taskId);
        this.taskShutdownDameon = this.getShutdown(allThreads, this.taskReceiver.getDeserializeQueue(), baseExecutor);
        return this.taskShutdownDameon;
    }

    /**
     * Creates the outgoing transfer (batch or plain, per configuration) with a tuple
     * serializer built from the worker's configuration.
     *
     * @param workerData worker state handed to the serializer and the transfer
     * @return the transfer that was created
     */
    private TaskTransfer mkTaskSending(WorkerData workerData) {
        KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), this.topologyContext);
        String taskName = JStormServerUtils.getName(this.componentId, this.taskId);
        if (this.isTaskBatchTuple) {
            return new TaskBatchTransfer(this, taskName, serializer, this.taskStatus, workerData);
        }
        return new TaskTransfer(this, taskName, serializer, this.taskStatus, workerData);
    }

    /**
     * Collects every thread belonging to this task and wraps them in a shutdown handle.
     * The acker thread (spout executors only, when present), the receiver's deserialize
     * thread and the transfer's serialize thread are appended to {@code allThreads}.
     *
     * @param allThreads       mutable list of threads; extended in place
     * @param deserializeQueue not used by this method; kept for interface compatibility
     * @param baseExecutor     the task's executor, inspected for an acker thread
     * @return the shutdown handle for this task
     */
    public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
        if (baseExecutor instanceof SpoutExecutors) {
            AsyncLoopThread ackerThread = ((SpoutExecutors)baseExecutor).getAckerRunnableThread();
            if (ackerThread != null) {
                allThreads.add(ackerThread);
            }
        }
        allThreads.add(this.taskReceiver.getDeserializeThread());
        allThreads.add(this.taskTransfer.getSerializeThread());
        return new TaskShutdownDameon(this.taskStatus, this.topologyId, this.taskId, allThreads, this.zkCluster, this.taskObj, this);
    }

    /**
     * @return the shutdown handle created by {@link #execute()}, or {@code null} if the
     *         task has not been executed yet
     */
    public TaskShutdownDameon getTaskShutdownDameon() {
        return this.taskShutdownDameon;
    }

    /**
     * Runs {@link #execute()}. Any failure is logged, reported through the error reporter
     * when one has already been installed (it is only created part-way through
     * {@code execute()}), and rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        try {
            this.taskShutdownDameon = this.execute();
        }
        catch (Throwable e) {
            LOG.error("init task take error", e);
            if (this.reportErrorDie != null) {
                this.reportErrorDie.report(e);
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Convenience factory: constructs a task and immediately executes it.
     *
     * @param workerData shared worker state
     * @param taskId     id of the task to create
     * @return the shutdown handle of the started task
     * @throws Exception if construction or start-up fails
     */
    public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) throws Exception {
        return new Task(workerData, taskId).execute();
    }

    /**
     * Refreshes both contexts with the worker's current task ids and then refreshes the
     * send targets.
     */
    public void updateTaskData() {
        List<Integer> localTasks = JStormUtils.mk_list(this.workerData.getTaskids());
        this.topologyContext.setThisWorkerTasks(localTasks);
        this.userContext.setThisWorkerTasks(localTasks);
        this.updateSendTargets();
    }

    /** @return the assignment timestamp reported by the worker data */
    public long getWorkerAssignmentTs() {
        return this.workerData.getAssignmentTs();
    }

    /** @return the assignment type reported by the worker data */
    public Assignment.AssignmentType getWorkerAssignmentType() {
        return this.workerData.getAssignmentType();
    }

    /** Removes this task's entry from the shared deserialize-queue map. */
    public void unregisterDeserializeQueue() {
        this.deserializeQueues.remove(this.taskId);
    }

    /** @return the id of the component this task belongs to */
    public String getComponentId() {
        return this.componentId;
    }

    /** @return this task's id */
    public Integer getTaskId() {
        return this.taskId;
    }

    /** @return the queue registered for this task in the inner-task-transfer map, if any */
    public DisruptorQueue getExecuteQueue() {
        return this.innerTaskTransfer.get(this.taskId);
    }

    /** @return the queue registered for this task in the deserialize-queue map, if any */
    public DisruptorQueue getDeserializeQueue() {
        return this.deserializeQueues.get(this.taskId);
    }

    /** @return the component-level configuration computed in the constructor */
    public Map<Object, Object> getStormConf() {
        return this.stormConf;
    }

    /** @return the context built from the system topology */
    public TopologyContext getTopologyContext() {
        return this.topologyContext;
    }

    /** @return the context built from the raw topology */
    public TopologyContext getUserContext() {
        return this.userContext;
    }

    /** @return the outgoing transfer, or {@code null} before {@link #execute()} created it */
    public TaskTransfer getTaskTransfer() {
        return this.taskTransfer;
    }

    /** @return the receiver, or {@code null} before it has been created */
    public TaskReceiver getTaskReceiver() {
        return this.taskReceiver;
    }

    /** @return the worker-wide inner-task-transfer queue map */
    public Map<Integer, DisruptorQueue> getInnerTaskTransfer() {
        return this.innerTaskTransfer;
    }

    /** @return the worker-wide deserialize-queue map */
    public Map<Integer, DisruptorQueue> getDeserializeQueues() {
        return this.deserializeQueues;
    }

    /** @return the worker-wide control-queue map */
    public Map<Integer, DisruptorQueue> getControlQueues() {
        return this.controlQueues;
    }

    /** @return the id of the topology this task belongs to */
    public String getTopologyId() {
        return this.topologyId;
    }

    /** @return this task's status object */
    public TaskStatus getTaskStatus() {
        return this.taskStatus;
    }

    /** @return the cluster-state handle obtained from the worker data */
    public StormClusterState getZkCluster() {
        return this.zkCluster;
    }

    /** @return the component object loaded in the constructor */
    public Object getTaskObj() {
        return this.taskObj;
    }

    /** @return this task's metric holder */
    public TaskBaseMetric getTaskStats() {
        return this.taskStats;
    }

    /** @return the worker data this task was created from */
    public WorkerData getWorkerData() {
        return this.workerData;
    }

    /** @return the send targets, or {@code null} before {@link #execute()} created them */
    public TaskSendTargets getTaskSendTargets() {
        return this.taskSendTargets;
    }

    /** @return the error reporter, or {@code null} before it has been installed */
    public TaskReportErrorAndDie getReportErrorDie() {
        return this.reportErrorDie;
    }

    /** @return whether tuples are transferred and received in batch mode */
    public boolean isTaskBatchTuple() {
        return this.isTaskBatchTuple;
    }
}

