/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.utils.DisruptorRunable;
import com.alibaba.jstorm.utils.JStormUtils;
import com.esotericsoftware.kryo.KryoException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor consumer that decodes incoming {@link TaskMessage}s and forwards the resulting
 * control tuples to the control queue registered for the target task.
 *
 * <p>Payloads are interpreted as follows (see {@link #deserialize(byte[], int)}):
 * <ul>
 *   <li>{@code null} or empty payload: ignored;</li>
 *   <li>one-byte payload: treated as a new task status and applied to {@link #taskStatus};</li>
 *   <li>anything else: decoded with the current {@link KryoTupleDeserializer}.</li>
 * </ul>
 *
 * <p>NOTE(review): {@link #taskStatus} is never assigned inside this class. It is
 * {@code protected}, so a subclass may set it; every access here is therefore null-guarded.
 */
public class VirtualPortCtrlDispatch extends DisruptorRunable {
    private static final Logger LOG = LoggerFactory.getLogger(VirtualPortCtrlDispatch.class);

    /** Control queues keyed by task id; obtained from {@code WorkerData#getControlQueues()}. */
    protected ConcurrentHashMap<Integer, DisruptorQueue> controlQueues;
    /** Receive connection closed by {@link #shutdownRecv()}; {@code null} once closed. */
    protected IConnection recvConnection;
    /** Task status holder updated by one-byte messages; may be {@code null} (never set here). */
    protected volatile TaskStatus taskStatus;
    /** Holder of the tuple deserializer; the held value may be {@code null}. */
    protected AtomicReference<KryoTupleDeserializer> atomKryoDeserializer;

    /**
     * Creates the dispatcher.
     *
     * @param workerData     source of the control queues and the shared deserializer reference
     * @param recvConnection connection that is closed on {@link #shutdown()}
     * @param recvQueue      queue handed to the {@link DisruptorRunable} superclass
     * @param idStr          identifier handed to the superclass and used in log output
     */
    public VirtualPortCtrlDispatch(WorkerData workerData, IConnection recvConnection, DisruptorQueue recvQueue, String idStr) {
        super(recvQueue, idStr);
        this.recvConnection = recvConnection;
        this.controlQueues = workerData.getControlQueues();
        this.atomKryoDeserializer = workerData.getAtomKryoDeserializer();
    }

    /**
     * Closes the receive connection, if any, and clears the reference.
     *
     * <p>Closing is best-effort: a failure is logged and never propagated, so shutdown always
     * proceeds. Calling this method again after the connection was released is a no-op.
     */
    public void shutdownRecv() {
        IConnection connection = this.recvConnection;
        // Clear the field first so the reference is dropped even if close() fails.
        this.recvConnection = null;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            // Previously swallowed silently; keep best-effort semantics but leave a trace.
            LOG.warn("Failed to close receive connection of " + this.idStr, e);
        }
    }

    /** Shuts down the superclass runnable and then releases the receive connection. */
    @Override
    public void shutdown() {
        super.shutdown();
        LOG.info("Begin to shutdown VirtualPortCtrlDispatch");
        this.shutdownRecv();
        LOG.info("Successfully shudown VirtualPortCtrlDispatch");
    }

    /**
     * Decodes a raw message payload.
     *
     * @param ser_msg serialized payload; may be {@code null} or empty
     * @param taskId  id of the target task (not used by this implementation)
     * @return the decoded tuple, or {@code null} when the payload is {@code null}/empty, is a
     *         one-byte status message, no deserializer is currently available, or decoding
     *         failed with something other than a {@link KryoException}
     * @throws RuntimeException wrapping any {@link KryoException} raised while decoding
     */
    protected Object deserialize(byte[] ser_msg, int taskId) {
        // Read the volatile field once so the null check and the use see the same object.
        TaskStatus status = this.taskStatus;
        try {
            if (ser_msg == null || ser_msg.length == 0) {
                return null;
            }
            if (ser_msg.length == 1) {
                // A single byte carries a task-status change rather than a tuple.
                byte newStatus = ser_msg[0];
                LOG.info("Change task status as " + newStatus);
                if (status != null) {
                    status.setStatus(newStatus);
                } else {
                    LOG.warn(this.idStr + " has no TaskStatus set, ignoring status change to " + newStatus);
                }
                return null;
            }
            KryoTupleDeserializer kryo = this.atomKryoDeserializer.get();
            if (kryo == null) {
                return null;
            }
            Tuple tuple = kryo.deserialize(ser_msg);
            return tuple;
        } catch (KryoException e) {
            throw new RuntimeException(e);
        } catch (Throwable e) {
            // Stay quiet only when a status object exists and reports shutdown. A missing
            // status must not raise a second exception here and hide the original failure.
            if (status == null || !status.isShutdown()) {
                LOG.error(this.idStr + " recv thread error " + JStormUtils.toPrintableString(ser_msg) + "\n", e);
            }
            return null;
        }
    }

    /**
     * Handles one event from the receive queue: decodes the message and, if it produced a
     * tuple, publishes it to the control queue of the addressed task.
     *
     * @param event      the queued event; must be a {@link TaskMessage}
     * @param endOfBatch batch marker supplied by the disruptor (not used here)
     * @throws Exception declared by the superclass contract
     */
    @Override
    public void handleEvent(Object event, boolean endOfBatch) throws Exception {
        TaskMessage message = (TaskMessage) event;
        int task = message.task();

        // Decoding runs before the queue lookup (as in the original ordering), so a one-byte
        // status message is still applied when the task has no control queue.
        Object tuple = null;
        try {
            tuple = this.deserialize(message.message(), task);
        } catch (Throwable e) {
            LOG.warn("serialize happened errors!!!", e);
        }

        DisruptorQueue queue = this.controlQueues.get(task);
        if (queue == null) {
            LOG.warn("Received invalid control message directed at port " + task + ". Dropping...");
            return;
        }
        if (tuple != null) {
            queue.publish(tuple);
        }
    }
}

