package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.ControlMessage;
import backtype.storm.messaging.TaskMessage;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMeter;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/message/netty/MessageDecoder.class */
public class MessageDecoder extends FrameDecoder {
    private static AsmHistogram msgDecodeTimer;
    private boolean isServer;
    private String localIp = NetWorkUtils.ip();
    private int localPort;
    private boolean enableNettyMetrics;
    private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);
    private static AsmMeter nettyServerRecvSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(MetricDef.NETTY_SRV_RECV_SPEED, MetricType.METER), new AsmMeter());
    private static Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap();
    private static AsmHistogram networkWorkerTransmitTime = (AsmHistogram) JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, MetricType.HISTOGRAM), new AsmHistogram());
    private static Map<Channel, String> transmitNameMap = new HashMap();

    public MessageDecoder(boolean z, Map map) {
        this.isServer = z;
        this.localPort = ConfigExtension.getLocalWorkerPort(map);
        this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(map);
        if (this.enableNettyMetrics) {
            msgDecodeTimer = (AsmHistogram) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(MetricDef.NETWORK_MSG_DECODE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
            msgDecodeTimer.setTimeUnit(TimeUnit.NANOSECONDS);
        }
    }

    protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
        AsmHistogram transmitHistogram;
        long readableBytes = channelBuffer.readableBytes();
        if (readableBytes < 6) {
            return null;
        }
        long j = 0;
        if (this.isServer && this.enableNettyMetrics) {
            j = msgDecodeTimer.getTime();
        }
        try {
            channelBuffer.markReaderIndex();
            short readShort = channelBuffer.readShort();
            long j2 = readableBytes - 2;
            ControlMessage mkMessage = ControlMessage.mkMessage(readShort);
            if (mkMessage != null) {
                if (j2 < 12) {
                    channelBuffer.resetReaderIndex();
                    if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                        msgDecodeTimer.updateTime(j);
                    }
                    return null;
                }
                long readLong = channelBuffer.readLong();
                int readInt = channelBuffer.readInt();
                long j3 = j2 - 12;
                if (mkMessage == ControlMessage.EOB_MESSAGE) {
                    long currentTimeMillis = System.currentTimeMillis() - readLong;
                    if (currentTimeMillis < 0) {
                        currentTimeMillis = 0;
                    }
                    if (this.enableNettyMetrics && (transmitHistogram = getTransmitHistogram(channel, readInt)) != null) {
                        transmitHistogram.update(Long.valueOf(currentTimeMillis * 1000));
                    }
                    networkWorkerTransmitTime.update(Long.valueOf(currentTimeMillis * 1000));
                }
                nettyServerRecvSpeed.update(Integer.valueOf(ControlMessage.encodeLength()));
                if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                    msgDecodeTimer.updateTime(j);
                }
                return mkMessage;
            }
            short readShort2 = channelBuffer.readShort();
            long j4 = j2 - 2;
            if (j4 < 4) {
                channelBuffer.resetReaderIndex();
                if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                    msgDecodeTimer.updateTime(j);
                }
                return null;
            }
            int readInt2 = channelBuffer.readInt();
            if (readInt2 <= 0) {
                LOG.info("Receive one message whose TaskMessage's message length is {}", Integer.valueOf(readInt2));
                TaskMessage taskMessage = new TaskMessage(readShort2, null);
                if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                    msgDecodeTimer.updateTime(j);
                }
                return taskMessage;
            }
            if (j4 - 4 < readInt2) {
                channelBuffer.resetReaderIndex();
                if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                    msgDecodeTimer.updateTime(j);
                }
                return null;
            }
            byte[] array = channelBuffer.readBytes(readInt2).array();
            TaskMessage taskMessage2 = new TaskMessage(readShort, readShort2, array);
            nettyServerRecvSpeed.update(Integer.valueOf(array.length + 8));
            if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                msgDecodeTimer.updateTime(j);
            }
            return taskMessage2;
        } catch (Throwable th) {
            if (this.isServer && this.enableNettyMetrics && msgDecodeTimer != null) {
                msgDecodeTimer.updateTime(j);
            }
            throw th;
        }
    }

    public AsmHistogram getTransmitHistogram(Channel channel, int i) {
        AsmHistogram asmHistogram = networkTransmitTimeMap.get(channel);
        if (asmHistogram == null) {
            String mkString = NettyConnection.mkString(((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress(), i, this.localIp, this.localPort);
            asmHistogram = (AsmHistogram) JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, mkString), MetricType.HISTOGRAM), new AsmHistogram());
            networkTransmitTimeMap.put(channel, asmHistogram);
            transmitNameMap.put(channel, mkString);
            LOG.info("Register Transmit Histogram of {}, channel {}", mkString, channel);
        }
        return asmHistogram;
    }

    public static void removeTransmitHistogram(Channel channel) {
        if (networkTransmitTimeMap.remove(channel) != null) {
            String remove = transmitNameMap.remove(channel);
            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, remove), MetricType.HISTOGRAM));
            LOG.info("Remove Transmit Histogram of {}, channel {}", remove, channel);
        }
    }
}
