/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.utils.JStormUtils;
import com.esotericsoftware.kryo.KryoException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskReceiver {
    private static Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);
    protected Task task;
    protected final int taskId;
    protected final String idStr;
    protected TopologyContext topologyContext;
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;
    protected DisruptorQueue deserializeQueue;
    protected KryoTupleDeserializer deserializer;
    protected AsyncLoopThread deserializeThread;
    protected AsmHistogram deserializeTimer;
    protected TaskStatus taskStatus;

    public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer, TaskStatus taskStatus, String taskName) {
        this.task = task;
        this.taskId = taskId;
        this.idStr = taskName;
        this.topologyContext = topologyContext;
        this.innerTaskTransfer = innerTaskTransfer;
        this.taskStatus = taskStatus;
        int queueSize = JStormUtils.parseInt(stormConf.get("topology.executor.receive.buffer.size"), 256);
        WaitStrategy waitStrategy = (WaitStrategy)JStormUtils.createDisruptorWaitStrategy(stormConf);
        this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy);
        this.setDeserializeThread();
        this.deserializer = new KryoTupleDeserializer(stormConf, topologyContext);
        String topologyId = topologyContext.getTopologyId();
        String component = topologyContext.getThisComponentId();
        this.deserializeTimer = (AsmHistogram)JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, component, taskId, "DeserializeTime", MetricType.HISTOGRAM), new AsmHistogram());
        QueueGauge deserializeQueueGauge = new QueueGauge(this.deserializeQueue, this.idStr, "DeserializeQueue");
        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, component, taskId, "DeserializeQueue", MetricType.GAUGE), new AsmGauge(deserializeQueueGauge));
        JStormHealthCheck.registerTaskHealthCheck(taskId, "DeserializeQueue", deserializeQueueGauge);
    }

    public AsyncLoopThread getDeserializeThread() {
        return this.deserializeThread;
    }

    protected void setDeserializeThread() {
        this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(this.deserializeQueue, this.innerTaskTransfer.get(this.taskId)));
    }

    public DisruptorQueue getDeserializeQueue() {
        return this.deserializeQueue;
    }

    class DeserializeRunnable
    extends RunnableCallback
    implements EventHandler {
        DisruptorQueue deserializeQueue;
        DisruptorQueue exeQueue;

        DeserializeRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
            this.deserializeQueue = deserializeQueue;
            this.exeQueue = exeQueue;
        }

        @Override
        public String getThreadName() {
            return TaskReceiver.this.idStr + "-deserializer";
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected Object deserialize(byte[] ser_msg) {
            long start = TaskReceiver.this.deserializeTimer.getTime();
            try {
                if (ser_msg == null) {
                    Object var4_3 = null;
                    return var4_3;
                }
                if (ser_msg.length == 0) {
                    Object var4_4 = null;
                    return var4_4;
                }
                if (ser_msg.length == 1) {
                    byte newStatus = ser_msg[0];
                    LOG.info("Change task status as " + newStatus);
                    TaskReceiver.this.taskStatus.setStatus(newStatus);
                    Object var5_9 = null;
                    return var5_9;
                }
                Tuple tuple = TaskReceiver.this.deserializer.deserialize(ser_msg);
                if (JStormDebugger.isDebugRecv(tuple.getMessageId().getAnchors())) {
                    LOG.info(TaskReceiver.this.idStr + " receive " + tuple.toString());
                }
                Tuple tuple2 = tuple;
                return tuple2;
            }
            catch (KryoException e) {
                throw new RuntimeException(e);
            }
            catch (Throwable e) {
                if (!TaskReceiver.this.taskStatus.isShutdown()) {
                    LOG.error(TaskReceiver.this.idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
                }
            }
            finally {
                TaskReceiver.this.deserializeTimer.updateTime(start);
            }
            return null;
        }

        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
            Object tuple = this.deserialize((byte[])event);
            if (tuple != null) {
                this.exeQueue.publish(tuple);
            }
        }

        @Override
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        @Override
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        @Override
        public void run() {
            LOG.info("Successfully start recvThread of " + TaskReceiver.this.idStr);
            while (!TaskReceiver.this.taskStatus.isShutdown()) {
                try {
                    this.deserializeQueue.consumeBatchWhenAvailable(this);
                }
                catch (Throwable e) {
                    if (TaskReceiver.this.taskStatus.isShutdown()) continue;
                    LOG.error("Unknow exception ", e);
                }
            }
            TaskReceiver.this.task.unregisterDeserializeQueue();
            LOG.info("Successfully shutdown recvThread of " + TaskReceiver.this.idStr);
        }

        @Override
        public Object getResult() {
            LOG.info("Begin to shutdown recvThread of " + TaskReceiver.this.idStr);
            return -1;
        }
    }
}

