package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.esotericsoftware.kryo.KryoException;
import com.alibaba.jstorm.utils.DisruptorRunable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/DrainerCtrlRunable.class */
/**
 * A {@link DisruptorRunable} bound to the worker's transfer-control queue
 * ({@code WorkerData.getTransferCtrlQueue()}). Each event it handles is treated as an
 * {@link ITupleExt}: the tuple is serialized and sent over the {@link IConnection}
 * registered for the worker slot that hosts the tuple's target task.
 *
 * <p>Tuples that cannot be routed (unknown task or missing connection) or that cannot
 * be serialized are dropped after a warning is logged.
 */
public class DrainerCtrlRunable extends DisruptorRunable {
    private static final Logger LOG = LoggerFactory.getLogger(DrainerCtrlRunable.class);

    /**
     * Type code placed in every {@link TaskMessage} sent by this class.
     * NOTE(review): presumably the "control message" type code; confirm against TaskMessage.
     */
    private static final short MESSAGE_TYPE = 1;

    /** Worker slot -> open connection to that slot; shared with {@link WorkerData}. */
    private final ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;

    /** Task id -> worker slot hosting that task; shared with {@link WorkerData}. */
    private final ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;

    /** Holder of the current tuple serializer; the held value may be {@code null}. */
    protected AtomicReference<KryoTupleSerializer> atomKryoSerializer;

    /**
     * Creates a runnable that consumes the transfer-control queue of the given worker.
     *
     * @param workerData source of the queue, the routing maps and the serializer holder
     * @param str        name handed to the {@link DisruptorRunable} super constructor
     */
    public DrainerCtrlRunable(WorkerData workerData, String str) {
        super(workerData.getTransferCtrlQueue(), str);
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.atomKryoSerializer = workerData.getAtomKryoSerializer();
    }

    /**
     * Looks up the connection used to reach the given task.
     *
     * @param i target task id
     * @return the connection for the worker slot hosting the task, or {@code null} (after
     *         logging a warning) when either the slot or its connection is unknown
     */
    protected IConnection getConnection(int i) {
        WorkerSlot workerSlot = this.taskNodeport.get(Integer.valueOf(i));
        if (workerSlot == null) {
            // The Exception is created only so the warning carries a stack trace.
            LOG.warn("Internal transfer warn, throw tuple,", new Exception("IConnection to " + i + " can't be found"));
            return null;
        }
        IConnection iConnection = this.nodeportSocket.get(workerSlot);
        if (iConnection == null) {
            LOG.warn("Internal transfer warn, throw tuple,", new Exception("NodePort to" + workerSlot + " can't be found"));
        }
        return iConnection;
    }

    /**
     * Serializes a tuple with the serializer currently held by {@link #atomKryoSerializer}.
     *
     * @param iTupleExt tuple to serialize; it is cast to {@link TupleExt}
     * @return the serialized bytes, or {@code null} (after logging a warning) when no
     *         serializer is currently available
     */
    protected byte[] serialize(ITupleExt iTupleExt) {
        KryoTupleSerializer kryoTupleSerializer = this.atomKryoSerializer.get();
        if (kryoTupleSerializer == null) {
            LOG.warn("KryoTupleSerializer is null, so drop tuple...");
            return null;
        }
        return kryoTupleSerializer.serialize((TupleExt) iTupleExt);
    }

    /**
     * Serializes one queued tuple and sends it to the worker hosting its target task.
     *
     * <p>The event is silently ignored when it is {@code null}. It is dropped (with a
     * warning already logged) when no connection is known for the target task, when no
     * serializer is available, or when serialization fails for a reason that is not
     * caused by a {@link KryoException}.
     *
     * @param obj the queued event, expected to be an {@link ITupleExt}
     * @param z   second argument of the {@link DisruptorRunable} callback; unused here
     * @throws RuntimeException wrapping the original failure when serialization fails
     *                          with a {@link KryoException} in its cause chain
     */
    @Override // com.alibaba.jstorm.utils.DisruptorRunable
    public void handleEvent(Object obj, boolean z) throws Exception {
        if (obj == null) {
            return;
        }
        ITupleExt tuple = (ITupleExt) obj;
        int targetTaskId = tuple.getTargetTaskId();
        IConnection connection = getConnection(targetTaskId);
        if (connection == null) {
            return;
        }

        byte[] payload = null;
        try {
            payload = serialize(tuple);
        } catch (Throwable th) {
            // Kryo failures are escalated; any other failure is logged and the tuple dropped.
            if (Utils.exceptionCauseIsInstanceOf(KryoException.class, th)) {
                throw new RuntimeException(th);
            }
            LOG.warn("serialize happened errors!!!", th);
        }

        if (payload == null) {
            // Nothing was serialized (missing serializer or logged failure): really drop
            // the tuple instead of putting a message with a null body on the wire.
            return;
        }
        connection.send(new TaskMessage(MESSAGE_TYPE, targetTaskId, payload));
    }
}
