/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.ControlMessage;
import backtype.storm.messaging.TaskMessage;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMeter;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.message.netty.NettyConnection;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty {@link FrameDecoder} that turns the inbound byte stream into either a
 * {@link ControlMessage} or a {@link TaskMessage}.
 *
 * <p>Frame layouts, as read by this decoder:
 * <ul>
 *   <li>control frame: {@code short code | long timestamp | int clientPort}</li>
 *   <li>task frame: {@code short type | short task | int length | byte[length] payload}</li>
 * </ul>
 * The leading {@code short} is first offered to {@link ControlMessage#mkMessage}; when that
 * returns {@code null} the frame is treated as a task frame and the value is used as its type.
 *
 * <p>The transmit-time bookkeeping is static and therefore shared by all decoder instances.
 * Access to it is guarded by an internal lock because decoders for different channels, and
 * callers of {@link #removeTransmitHistogram(Channel)}, may run on different threads.
 */
public class MessageDecoder extends FrameDecoder {
    private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);

    /** Minimum number of buffered bytes required before any decoding is attempted. */
    private static final int MIN_READABLE_BYTES = 6;
    /** Bytes following the code in a control frame: long timestamp + int client port. */
    private static final int CONTROL_BODY_BYTES = 12;
    /** Size of the task frame's payload-length field. */
    private static final int LENGTH_FIELD_BYTES = 4;
    /** Bytes preceding a task frame's payload: short type + short task + int length. */
    private static final int TASK_HEADER_BYTES = 8;
    /** Factor applied to the millisecond transmit interval before it is recorded. */
    private static final long TRANSMIT_TIME_SCALE = 1000L;

    /** Assigned by the constructor when netty metrics are enabled; stays null otherwise. */
    private static AsmHistogram msgDecodeTimer;

    // Initialisation order mirrors the metric registration order of the original static block.
    private static final AsmMeter nettyServerRecvSpeed = (AsmMeter)JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName("NettySrvRecvSpeed", MetricType.METER), new AsmMeter());
    private static final Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap<Channel, AsmHistogram>();
    private static final AsmHistogram networkWorkerTransmitTime = (AsmHistogram)JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName("NettySrvTransmitTime", MetricType.HISTOGRAM), new AsmHistogram());
    private static final Map<Channel, String> transmitNameMap = new HashMap<Channel, String>();

    /** Guards {@link #networkTransmitTimeMap} and {@link #transmitNameMap}, which are plain HashMaps. */
    private static final Object TRANSMIT_LOCK = new Object();

    private final boolean isServer;
    private final String localIp;
    private final int localPort;
    private final boolean enableNettyMetrics;

    /**
     * Creates a decoder.
     *
     * @param isServer whether this decoder sits on the server side; decode timing is only
     *                 recorded when this is {@code true} and netty metrics are enabled
     * @param conf     configuration used to look up the local worker port and the
     *                 netty-metrics switch
     */
    public MessageDecoder(boolean isServer, Map conf) {
        this.isServer = isServer;
        this.localPort = ConfigExtension.getLocalWorkerPort(conf);
        this.localIp = NetWorkUtils.ip();
        this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(conf);
        if (this.enableNettyMetrics) {
            msgDecodeTimer = (AsmHistogram)JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName("MsgDecodeTime", MetricType.HISTOGRAM), new AsmHistogram());
            msgDecodeTimer.setTimeUnit(TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Attempts to decode one frame from {@code buf}.
     *
     * @param ctx     handler context (unused)
     * @param channel channel the bytes arrived on
     * @param buf     cumulative inbound buffer
     * @return a {@link ControlMessage}, a {@link TaskMessage}, or {@code null} when the buffer
     *         does not yet hold a complete frame (the reader index is then restored to where
     *         it was on entry)
     * @throws Exception declared by the overridden method
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
        if (buf.readableBytes() < MIN_READABLE_BYTES) {
            return null;
        }

        final boolean timed = this.isServer && this.enableNettyMetrics;
        final long startTime = timed ? msgDecodeTimer.getTime() : 0L;
        try {
            // Remember the frame start so an incomplete frame can be rewound.
            buf.markReaderIndex();
            short code = buf.readShort();

            ControlMessage ctrlMsg = ControlMessage.mkMessage(code);
            if (ctrlMsg != null) {
                return this.decodeControlMessage(ctrlMsg, channel, buf);
            }
            return this.decodeTaskMessage(code, buf);
        }
        finally {
            if (timed && msgDecodeTimer != null) {
                msgDecodeTimer.updateTime(startTime);
            }
        }
    }

    /** Reads the remainder of a control frame whose code has already been consumed. */
    private Object decodeControlMessage(ControlMessage ctrlMsg, Channel channel, ChannelBuffer buf) {
        if (buf.readableBytes() < CONTROL_BODY_BYTES) {
            buf.resetReaderIndex();
            return null;
        }
        long timeStamp = buf.readLong();
        int clientPort = buf.readInt();

        if (ctrlMsg == ControlMessage.EOB_MESSAGE) {
            // Clamp at zero: the sender's timestamp may be ahead of the local clock.
            long interval = Math.max(0L, System.currentTimeMillis() - timeStamp);
            long scaled = interval * TRANSMIT_TIME_SCALE;
            if (this.enableNettyMetrics) {
                AsmHistogram netTransTime = this.getTransmitHistogram(channel, clientPort);
                if (netTransTime != null) {
                    netTransTime.update(scaled);
                }
            }
            networkWorkerTransmitTime.update(scaled);
        }
        nettyServerRecvSpeed.update(ControlMessage.encodeLength());
        return ctrlMsg;
    }

    /** Reads the remainder of a task frame whose leading short ({@code type}) has already been consumed. */
    private Object decodeTaskMessage(short type, ChannelBuffer buf) {
        // Safe without a check: decode() guaranteed at least 6 readable bytes and only 2 were consumed.
        short task = buf.readShort();
        if (buf.readableBytes() < LENGTH_FIELD_BYTES) {
            buf.resetReaderIndex();
            return null;
        }

        int length = buf.readInt();
        if (length <= 0) {
            LOG.info("Receive one message whose TaskMessage's message length is {}", length);
            return new TaskMessage(task, null);
        }
        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return null;
        }

        // Copy straight into a fresh array instead of readBytes(length).array():
        // array() is only supported by buffers with an accessible backing array.
        byte[] rawBytes = new byte[length];
        buf.readBytes(rawBytes);

        TaskMessage ret = new TaskMessage(type, task, rawBytes);
        nettyServerRecvSpeed.update(rawBytes.length + TASK_HEADER_BYTES);
        return ret;
    }

    /**
     * Returns the transmit-time histogram associated with {@code channel}, registering a new
     * one on first use.
     *
     * @param channel    channel whose histogram is requested
     * @param clientPort client port used to build the connection name of a newly registered histogram
     * @return the histogram, or {@code null} when none is cached and the channel's remote
     *         address is unavailable (e.g. the channel is no longer connected)
     */
    public AsmHistogram getTransmitHistogram(Channel channel, int clientPort) {
        AsmHistogram netTransTime;
        synchronized (TRANSMIT_LOCK) {
            netTransTime = networkTransmitTimeMap.get(channel);
        }
        if (netTransTime != null) {
            return netTransTime;
        }

        InetSocketAddress sockAddr = (InetSocketAddress)channel.getRemoteAddress();
        if (sockAddr == null || sockAddr.getAddress() == null) {
            // Without a remote address no connection name can be built; the caller treats null as "skip".
            LOG.debug("No remote address for channel {}, skip registering Transmit Histogram", channel);
            return null;
        }

        String nettyConnection = NettyConnection.mkString(sockAddr.getAddress().getHostAddress(), clientPort, this.localIp, this.localPort);
        // Registration happens outside the lock so no foreign code runs while it is held.
        netTransTime = (AsmHistogram)JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName("NettySrvTransmitTime", nettyConnection), MetricType.HISTOGRAM), new AsmHistogram());
        synchronized (TRANSMIT_LOCK) {
            networkTransmitTimeMap.put(channel, netTransTime);
            transmitNameMap.put(channel, nettyConnection);
        }
        LOG.info("Register Transmit Histogram of {}, channel {}", nettyConnection, channel);
        return netTransTime;
    }

    /**
     * Drops the cached transmit-time histogram of {@code channel}, if any, and unregisters
     * the corresponding netty metric.
     *
     * @param channel channel whose histogram should be removed
     */
    public static void removeTransmitHistogram(Channel channel) {
        AsmHistogram netTransTime;
        String nettyConnection = null;
        synchronized (TRANSMIT_LOCK) {
            netTransTime = networkTransmitTimeMap.remove(channel);
            if (netTransTime != null) {
                nettyConnection = transmitNameMap.remove(channel);
            }
        }
        if (netTransTime != null) {
            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName("NettySrvTransmitTime", nettyConnection), MetricType.HISTOGRAM));
            LOG.info("Remove Transmit Histogram of {}, channel {}", nettyConnection, channel);
        }
    }
}

