package com.alibaba.jstorm.task;

import backtype.storm.Config;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.esotericsoftware.kryo.KryoException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskReceiver.class */
/**
 * Receiving side of a single task.
 *
 * <p>Owns a {@link DisruptorQueue} of serialized messages ({@code byte[]}) and an
 * {@link AsyncLoopThread} running a {@link DeserializeRunnable}. The runnable drains that queue,
 * deserializes each message with a {@link KryoTupleDeserializer} and publishes the resulting
 * tuple to the queue registered for this task id in {@code innerTaskTransfer}.
 *
 * <p>A message of exactly one byte is not a tuple: it is applied to {@link TaskStatus} as a new
 * status value.
 */
public class TaskReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);

    /** Owning task; asked to unregister the deserialize queue when the receive thread exits. */
    protected Task task;
    /** Id of the task; used as the lookup key into {@link #innerTaskTransfer}. */
    protected final int taskId;
    /** Identifier used as a prefix in log lines and in the deserialize thread name. */
    protected final String idStr;
    protected TopologyContext topologyContext;
    /** Queues keyed by task id; the entry for {@link #taskId} receives deserialized tuples. */
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    /** Queue of serialized messages waiting to be deserialized. */
    protected DisruptorQueue deserializeQueue;
    protected KryoTupleDeserializer deserializer;
    protected AsyncLoopThread deserializeThread;
    /** Histogram that records how long each {@code deserialize} call takes. */
    protected AsmHistogram deserializeTimer;
    protected TaskStatus taskStatus;

    /**
     * Loop body of the receive thread: consumes serialized messages from one queue, deserializes
     * them and publishes non-null results to another queue.
     */
    public class DeserializeRunnable extends RunnableCallback implements EventHandler {
        /** Source queue holding serialized messages. */
        DisruptorQueue deserializeQueue;
        /** Destination queue for deserialized tuples. */
        DisruptorQueue exeQueue;

        /**
         * @param disruptorQueue  queue to consume serialized messages from
         * @param disruptorQueue2 queue to publish deserialized tuples to
         */
        public DeserializeRunnable(DisruptorQueue disruptorQueue, DisruptorQueue disruptorQueue2) {
            this.deserializeQueue = disruptorQueue;
            this.exeQueue = disruptorQueue2;
        }

        /** @return the enclosing receiver's id string followed by {@code "-deserializer"} */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public String getThreadName() {
            return TaskReceiver.this.idStr + "-deserializer";
        }

        /**
         * Turns one serialized message into a tuple, recording the elapsed time in the
         * deserialize timer on every exit path.
         *
         * @param bArr serialized message; may be {@code null}
         * @return the deserialized tuple, or {@code null} when the message is {@code null}, empty,
         *         a one-byte status change, or could not be deserialized
         * @throws RuntimeException wrapping a {@link KryoException} raised during deserialization
         */
        protected Object deserialize(byte[] bArr) {
            final long start = TaskReceiver.this.deserializeTimer.getTime();
            try {
                if (bArr == null || bArr.length == 0) {
                    return null;
                }
                if (bArr.length == 1) {
                    // A single byte is a control message carrying the new task status.
                    byte newStatus = bArr[0];
                    TaskReceiver.LOG.info("Change task status as " + newStatus);
                    TaskReceiver.this.taskStatus.setStatus(newStatus);
                    return null;
                }
                Tuple tuple = TaskReceiver.this.deserializer.deserialize(bArr);
                if (JStormDebugger.isDebugRecv(tuple.getMessageId().getAnchors())) {
                    TaskReceiver.LOG.info(TaskReceiver.this.idStr + " receive " + tuple.toString());
                }
                return tuple;
            } catch (KryoException e) {
                // Kryo failures are propagated to the caller instead of being swallowed below.
                throw new RuntimeException(e);
            } catch (Throwable th) {
                // Any other failure drops the message; stay quiet once the task is shutting down.
                if (!TaskReceiver.this.taskStatus.isShutdown()) {
                    TaskReceiver.LOG.error(TaskReceiver.this.idStr + " recv thread error " + JStormUtils.toPrintableString(bArr) + "\n", th);
                }
                return null;
            } finally {
                TaskReceiver.this.deserializeTimer.updateTime(start);
            }
        }

        /**
         * Queue callback: deserializes the event and forwards a non-null result to
         * {@link #exeQueue}.
         *
         * @param obj the queued event; cast to {@code byte[]}
         * @param j   sequence number supplied by the queue (unused)
         * @param z   end-of-batch flag supplied by the queue (unused)
         */
        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(Object obj, long j, boolean z) throws Exception {
            Object tuple = deserialize((byte[]) obj);
            if (tuple != null) {
                this.exeQueue.publish(tuple);
            }
        }

        /** Calls {@link WorkerClassLoader#switchThreadContext()}. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        /** Calls {@link WorkerClassLoader#restoreThreadContext()}. */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        /**
         * Consumes batches from {@link #deserializeQueue} until the task status reports shutdown,
         * then asks the owning task to unregister the deserialize queue.
         */
        @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
        public void run() {
            TaskReceiver.LOG.info("Successfully start recvThread of " + TaskReceiver.this.idStr);
            while (!TaskReceiver.this.taskStatus.isShutdown()) {
                try {
                    this.deserializeQueue.consumeBatchWhenAvailable(this);
                } catch (Throwable th) {
                    // Keep the loop alive on failure; errors during shutdown are not logged.
                    if (!TaskReceiver.this.taskStatus.isShutdown()) {
                        TaskReceiver.LOG.error("Unknow exception ", th);
                    }
                }
            }
            TaskReceiver.this.task.unregisterDeserializeQueue();
            TaskReceiver.LOG.info("Successfully shutdown recvThread of " + TaskReceiver.this.idStr);
        }

        /**
         * Logs that shutdown is beginning and returns {@code -1}.
         *
         * @return {@code -1}; NOTE(review): presumably tells the driving loop not to reschedule
         *         this callback - confirm against AsyncLoopThread
         */
        @Override // com.alibaba.jstorm.callback.RunnableCallback
        public Object getResult() {
            TaskReceiver.LOG.info("Begin to shutdown recvThread of " + TaskReceiver.this.idStr);
            return -1;
        }
    }

    /**
     * Builds the deserialize queue, deserializer and metrics for one task, then creates the
     * deserialize thread.
     *
     * @param task            owning task
     * @param i               task id
     * @param map             configuration; read for
     *                        {@link Config#TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE} (256 is passed
     *                        as the fallback) and for the disruptor wait strategy
     * @param topologyContext context supplying the topology id and component id
     * @param map2            queues keyed by task id; the entry for {@code i} is the destination
     *                        for deserialized tuples
     * @param taskStatus      shared status object consulted for shutdown
     * @param str             identifier string used in logs and the thread name
     */
    public TaskReceiver(Task task, int i, Map map, TopologyContext topologyContext, Map<Integer, DisruptorQueue> map2, TaskStatus taskStatus, String str) {
        this.task = task;
        this.taskId = i;
        this.idStr = str;
        this.topologyContext = topologyContext;
        this.innerTaskTransfer = map2;
        this.taskStatus = taskStatus;

        int queueSize = JStormUtils.parseInt(map.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256).intValue();
        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(map);
        this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy);
        this.deserializer = new KryoTupleDeserializer(map, topologyContext);

        String topologyId = topologyContext.getTopologyId();
        String componentId = topologyContext.getThisComponentId();
        this.deserializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, i, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
        QueueGauge queueGauge = new QueueGauge(this.deserializeQueue, this.idStr, MetricDef.DESERIALIZE_QUEUE);
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, i, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE), new AsmGauge(queueGauge));
        JStormHealthCheck.registerTaskHealthCheck(i, MetricDef.DESERIALIZE_QUEUE, queueGauge);

        // Create the thread last: its runnable reads deserializer and deserializeTimer, so they
        // must be assigned before the runnable can possibly execute.
        // NOTE(review): AsyncLoopThread appears to start its thread on construction - confirm.
        setDeserializeThread();
    }

    /** @return the thread that runs the deserialize loop */
    public AsyncLoopThread getDeserializeThread() {
        return this.deserializeThread;
    }

    /**
     * Creates {@link #deserializeThread} around a {@link DeserializeRunnable} that reads from
     * {@link #deserializeQueue} and publishes to the {@link #innerTaskTransfer} entry for
     * {@link #taskId}. Called from the constructor, so an override must not rely on subclass
     * fields being initialized.
     */
    protected void setDeserializeThread() {
        DisruptorQueue exeQueue = this.innerTaskTransfer.get(Integer.valueOf(this.taskId));
        this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(this.deserializeQueue, exeQueue));
    }

    /** @return the queue that serialized messages for this task are placed on */
    public DisruptorQueue getDeserializeQueue() {
        return this.deserializeQueue;
    }
}
