package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.lmax.disruptor.AlertException;
import shade.storm.com.lmax.disruptor.EventHandler;
import shade.storm.com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import shade.storm.com.lmax.disruptor.TimeoutException;
import shade.storm.com.lmax.disruptor.dsl.ProducerType;
import shade.storm.org.apache.commons.cli.HelpFormatter;
import shade.storm.org.apache.thrift.protocol.TMultiplexedProtocol;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskTransfer.class */
/**
 * Routes tuples emitted by one task to their destination queue.
 *
 * <p>Tuples whose target task has an entry in {@link #innerTaskTransfer} are published straight to
 * that task's queue. All other tuples are published to {@link #serializeQueue}, from which
 * {@link TransferRunnable} consumers serialize them and hand them to the {@link IConnection}
 * looked up for the target task.
 *
 * <p>When backpressure is enabled, {@link #transfer(TupleExt)} tracks a per-target flag in
 * {@link #targetTaskBackpressureStatus} and blocks the caller while a flagged queue is fuller than
 * {@link #lowMark}.
 */
public class TaskTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskTransfer.class);

    /** Topology configuration, taken from the owning task. */
    protected Map stormConf;
    /** Fallback queue for control tuples whose target has no entry in {@link #controlQueues}. */
    protected DisruptorQueue transferControlQueue;
    /** Target task id -> queue used for direct (non-serialized) delivery. */
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    /** Target task id -> control queue. */
    protected Map<Integer, DisruptorQueue> controlQueues;
    /** Queue of tuples waiting to be serialized and sent over a connection. */
    protected DisruptorQueue serializeQueue;
    protected volatile TaskStatus taskStatus;
    protected String taskName;
    /** Histogram fed with the time spent in {@link #serialize(KryoTupleSerializer, Object)}. */
    protected AsmHistogram serializeTimer;
    protected Task task;
    protected String topolgyId;
    protected String componentId;
    protected int taskId;
    /** Worker slot -> connection to that slot. */
    protected ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    /** Task id -> worker slot hosting that task. */
    protected ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    protected boolean isTopologyMaster;
    protected boolean isBackpressureEnable;
    /** Queue fill ratio above which a target is flagged as under backpressure. */
    protected float highMark;
    /** Queue fill ratio a flagged target must drop to before publishing resumes. */
    protected float lowMark;
    /**
     * Backpressure flag per target. Key {@code 0} is the entry used for every tuple that goes
     * through {@link #serializeQueue}.
     */
    protected Map<Integer, Boolean> targetTaskBackpressureStatus;
    protected int serializeThreadNum;
    protected final List<AsyncLoopThread> serializeThreads = new ArrayList<AsyncLoopThread>();
    protected final GeneralTopologyContext topologyContext;

    /**
     * Consumer of {@link TaskTransfer#serializeQueue}: each event taken from the queue is
     * serialized with this runnable's own {@link KryoTupleSerializer} and sent.
     */
    public class TransferRunnable extends RunnableCallback implements EventHandler {
        private final AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();
        private final int threadIndex;
        protected KryoTupleSerializer serializer;

        /**
         * @param i index of this consumer, used only to build the thread name
         */
        public TransferRunnable(int i) {
            this.threadIndex = i;
            this.serializer = new KryoTupleSerializer(TaskTransfer.this.stormConf, TaskTransfer.this.topologyContext.getRawTopology());
        }

        /**
         * @return task name, this class's simple name and the thread index, joined with
         *     {@code HelpFormatter.DEFAULT_OPT_PREFIX}
         */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public String getThreadName() {
            String separator = HelpFormatter.DEFAULT_OPT_PREFIX;
            return TaskTransfer.this.taskName + separator + TransferRunnable.class.getSimpleName() + separator + this.threadIndex;
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        /** Keeps consuming batches from the serialize queue until the shared shutdown flag is set. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            while (!this.shutdown.get()) {
                TaskTransfer.this.serializeQueue.multiConsumeBatchWhenAvailable(this);
            }
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        /**
         * Serializes and sends one queued event; {@code null} events are skipped.
         *
         * @param obj the queued event
         * @param j sequence number (unused)
         * @param z end-of-batch marker (unused)
         */
        @Override // shade.storm.com.lmax.disruptor.EventHandler
        public void onEvent(Object obj, long j, boolean z) throws Exception {
            if (obj != null) {
                TaskTransfer.this.serialize(this.serializer, obj);
            }
        }
    }

    /**
     * Wires the transfer to the worker's shared queues and connections, creates the serialize
     * queue and its consumer threads, and registers the queue gauge, health check and
     * serialize-time histogram.
     *
     * @param task owning task; supplies the configuration, component id and task id
     * @param str task name; the text after the first {@code TMultiplexedProtocol.SEPARATOR} must
     *     parse as an integer (it is used as the task id for the health check)
     * @param kryoTupleSerializer not used by this constructor
     * @param taskStatus status holder of the owning task
     * @param workerData source of the shared queues, connection maps and topology id
     * @param generalTopologyContext topology context used to build per-thread serializers
     * @throws NumberFormatException if the id part of {@code str} is not an integer
     */
    public TaskTransfer(Task task, String str, KryoTupleSerializer kryoTupleSerializer, TaskStatus taskStatus, WorkerData workerData, GeneralTopologyContext generalTopologyContext) {
        this.task = task;
        this.taskName = str;
        this.taskStatus = taskStatus;
        this.stormConf = task.getStormConf();
        this.transferControlQueue = workerData.getTransferCtrlQueue();
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.controlQueues = workerData.getControlQueues();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.topolgyId = workerData.getTopologyId();
        this.componentId = this.task.getComponentId();
        this.taskId = this.task.getTaskId().intValue();
        this.topologyContext = generalTopologyContext;

        int queueSize = Utils.getInt(this.stormConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE)).intValue();
        long waitTimeoutMs = JStormUtils.parseLong(this.stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10L).longValue();
        TimeoutBlockingWaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);
        boolean batchMode = ConfigExtension.isDisruptorQueueBatchMode(this.stormConf).booleanValue();
        this.serializeQueue = DisruptorQueue.mkInstance(str, ProducerType.MULTI, queueSize, waitStrategy, batchMode, ConfigExtension.getDisruptorBufferSize(this.stormConf), ConfigExtension.getDisruptorBufferFlushMs(this.stormConf));

        this.serializeThreadNum = ConfigExtension.getTaskSerializeThreadNum(workerData.getStormConf()).intValue();
        // NOTE(review): the consumers are created before serializeTimer and the backpressure
        // fields below are assigned; if AsyncLoopThread starts its thread on construction, a
        // consumer could in principle observe them unset. Order kept as in the original - confirm.
        setupSerializeThread();

        String taskIdText = str.substring(str.indexOf(TMultiplexedProtocol.SEPARATOR) + 1);
        QueueGauge queueGauge = new QueueGauge(this.serializeQueue, str, MetricDef.SERIALIZE_QUEUE);
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, MetricDef.SERIALIZE_QUEUE, MetricType.GAUGE), new AsmGauge(queueGauge));
        JStormHealthCheck.registerTaskHealthCheck(Integer.parseInt(taskIdText), MetricDef.SERIALIZE_QUEUE, queueGauge);

        AsmHistogram histogram = new AsmHistogram();
        histogram.setAggregate(false);
        this.serializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(this.topolgyId, this.componentId, this.taskId, MetricDef.SERIALIZE_TIME, MetricType.HISTOGRAM), histogram);

        this.isTopologyMaster = task.getTopologyContext().getTopologyMasterId() == task.getTaskId().intValue();
        this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(this.stormConf);
        this.highMark = (float) ConfigExtension.getBackpressureWaterMarkHigh(this.stormConf);
        this.lowMark = (float) ConfigExtension.getBackpressureWaterMarkLow(this.stormConf);

        this.targetTaskBackpressureStatus = new ConcurrentHashMap<Integer, Boolean>();
        for (Integer innerTaskId : this.innerTaskTransfer.keySet()) {
            this.targetTaskBackpressureStatus.put(innerTaskId, Boolean.FALSE);
        }
        // Key 0 tracks the serialize queue (see transfer()).
        this.targetTaskBackpressureStatus.put(0, Boolean.FALSE);
        LOG.info("Successfully start TaskTransfer thread");
    }

    /** Creates {@link #serializeThreadNum} consumers and records them in {@link #serializeThreads}. */
    protected void setupSerializeThread() {
        for (int i = 0; i < this.serializeThreadNum; i++) {
            this.serializeThreads.add(new AsyncLoopThread(new TransferRunnable(i)));
        }
    }

    /**
     * Publishes a tuple to its target task's queue, or to the serialize queue when the target has
     * no entry in {@link #innerTaskTransfer}.
     *
     * <p>With backpressure enabled: if the target is currently flagged, the calling thread sleeps
     * in 1 ms steps until the queue's fill ratio is no longer above {@link #lowMark}, then clears
     * the flag and publishes. Otherwise it publishes first and sets the flag when the fill ratio
     * then exceeds {@link #highMark}.
     *
     * @param tupleExt the tuple to deliver
     */
    public void transfer(TupleExt tupleExt) {
        int targetTaskId = tupleExt.getTargetTaskId();
        DisruptorQueue targetQueue = this.innerTaskTransfer.get(Integer.valueOf(targetTaskId));
        if (targetQueue == null) {
            // No direct queue for this target: go through the serialize queue, whose
            // backpressure state is tracked under key 0.
            targetTaskId = 0;
            targetQueue = this.serializeQueue;
        }

        if (!this.isBackpressureEnable) {
            targetQueue.publish(tupleExt);
            return;
        }

        Integer statusKey = Integer.valueOf(targetTaskId);
        Boolean underBackpressure = this.targetTaskBackpressureStatus.get(statusKey);
        if (underBackpressure == null) {
            // First time this target is seen. Store FALSE rather than null: the status map is a
            // ConcurrentHashMap, which rejects null values with a NullPointerException.
            underBackpressure = Boolean.FALSE;
            this.targetTaskBackpressureStatus.put(statusKey, underBackpressure);
        }

        if (underBackpressure.booleanValue()) {
            while (targetQueue.pctFull() > this.lowMark) {
                JStormUtils.sleepMs(1L);
            }
            this.targetTaskBackpressureStatus.put(statusKey, Boolean.FALSE);
            targetQueue.publish(tupleExt);
            return;
        }

        targetQueue.publish(tupleExt);
        if (targetQueue.pctFull() > this.highMark) {
            this.targetTaskBackpressureStatus.put(statusKey, Boolean.TRUE);
        }
    }

    /**
     * Publishes a control tuple to the target task's control queue, falling back to
     * {@link #transferControlQueue} when the target has no entry in {@link #controlQueues}.
     *
     * @param tupleExt the control tuple to deliver
     */
    public void transferControl(TupleExt tupleExt) {
        DisruptorQueue controlQueue = this.controlQueues.get(Integer.valueOf(tupleExt.getTargetTaskId()));
        if (controlQueue == null) {
            controlQueue = this.transferControlQueue;
        }
        controlQueue.publish(tupleExt);
    }

    /**
     * Publishes a tuple directly to the serialize queue.
     *
     * @param i ignored by this implementation
     * @param tupleExt the tuple to enqueue
     */
    public void push(int i, TupleExt tupleExt) {
        this.serializeQueue.publish(tupleExt);
    }

    /** @return the live list of serialize consumer threads (not a copy) */
    public List<AsyncLoopThread> getSerializeThreads() {
        return this.serializeThreads;
    }

    /**
     * Performs one polling round on the serialize queue: returns immediately when shutdown has
     * been requested, sleeps 1 ms when the queue is empty, and otherwise serializes and sends
     * every non-null entry of the available batch.
     *
     * @param kryoTupleSerializer serializer used for the tuples of this round
     * @throws RuntimeException wrapping an {@link AlertException} raised by the queue
     */
    public void serializer(KryoTupleSerializer kryoTupleSerializer) {
        LOG.debug("start Serializer of task, {}", Integer.valueOf(this.taskId));
        if (AsyncLoopRunnable.getShutdown().get()) {
            return;
        }
        if (this.serializeQueue.population() == 0) {
            Utils.sleep(1L);
            return;
        }
        try {
            for (Object queued : this.serializeQueue.retreiveAvailableBatch()) {
                if (queued != null) {
                    serialize(kryoTupleSerializer, queued);
                }
            }
        } catch (InterruptedException e) {
            // Keep the stack trace in the log and restore the interrupt flag so the caller can
            // still observe that this thread was asked to stop.
            LOG.error("InterruptedException " + e.getCause(), e);
            Thread.currentThread().interrupt();
        } catch (AlertException e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (TimeoutException ignored) {
            // Deliberately ignored: a wait timeout just ends this round without a batch.
        }
    }

    /**
     * Resolves the connection for a target task via {@link #taskNodeport} and
     * {@link #nodeportSocket}.
     *
     * @param i target task id
     * @return the connection, or {@code null} (after logging a warning) when either the worker
     *     slot or the connection is unknown
     */
    protected IConnection getConnection(int i) {
        WorkerSlot workerSlot = this.taskNodeport.get(Integer.valueOf(i));
        if (workerSlot == null) {
            LOG.warn("Internal transfer warn, throw tuple,", new Exception("IConnection to task-" + i + " can't be found"));
            return null;
        }
        IConnection connection = this.nodeportSocket.get(workerSlot);
        if (connection == null) {
            LOG.warn("Internal transfer warn, throw tuple,", new Exception("NodePort to" + workerSlot + " can't be found"));
        }
        return connection;
    }

    /**
     * Serializes one tuple and sends it to the connection of its target task. When no connection
     * is found the tuple is dropped and an error is logged.
     *
     * @param kryoTupleSerializer serializer to use
     * @param obj the tuple; must be an {@link ITupleExt} (and a {@link TupleExt} when a connection
     *     exists), otherwise a {@link ClassCastException} is thrown
     */
    protected void serialize(KryoTupleSerializer kryoTupleSerializer, Object obj) {
        long startTime = this.serializeTimer.getTime();
        try {
            ITupleExt tuple = (ITupleExt) obj;
            int targetTaskId = tuple.getTargetTaskId();
            IConnection connection = getConnection(targetTaskId);
            if (connection != null) {
                connection.send(new TaskMessage(targetTaskId, kryoTupleSerializer.serialize((TupleExt) tuple)));
            } else {
                LOG.error("Can not find connection for task-{}", Integer.valueOf(targetTaskId));
            }
        } finally {
            // Record the elapsed time on success and on failure alike.
            if (MetricUtils.metricAccurateCal) {
                this.serializeTimer.updateTime(startTime);
            }
        }
    }
}
