/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.backpressure.BackpressureController;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskTransfer {
    private static Logger LOG = LoggerFactory.getLogger(TaskTransfer.class);
    protected Map storm_conf;
    protected DisruptorQueue transferControlQueue;
    protected KryoTupleSerializer serializer;
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    protected Map<Integer, DisruptorQueue> controlQueues;
    protected DisruptorQueue serializeQueue;
    protected final AsyncLoopThread serializeThread;
    protected volatile TaskStatus taskStatus;
    protected String taskName;
    protected AsmHistogram serializeTimer;
    protected Task task;
    protected String topolgyId;
    protected String componentId;
    protected int taskId;
    protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    protected BackpressureController backpressureController;
    protected boolean isTopologyMaster;

    public TaskTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
        this.task = task;
        this.taskName = taskName;
        this.serializer = serializer;
        this.taskStatus = taskStatus;
        this.storm_conf = workerData.getStormConf();
        this.transferControlQueue = workerData.getTransferCtrlQueue();
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.controlQueues = workerData.getControlQueues();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.topolgyId = workerData.getTopologyId();
        this.componentId = this.task.getComponentId();
        this.taskId = this.task.getTaskId();
        int queue_size = Utils.getInt(this.storm_conf.get("topology.executor.send.buffer.size"));
        WaitStrategy waitStrategy = (WaitStrategy)JStormUtils.createDisruptorWaitStrategy(this.storm_conf);
        this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queue_size, waitStrategy);
        String taskId = taskName.substring(taskName.indexOf(":") + 1);
        QueueGauge serializeQueueGauge = new QueueGauge(this.serializeQueue, taskName, "SerializeQueue");
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, "SerializeQueue", MetricType.GAUGE), new AsmGauge(serializeQueueGauge));
        JStormHealthCheck.registerTaskHealthCheck(Integer.valueOf(taskId), "SerializeQueue", serializeQueueGauge);
        this.serializeTimer = (AsmHistogram)JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, "SerializeTime", MetricType.HISTOGRAM), new AsmHistogram());
        this.serializeThread = this.setupSerializeThread();
        this.backpressureController = new BackpressureController(this.storm_conf, task.getTaskId(), this.serializeQueue, queue_size);
        this.isTopologyMaster = task.getTopologyContext().getTopologyMasterId() == task.getTaskId().intValue();
        LOG.info("Successfully start TaskTransfer thread");
    }

    public void transfer(TupleExt tuple) {
        int taskId = tuple.getTargetTaskId();
        DisruptorQueue exeQueue = this.innerTaskTransfer.get(taskId);
        if (exeQueue != null) {
            exeQueue.publish(tuple);
        } else {
            this.push(taskId, tuple);
        }
        if (this.backpressureController.isBackpressureMode()) {
            this.backpressureController.flowControl();
        }
    }

    public void transferControl(TupleExt tuple) {
        int taskId = tuple.getTargetTaskId();
        DisruptorQueue controlQueue = this.controlQueues.get(taskId);
        if (controlQueue != null) {
            controlQueue.publish(tuple);
        } else {
            this.transferControlQueue.publish(tuple);
        }
    }

    public void transferControl(BatchTuple batch) {
        LOG.error("It is not allowed to send batch here!");
    }

    public void transfer(BatchTuple batch) {
        LOG.error("It is not allowed to send batch here!");
    }

    public void push(int taskId, TupleExt tuple) {
        this.serializeQueue.publish(tuple);
    }

    protected AsyncLoopThread setupSerializeThread() {
        return new AsyncLoopThread(new TransferRunnable());
    }

    public AsyncLoopThread getSerializeThread() {
        return this.serializeThread;
    }

    public BackpressureController getBackpressureController() {
        return this.backpressureController;
    }

    protected class TransferRunnable
    extends RunnableCallback
    implements EventHandler {
        private AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();

        protected TransferRunnable() {
        }

        @Override
        public String getThreadName() {
            return TaskTransfer.this.taskName + "-" + TransferRunnable.class.getSimpleName();
        }

        @Override
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        @Override
        public void run() {
            while (!this.shutdown.get()) {
                TaskTransfer.this.serializeQueue.consumeBatchWhenAvailable(this);
            }
        }

        @Override
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        public byte[] serialize(ITupleExt tuple) {
            return TaskTransfer.this.serializer.serialize((TupleExt)tuple);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
            if (event == null) {
                return;
            }
            long start = TaskTransfer.this.serializeTimer.getTime();
            try {
                ITupleExt tuple = (ITupleExt)event;
                int taskid = tuple.getTargetTaskId();
                IConnection conn = this.getConnection(taskid);
                if (conn != null) {
                    byte[] tupleMessage = this.serialize(tuple);
                    TaskMessage taskMessage = new TaskMessage(taskid, tupleMessage);
                    conn.send(taskMessage);
                }
            }
            finally {
                TaskTransfer.this.serializeTimer.updateTime(start);
            }
        }

        protected IConnection getConnection(int taskId) {
            IConnection conn = null;
            WorkerSlot nodePort = TaskTransfer.this.taskNodeport.get(taskId);
            if (nodePort == null) {
                String errormsg = "IConnection to " + taskId + " can't be found";
                LOG.warn("Internal transfer warn, throw tuple,", (Throwable)new Exception(errormsg));
            } else {
                conn = TaskTransfer.this.nodeportSocket.get(nodePort);
                if (conn == null) {
                    String errormsg = "NodePort to" + nodePort + " can't be found";
                    LOG.warn("Internal transfer warn, throw tuple,", (Throwable)new Exception(errormsg));
                }
            }
            return conn;
        }
    }
}

