package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.utils.DisruptorRunable;
import com.alibaba.jstorm.utils.JStormUtils;
import com.esotericsoftware.kryo.KryoException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/VirtualPortCtrlDispatch.class */
/**
 * Dispatches control messages taken from a receive queue to the per-task control queues.
 *
 * <p>Each event is a {@link TaskMessage}. Its payload is deserialized with the worker's shared
 * {@link KryoTupleDeserializer} and, when that yields a non-null object, published to the
 * {@link DisruptorQueue} registered for the message's task id in {@link #controlQueues}.
 *
 * <p>NOTE(review): {@link #taskStatus} is never assigned by this class; presumably a subclass or
 * external code sets it. All accesses below are null-safe for that reason.
 */
public class VirtualPortCtrlDispatch extends DisruptorRunable {
    private static final Logger LOG = LoggerFactory.getLogger(VirtualPortCtrlDispatch.class);

    /** Control queues keyed by task id; obtained from {@code WorkerData#getControlQueues()}. */
    protected ConcurrentHashMap<Integer, DisruptorQueue> controlQueues;

    /** Receiving connection; closed and set to {@code null} by {@link #shutdownRecv()}. */
    protected IConnection recvConnection;

    /** Task status updated by single-byte messages; may be {@code null} (not set by this class). */
    protected volatile TaskStatus taskStatus;

    /** Shared holder of the deserializer; the held value may be {@code null}. */
    protected AtomicReference<KryoTupleDeserializer> atomKryoDeserializer;

    /**
     * Creates the dispatcher.
     *
     * @param workerData     source of the control queues and the shared deserializer reference
     * @param iConnection    connection that is closed on shutdown
     * @param disruptorQueue queue handed to the {@link DisruptorRunable} super constructor
     * @param str            identifier handed to the super constructor (used as {@code idStr} in logs)
     */
    public VirtualPortCtrlDispatch(WorkerData workerData, IConnection iConnection, DisruptorQueue disruptorQueue, String str) {
        super(disruptorQueue, str);
        this.recvConnection = iConnection;
        this.controlQueues = workerData.getControlQueues();
        this.atomKryoDeserializer = workerData.getAtomKryoDeserializer();
    }

    /**
     * Closes the receive connection, if any, and clears the reference.
     *
     * <p>Safe to call more than once: a second call finds no connection and returns. A failure
     * while closing is logged and otherwise ignored, so shutdown stays best-effort.
     */
    public void shutdownRecv() {
        IConnection connection = this.recvConnection;
        // Clear the field first so the reference is dropped even if close() throws.
        this.recvConnection = null;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.warn(this.idStr + " failed to close receive connection", e);
        }
    }

    /** Shuts down the super class first, then closes the receive connection. */
    @Override
    public void shutdown() {
        super.shutdown();
        LOG.info("Begin to shutdown VirtualPortCtrlDispatch");
        shutdownRecv();
        LOG.info("Successfully shudown VirtualPortCtrlDispatch");
    }

    /**
     * Turns a raw message payload into an object to publish.
     *
     * <ul>
     *   <li>{@code null} or empty payload: returns {@code null}.</li>
     *   <li>One-byte payload: treated as a status change; the byte is passed to
     *       {@code taskStatus.setStatus} when a task status is present, and {@code null} is
     *       returned.</li>
     *   <li>Anything else: deserialized with the current {@link KryoTupleDeserializer}, or
     *       {@code null} when no deserializer is available.</li>
     * </ul>
     *
     * @param bArr serialized payload, may be {@code null}
     * @param i    task id of the message; not used by this implementation
     * @return the deserialized tuple, or {@code null} when there is nothing to publish
     * @throws RuntimeException wrapping a {@link KryoException} raised during deserialization
     */
    protected Object deserialize(byte[] bArr, int i) {
        if (bArr == null || bArr.length == 0) {
            return null;
        }
        try {
            if (bArr.length == 1) {
                byte b = bArr[0];
                // Read the volatile field once so the null check and the call see the same value.
                TaskStatus status = this.taskStatus;
                if (status == null) {
                    LOG.warn(this.idStr + " received task status " + ((int) b) + " but no task status is set, ignoring");
                    return null;
                }
                LOG.info("Change task status as " + ((int) b));
                status.setStatus(b);
                return null;
            }
            Tuple tuple = null;
            KryoTupleDeserializer kryoTupleDeserializer = this.atomKryoDeserializer.get();
            if (kryoTupleDeserializer != null) {
                tuple = kryoTupleDeserializer.deserialize(bArr);
            }
            return tuple;
        } catch (KryoException e) {
            throw new RuntimeException((Throwable) e);
        } catch (Throwable th) {
            // Stay quiet about errors once the task is shutting down. The null guard keeps this
            // handler itself from throwing and hiding the original failure.
            TaskStatus status = this.taskStatus;
            if (status != null && status.isShutdown()) {
                return null;
            }
            LOG.error(this.idStr + " recv thread error " + JStormUtils.toPrintableString(bArr) + "\n", th);
            return null;
        }
    }

    /**
     * Handles one event from the receive queue.
     *
     * <p>The payload is deserialized before the target queue is looked up, so a one-byte status
     * message is applied even when no control queue is registered for its task id.
     *
     * @param obj the event; cast to {@link TaskMessage}
     * @param z   not used by this implementation
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handleEvent(Object obj, boolean z) throws Exception {
        TaskMessage taskMessage = (TaskMessage) obj;
        int task = taskMessage.task();
        Object payload = null;
        try {
            payload = deserialize(taskMessage.message(), task);
        } catch (Throwable th) {
            // A bad message is dropped; it must not stop the dispatch loop.
            LOG.warn("serialize happened errors!!!", th);
        }
        DisruptorQueue disruptorQueue = this.controlQueues.get(Integer.valueOf(task));
        if (disruptorQueue == null) {
            LOG.warn("Received invalid control message directed at port " + task + ". Dropping...");
        } else if (payload != null) {
            disruptorQueue.publish(payload);
        }
    }
}
