package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IProtoBatchBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.esotericsoftware.kryo.KryoException;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.lmax.disruptor.EventHandler;
import shade.storm.com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import shade.storm.com.lmax.disruptor.TimeoutException;
import shade.storm.com.lmax.disruptor.dsl.ProducerType;
import shade.storm.org.apache.commons.io.IOUtils;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskReceiver.class */
/**
 * Receives serialized messages for a single task.
 *
 * <p>Raw {@code byte[]} messages are taken from {@link #deserializeQueue}, decoded with a
 * {@link KryoTupleDeserializer}, and the resulting tuples are published to the queue stored in
 * {@link #innerTaskTransfer} under this task's id. Decoding is done either by the
 * {@link DeserializeRunnable} threads created in {@link #setDeserializeThread()} or by a caller
 * that invokes {@link #deserializer(KryoTupleDeserializer, boolean)} directly.
 *
 * <p>A one-byte message is not a tuple: its single byte is passed to
 * {@link TaskStatus#setStatus} as the new task status.
 */
public class TaskReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);

    protected Map stormConf;
    protected Task task;
    protected final int taskId;
    /** The task object when it is an {@link IBolt}; {@code null} otherwise. */
    protected final IBolt bolt;
    /** Identifier used as a prefix in log messages and thread names. */
    protected final String idStr;
    protected TopologyContext topologyContext;
    /** Per-task queues keyed by task id; this receiver publishes to the entry for {@link #taskId}. */
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    /** Queue of serialized ({@code byte[]}) messages awaiting deserialization. */
    protected DisruptorQueue deserializeQueue;
    protected List<AsyncLoopThread> deserializeThreads;
    /** Histogram updated with deserialization time when {@code MetricUtils.metricAccurateCal} is set. */
    protected AsmHistogram deserializeTimer;
    protected volatile TaskStatus taskStatus;
    /** Number of deserializer threads to create (name kept as-is for subclass compatibility). */
    protected int dserializeThreadNum;
    private boolean isBackpressureEnable;
    /** Fill ratio the target queue must drop to before publishing resumes under backpressure. */
    private float lowMark;
    /** Fill ratio above which backpressure is switched on after a publish. */
    private float highMark;
    /** {@code true} while the receiver is throttling publishes; see {@link #deserializeTuple}. */
    private volatile boolean backpressureStatus;

    /**
     * Loop body of one deserializer thread: consumes batches from the deserialize queue and hands
     * each event to {@link TaskReceiver#deserialize}.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally package-private.
     */
    public class DeserializeRunnable extends RunnableCallback implements EventHandler {
        DisruptorQueue deserializeQueue;
        DisruptorQueue exeQueue;
        int threadIndex;
        KryoTupleDeserializer deserializer;

        /**
         * @param disruptorQueue  queue of serialized messages to consume
         * @param disruptorQueue2 queue that deserialized tuples are published to
         * @param i               index of this thread, used in its name and log output
         */
        DeserializeRunnable(DisruptorQueue disruptorQueue, DisruptorQueue disruptorQueue2, int i) {
            this.deserializeQueue = disruptorQueue;
            this.exeQueue = disruptorQueue2;
            this.threadIndex = i;
            // Each runnable owns its own deserializer instance.
            this.deserializer = new KryoTupleDeserializer(
                    TaskReceiver.this.stormConf,
                    TaskReceiver.this.topologyContext,
                    TaskReceiver.this.topologyContext.getRawTopology());
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public String getThreadName() {
            return TaskReceiver.this.idStr + "-deserializer-" + this.threadIndex;
        }

        /** Disruptor callback: {@code obj} is expected to be a serialized message ({@code byte[]}). */
        @Override // shade.storm.com.lmax.disruptor.EventHandler
        public void onEvent(Object obj, long j, boolean z) throws Exception {
            TaskReceiver.this.deserialize(this.deserializer, (byte[]) obj, this.exeQueue);
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        /**
         * Consumes from the deserialize queue until the task status reports shutdown, then
         * unregisters the deserialize queue from the task. Errors raised while consuming are
         * logged (unless shutting down) and the loop continues.
         */
        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            TaskReceiver.LOG.info("Successfully start recvThread of {}, {}", TaskReceiver.this.idStr, this.threadIndex);
            while (!TaskReceiver.this.taskStatus.isShutdown()) {
                try {
                    this.deserializeQueue.multiConsumeBatchWhenAvailableWithCallback(this);
                } catch (Throwable th) {
                    // Errors during shutdown are expected noise; only report them while running.
                    if (!TaskReceiver.this.taskStatus.isShutdown()) {
                        TaskReceiver.LOG.error("Unknow exception ", th);
                    }
                }
            }
            TaskReceiver.this.task.unregisterDeserializeQueue();
            TaskReceiver.LOG.info("Successfully shutdown recvThread of " + TaskReceiver.this.idStr);
        }

        /**
         * Logs that shutdown has begun and returns {@code -1}.
         * NOTE(review): the meaning of {@code -1} is defined by the code that reads
         * {@code RunnableCallback.getResult()} — confirm there.
         */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public Object getResult() {
            TaskReceiver.LOG.info("Begin to shutdown recvThread of " + TaskReceiver.this.idStr);
            return -1;
        }
    }

    /**
     * Creates the deserialize queue and deserializer threads, registers the deserialize metrics
     * and health check, and reads the backpressure settings.
     *
     * <p>Note that {@link #setDeserializeThread()} is an overridable method invoked from this
     * constructor; it runs before {@link #deserializeTimer} and the backpressure fields are set.
     *
     * @param task            owning task; its task object is kept as {@link #bolt} when it is an {@link IBolt}
     * @param i               task id
     * @param map             topology/storm configuration
     * @param topologyContext context supplying the topology id, component id and raw topology
     * @param map2            per-task queues keyed by task id
     * @param taskStatus      shared status object polled for shutdown
     * @param str             identifier used in log messages and thread names
     */
    public TaskReceiver(Task task, int i, Map map, TopologyContext topologyContext, Map<Integer, DisruptorQueue> map2, TaskStatus taskStatus, String str) {
        this.stormConf = map;
        this.task = task;
        this.bolt = task.getTaskObj() instanceof IBolt ? (IBolt) task.getTaskObj() : null;
        this.taskId = i;
        this.idStr = str;
        this.topologyContext = topologyContext;
        this.innerTaskTransfer = map2;
        this.taskStatus = taskStatus;

        // Queue settings, with defaults of 256 slots and a 10 ms wait timeout when not configured.
        int queueSize = JStormUtils.parseInt(map.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256).intValue();
        long waitTimeoutMs = JStormUtils.parseLong(map.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10L).longValue();
        TimeoutBlockingWaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);
        boolean batchMode = ConfigExtension.isDisruptorQueueBatchMode(map).booleanValue();
        this.deserializeQueue = DisruptorQueue.mkInstance(
                "TaskDeserialize",
                ProducerType.MULTI,
                queueSize,
                waitStrategy,
                batchMode,
                ConfigExtension.getDisruptorBufferSize(map),
                ConfigExtension.getDisruptorBufferFlushMs(map));

        this.dserializeThreadNum = ConfigExtension.getTaskDeserializeThreadNum(map).intValue();
        this.deserializeThreads = new ArrayList<AsyncLoopThread>();
        setDeserializeThread();

        String topologyId = topologyContext.getTopologyId();
        String thisComponentId = topologyContext.getThisComponentId();

        AsmHistogram asmHistogram = new AsmHistogram();
        asmHistogram.setAggregate(false);
        this.deserializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(
                MetricUtils.taskMetricName(topologyId, thisComponentId, i, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM),
                asmHistogram);

        QueueGauge queueGauge = new QueueGauge(this.deserializeQueue, this.idStr, MetricDef.DESERIALIZE_QUEUE);
        JStormMetrics.registerTaskMetric(
                MetricUtils.taskMetricName(topologyId, thisComponentId, i, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE),
                new AsmGauge(queueGauge));
        JStormHealthCheck.registerTaskHealthCheck(i, MetricDef.DESERIALIZE_QUEUE, queueGauge);

        this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(map);
        this.highMark = (float) ConfigExtension.getBackpressureWaterMarkHigh(map);
        this.lowMark = (float) ConfigExtension.getBackpressureWaterMarkLow(map);
        this.backpressureStatus = false;

        LOG.info("Successfully start TaskReceiver thread for {}, thread num: {}", this.idStr, this.dserializeThreadNum);
    }

    /** @return the live list of deserializer threads created by {@link #setDeserializeThread()} */
    public List<AsyncLoopThread> getDeserializeThread() {
        return this.deserializeThreads;
    }

    /**
     * Creates {@link #dserializeThreadNum} deserializer threads, each wrapping a
     * {@link DeserializeRunnable} that reads {@link #deserializeQueue} and publishes to this
     * task's entry in {@link #innerTaskTransfer}. Called from the constructor.
     */
    protected void setDeserializeThread() {
        for (int index = 0; index < this.dserializeThreadNum; index++) {
            DeserializeRunnable runnable =
                    new DeserializeRunnable(this.deserializeQueue, this.innerTaskTransfer.get(this.taskId), index);
            this.deserializeThreads.add(new AsyncLoopThread(runnable));
        }
    }

    /** @return the queue holding serialized messages awaiting deserialization */
    public DisruptorQueue getDeserializeQueue() {
        return this.deserializeQueue;
    }

    /**
     * Drains one available batch from the deserialize queue on the calling thread.
     *
     * <p>A batch is processed when {@code z} is {@code true}, or when the deserialize queue is
     * non-empty and the target queue is not full. If the task is shut down, the deserialize
     * queue is unregistered instead.
     *
     * @param kryoTupleDeserializer deserializer used to decode each message
     * @param z                     when {@code true}, consume regardless of queue occupancy
     * @return {@code false} if a batch was fully processed; {@code true} otherwise (nothing to
     *         do, shutdown, interruption, timeout, or a logged error)
     * @throws RuntimeException if the failure was caused by a {@link KryoException}
     */
    public boolean deserializer(KryoTupleDeserializer kryoTupleDeserializer, boolean z) {
        DisruptorQueue exeQueue = this.innerTaskTransfer.get(this.taskId);

        if (this.taskStatus.isShutdown()) {
            this.task.unregisterDeserializeQueue();
            return true;
        }

        boolean hasWork = this.deserializeQueue.population() > 0 && exeQueue.pctFull() < 1.0d;
        if (!hasWork && !z) {
            return true;
        }

        try {
            for (Object message : this.deserializeQueue.retreiveAvailableBatch()) {
                deserialize(kryoTupleDeserializer, (byte[]) message, exeQueue);
            }
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt status so the owning loop can observe it, and log the
            // exception itself (its cause is normally null and carries no information).
            Thread.currentThread().interrupt();
            LOG.error("InterruptedException ", e);
            return true;
        } catch (TimeoutException ignored) {
            // No batch became available within the queue's wait timeout; report as idle.
            return true;
        } catch (Throwable th) {
            if (Utils.exceptionCauseIsInstanceOf(KryoException.class, th)) {
                throw new RuntimeException(th);
            }
            if (!this.taskStatus.isShutdown()) {
                LOG.error("Unknow exception ", th);
            }
            return true;
        }
    }

    /**
     * Handles one serialized message.
     *
     * <ul>
     *   <li>{@code null} or empty: ignored.</li>
     *   <li>Exactly one byte: treated as a status change and passed to {@code taskStatus.setStatus}.</li>
     *   <li>Otherwise: if {@link #bolt} is an {@link IProtoBatchBolt}, the message is first given
     *       to {@code protoExecute}; each element of a non-null result is deserialized as a tuple,
     *       and a {@code null} result falls back to deserializing the message itself. For any
     *       other bolt the message is deserialized directly.</li>
     * </ul>
     *
     * <p>Failures are logged (unless the task is shutting down) and swallowed, except those
     * caused by a {@link KryoException}, which are rethrown. The deserialize timer is updated on
     * every exit path when {@code MetricUtils.metricAccurateCal} is set.
     *
     * @param kryoTupleDeserializer deserializer used to decode tuples
     * @param bArr                  serialized message, may be {@code null}
     * @param disruptorQueue        queue that decoded tuples are published to
     * @throws RuntimeException wrapping the failure when it was caused by a {@link KryoException}
     */
    protected void deserialize(KryoTupleDeserializer kryoTupleDeserializer, byte[] bArr, DisruptorQueue disruptorQueue) {
        long start = this.deserializeTimer.getTime();
        try {
            if (bArr == null || bArr.length == 0) {
                return;
            }

            if (bArr.length == 1) {
                byte newStatus = bArr[0];
                LOG.info("Change task status as " + newStatus);
                this.taskStatus.setStatus(newStatus);
                return;
            }

            if (this.bolt instanceof IProtoBatchBolt) {
                List<byte[]> unpacked = ((IProtoBatchBolt) this.bolt).protoExecute(bArr);
                if (unpacked == null) {
                    deserializeTuple(kryoTupleDeserializer, bArr, disruptorQueue);
                } else {
                    for (byte[] serializedTuple : unpacked) {
                        deserializeTuple(kryoTupleDeserializer, serializedTuple, disruptorQueue);
                    }
                }
            } else {
                deserializeTuple(kryoTupleDeserializer, bArr, disruptorQueue);
            }
        } catch (Throwable th) {
            if (Utils.exceptionCauseIsInstanceOf(KryoException.class, th)) {
                throw new RuntimeException(th);
            }
            if (!this.taskStatus.isShutdown()) {
                LOG.error(this.idStr + " recv thread error " + JStormUtils.toPrintableString(bArr) + IOUtils.LINE_SEPARATOR_UNIX, th);
            }
        } finally {
            if (MetricUtils.metricAccurateCal) {
                this.deserializeTimer.updateTime(start);
            }
        }
    }

    /**
     * Decodes one tuple and publishes it to {@code disruptorQueue}, applying backpressure when
     * enabled. A {@code null} deserialization result is dropped.
     *
     * <p>With backpressure enabled: once a publish leaves the queue above {@link #highMark}, the
     * next call waits (polling every millisecond) until the queue's fill ratio is at or below
     * {@link #lowMark} before publishing, then clears the backpressure flag.
     *
     * @param kryoTupleDeserializer deserializer used to decode the tuple
     * @param bArr                  serialized tuple
     * @param disruptorQueue        queue the tuple is published to
     */
    protected void deserializeTuple(KryoTupleDeserializer kryoTupleDeserializer, byte[] bArr, DisruptorQueue disruptorQueue) {
        Tuple tuple = kryoTupleDeserializer.deserialize(bArr);
        if (tuple == null) {
            return;
        }

        if (JStormDebugger.isDebugRecv(tuple.getMessageId())) {
            LOG.info(this.idStr + " receive " + tuple.toString());
        }

        if (!this.isBackpressureEnable) {
            disruptorQueue.publish(tuple);
            return;
        }

        if (this.backpressureStatus) {
            // Throttled: hold this tuple until the consumer has drained the queue to the low mark.
            while (disruptorQueue.pctFull() > this.lowMark) {
                JStormUtils.sleepMs(1L);
            }
            disruptorQueue.publish(tuple);
            this.backpressureStatus = false;
            return;
        }

        disruptorQueue.publish(tuple);
        if (disruptorQueue.pctFull() > this.highMark) {
            this.backpressureStatus = true;
        }
    }
}
