/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.daemon.worker.JStormDebugger;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskReceiver;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskBatchReceiver
extends TaskReceiver {
    private static Logger LOG = LoggerFactory.getLogger(TaskBatchReceiver.class);

    public TaskBatchReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer, TaskStatus taskStatus, String taskName) {
        super(task, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
    }

    @Override
    protected void setDeserializeThread() {
        this.deserializeThread = new AsyncLoopThread(new DeserializeBatchRunnable(this.deserializeQueue, (DisruptorQueue)this.innerTaskTransfer.get(this.taskId)));
    }

    public class DeserializeBatchRunnable
    extends TaskReceiver.DeserializeRunnable {
        public DeserializeBatchRunnable(DisruptorQueue deserializeQueue, DisruptorQueue exeQueue) {
            super(TaskBatchReceiver.this, deserializeQueue, exeQueue);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected Object deserialize(byte[] ser_msg) {
            long start = TaskBatchReceiver.this.deserializeTimer.getTime();
            try {
                if (ser_msg == null) {
                    Object var4_3 = null;
                    return var4_3;
                }
                if (ser_msg.length == 0) {
                    Object var4_4 = null;
                    return var4_4;
                }
                if (ser_msg.length == 1) {
                    byte newStatus = ser_msg[0];
                    LOG.info("Change task status as " + newStatus);
                    TaskBatchReceiver.this.taskStatus.setStatus(newStatus);
                    Object var5_8 = null;
                    return var5_8;
                }
                BatchTuple tuple = TaskBatchReceiver.this.deserializer.deserializeBatch(ser_msg);
                if (JStormDebugger.isDebugRecv(tuple.getTuples())) {
                    LOG.info(TaskBatchReceiver.this.idStr + " receive " + tuple.toString());
                }
                BatchTuple batchTuple = tuple;
                return batchTuple;
            }
            catch (Throwable e) {
                if (!TaskBatchReceiver.this.taskStatus.isShutdown()) {
                    LOG.error(TaskBatchReceiver.this.idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
                }
            }
            finally {
                TaskBatchReceiver.this.deserializeTimer.updateTime(start);
            }
            return null;
        }
    }
}

