package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.backpressure.BackpressureController;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskTransfer.class */
/**
 * Routes tuples emitted by a task towards their target task.
 *
 * <p>{@link #transfer(TupleExt)} publishes a tuple straight to the target's queue when
 * {@code innerTaskTransfer} holds one for the target task id; otherwise the tuple is published to
 * {@code serializeQueue}. A {@link TransferRunnable} consumes {@code serializeQueue}, serializes
 * each tuple with the configured {@link KryoTupleSerializer} and sends it over the
 * {@link IConnection} looked up for the target task.
 *
 * <p>{@link #transferControl(TupleExt)} does the same for control tuples using
 * {@code controlQueues}, falling back to {@code transferControlQueue}.
 */
public class TaskTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskTransfer.class);

    /** Configuration map obtained from {@code WorkerData.getStormConf()}. */
    protected Map storm_conf;
    /** Fallback queue for control tuples whose target has no entry in {@link #controlQueues}. */
    protected DisruptorQueue transferControlQueue;
    protected KryoTupleSerializer serializer;
    /** Target task id -> queue; a hit means the tuple is published without serialization. */
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    /** Target task id -> control queue, consulted by {@link #transferControl(TupleExt)}. */
    protected Map<Integer, DisruptorQueue> controlQueues;
    /** Queue of tuples waiting to be serialized and sent by {@link TransferRunnable}. */
    protected DisruptorQueue serializeQueue;
    protected final AsyncLoopThread serializeThread;
    protected volatile TaskStatus taskStatus;
    protected String taskName;
    /** Histogram updated around every {@link TransferRunnable#onEvent} invocation. */
    protected AsmHistogram serializeTimer;
    protected Task task;
    protected String topolgyId;
    protected String componentId;
    protected int taskId;
    /** Worker slot -> connection, used to resolve where a serialized tuple is sent. */
    protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    /** Target task id -> worker slot hosting that task. */
    protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    protected BackpressureController backpressureController;
    /** True when the topology context reports this task's id as the topology master id. */
    protected boolean isTopologyMaster;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/jstorm/task/TaskTransfer$TransferRunnable.class */
    /**
     * Consumer of {@code serializeQueue}: serializes each queued tuple and sends it to the
     * connection resolved for the tuple's target task.
     */
    public class TransferRunnable extends RunnableCallback implements EventHandler {
        /** Shutdown flag obtained from {@link AsyncLoopRunnable#getShutdown()}; ends {@link #run()}. */
        private final AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();

        /* JADX INFO: Access modifiers changed from: protected */
        public TransferRunnable() {
        }

        /** Returns the owning task's name followed by {@code "-TransferRunnable"}. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public String getThreadName() {
            return TaskTransfer.this.taskName + "-" + TransferRunnable.class.getSimpleName();
        }

        /** Switches the thread context via {@link WorkerClassLoader} before the loop starts. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        /** Keeps consuming batches from {@code serializeQueue} until the shutdown flag is set. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            while (!this.shutdown.get()) {
                TaskTransfer.this.serializeQueue.consumeBatchWhenAvailable(this);
            }
        }

        /** Restores the thread context switched in {@link #preRun()}. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        /**
         * Serializes a tuple with the enclosing transfer's serializer.
         *
         * @param tuple tuple to serialize; it is cast to {@link TupleExt}, so any other
         *              implementation fails with a {@link ClassCastException}
         * @return the serialized bytes
         */
        public byte[] serialize(ITupleExt tuple) {
            return TaskTransfer.this.serializer.serialize((TupleExt) tuple);
        }

        /**
         * Handles one event taken from {@code serializeQueue}.
         *
         * <p>A {@code null} event is ignored. Otherwise the event is treated as an
         * {@link ITupleExt}; when a connection is found for its target task the tuple is
         * serialized and sent, and when none is found the tuple is silently dropped (the lookup
         * itself logs a warning).
         *
         * @param event      the queued object
         * @param sequence   not used by this handler
         * @param endOfBatch not used by this handler
         */
        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
            if (event == null) {
                return;
            }
            // NOTE(review): getTime()/updateTime() presumably record the elapsed time of this
            // call in the histogram - confirm against AsmHistogram.
            long start = TaskTransfer.this.serializeTimer.getTime();
            try {
                ITupleExt tuple = (ITupleExt) event;
                int targetTaskId = tuple.getTargetTaskId();
                IConnection connection = getConnection(targetTaskId);
                if (connection != null) {
                    connection.send(new TaskMessage(targetTaskId, serialize(tuple)));
                }
            } finally {
                // Always update the timer, even when serialization or sending throws.
                TaskTransfer.this.serializeTimer.updateTime(start);
            }
        }

        /**
         * Resolves the connection used to reach a task.
         *
         * @param targetTaskId id of the task the tuple is addressed to
         * @return the connection for the worker slot hosting the task, or {@code null} (after
         *         logging a warning) when either the slot or its connection is unknown
         */
        protected IConnection getConnection(int targetTaskId) {
            IConnection connection = null;
            WorkerSlot workerSlot = TaskTransfer.this.taskNodeport.get(Integer.valueOf(targetTaskId));
            if (workerSlot == null) {
                // The exception is created only to carry the message and a stack trace into the log.
                TaskTransfer.LOG.warn("Internal transfer warn, throw tuple,", new Exception("IConnection to " + targetTaskId + " can't be found"));
            } else {
                connection = TaskTransfer.this.nodeportSocket.get(workerSlot);
                if (connection == null) {
                    TaskTransfer.LOG.warn("Internal transfer warn, throw tuple,", new Exception("NodePort to" + workerSlot + " can't be found"));
                }
            }
            return connection;
        }
    }

    /**
     * Wires the transfer to the worker's shared queues and connection maps, creates the
     * serialize queue with its metrics and health check, and creates the serialize thread and
     * the backpressure controller.
     *
     * @param task       the task this transfer belongs to
     * @param taskName   name used for the serialize queue, its gauge and the consumer thread.
     *                   The text after the first {@code ':'} is parsed as the health-check task
     *                   id, so a non-numeric suffix throws {@link NumberFormatException}
     *                   (presumably the name has the form {@code component:taskId} - confirm)
     * @param serializer serializer used for tuples that leave through the serialize queue
     * @param taskStatus status object stored on this transfer
     * @param workerData source of the configuration, shared queues and connection maps
     */
    public TaskTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
        this.task = task;
        this.taskName = taskName;
        this.serializer = serializer;
        this.taskStatus = taskStatus;
        this.storm_conf = workerData.getStormConf();
        this.transferControlQueue = workerData.getTransferCtrlQueue();
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.controlQueues = workerData.getControlQueues();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.topolgyId = workerData.getTopologyId();
        this.componentId = this.task.getComponentId();
        this.taskId = this.task.getTaskId().intValue();

        int queueSize = Utils.getInt(this.storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE)).intValue();
        this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queueSize, (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(this.storm_conf));

        String healthCheckTaskId = taskName.substring(taskName.indexOf(":") + 1);
        QueueGauge queueGauge = new QueueGauge(this.serializeQueue, taskName, MetricDef.SERIALIZE_QUEUE);
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, MetricDef.SERIALIZE_QUEUE, MetricType.GAUGE), new AsmGauge(queueGauge));
        JStormHealthCheck.registerTaskHealthCheck(Integer.parseInt(healthCheckTaskId), MetricDef.SERIALIZE_QUEUE, queueGauge);
        this.serializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, MetricDef.SERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());

        // Overridable hook invoked from the constructor: backpressureController and
        // isTopologyMaster are still unset while it runs, so overrides must not rely on them.
        this.serializeThread = setupSerializeThread();
        this.backpressureController = new BackpressureController(this.storm_conf, this.taskId, this.serializeQueue, queueSize);
        this.isTopologyMaster = task.getTopologyContext().getTopologyMasterId() == this.taskId;
        LOG.info("Successfully start TaskTransfer thread");
    }

    /**
     * Sends a tuple to its target task.
     *
     * <p>When {@code innerTaskTransfer} has a queue for the target task id the tuple is published
     * to it directly; otherwise it is handed to {@link #push(int, TupleExt)}. Afterwards, if the
     * backpressure controller reports backpressure mode, its flow control is invoked.
     *
     * @param tupleExt the tuple to send
     */
    public void transfer(TupleExt tupleExt) {
        int targetTaskId = tupleExt.getTargetTaskId();
        DisruptorQueue targetQueue = this.innerTaskTransfer.get(Integer.valueOf(targetTaskId));
        if (targetQueue != null) {
            targetQueue.publish(tupleExt);
        } else {
            push(targetTaskId, tupleExt);
        }
        if (this.backpressureController.isBackpressureMode()) {
            this.backpressureController.flowControl();
        }
    }

    /**
     * Sends a control tuple: to the target task's control queue when one is registered in
     * {@code controlQueues}, otherwise to {@code transferControlQueue}.
     *
     * @param tupleExt the control tuple to send
     */
    public void transferControl(TupleExt tupleExt) {
        DisruptorQueue controlQueue = this.controlQueues.get(Integer.valueOf(tupleExt.getTargetTaskId()));
        if (controlQueue != null) {
            controlQueue.publish(tupleExt);
        } else {
            this.transferControlQueue.publish(tupleExt);
        }
    }

    /**
     * Batches are not supported by this class: the call only logs an error and the batch is
     * not sent.
     *
     * @param batchTuple ignored
     */
    public void transferControl(BatchTuple batchTuple) {
        LOG.error("It is not allowed to send batch here!");
    }

    /**
     * Batches are not supported by this class: the call only logs an error and the batch is
     * not sent.
     *
     * @param batchTuple ignored
     */
    public void transfer(BatchTuple batchTuple) {
        LOG.error("It is not allowed to send batch here!");
    }

    /**
     * Publishes a tuple to {@code serializeQueue} for serialization and sending.
     *
     * @param i        target task id; not used by this implementation
     * @param tupleExt the tuple to enqueue
     */
    public void push(int i, TupleExt tupleExt) {
        this.serializeQueue.publish(tupleExt);
    }

    /**
     * Creates the thread wrapper around a new {@link TransferRunnable}. Called once from the
     * constructor.
     *
     * @return the created loop thread
     */
    protected AsyncLoopThread setupSerializeThread() {
        return new AsyncLoopThread(new TransferRunnable());
    }

    /** Returns the thread wrapper created by {@link #setupSerializeThread()}. */
    public AsyncLoopThread getSerializeThread() {
        return this.serializeThread;
    }

    /** Returns the backpressure controller created for this transfer's serialize queue. */
    public BackpressureController getBackpressureController() {
        return this.backpressureController;
    }
}
