/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.TupleExt;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.utils.DisruptorRunable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DrainerCtrlRunable
extends DisruptorRunable {
    private static final Logger LOG = LoggerFactory.getLogger(DrainerCtrlRunable.class);
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    protected AtomicReference<KryoTupleSerializer> atomKryoSerializer;

    public DrainerCtrlRunable(WorkerData workerData, String idStr) {
        super(workerData.getTransferCtrlQueue(), idStr);
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.atomKryoSerializer = workerData.getAtomKryoSerializer();
    }

    protected IConnection getConnection(int taskId) {
        IConnection conn = null;
        WorkerSlot nodePort = this.taskNodeport.get(taskId);
        if (nodePort == null) {
            String errormsg = "IConnection to " + taskId + " can't be found";
            LOG.warn("Internal transfer warn, throw tuple,", (Throwable)new Exception(errormsg));
        } else {
            conn = this.nodeportSocket.get(nodePort);
            if (conn == null) {
                String errormsg = "NodePort to" + nodePort + " can't be found";
                LOG.warn("Internal transfer warn, throw tuple,", (Throwable)new Exception(errormsg));
            }
        }
        return conn;
    }

    protected byte[] serialize(ITupleExt tuple) {
        byte[] bytes = null;
        KryoTupleSerializer kryo = this.atomKryoSerializer.get();
        if (kryo != null) {
            bytes = kryo.serialize((TupleExt)tuple);
        } else {
            LOG.warn("KryoTupleSerializer is null, so drop batchTuple...");
        }
        return bytes;
    }

    @Override
    public void handleEvent(Object event, boolean endOfBatch) throws Exception {
        if (event == null) {
            return;
        }
        ITupleExt tuple = (ITupleExt)event;
        int targetTask = tuple.getTargetTaskId();
        IConnection conn = this.getConnection(targetTask);
        if (conn != null) {
            byte[] tupleMessage = null;
            try {
                tupleMessage = this.serialize(tuple);
            }
            catch (Throwable e) {
                LOG.warn("serialize happened errors!!!", e);
            }
            TaskMessage message = new TaskMessage(1, targetTask, tupleMessage);
            conn.send(message);
        }
    }
}

