package com.alibaba.jstorm.message.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.NettyMessage;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMeter;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metrics.Gauge;
import com.alibaba.jstorm.metrics.health.HealthCheck;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.thrift.protocol.TMultiplexedProtocol;
import shade.storm.org.objectweb.asm.Opcodes;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClient.class */
public class NettyClient implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
    // Display name of this client; the constructor sets it to remoteAddr.toString().
    protected String name;
    // Read from STORM_MESSAGING_NETTY_MAX_RETRIES and capped at 30 by the constructor.
    // Not read anywhere else in this class (possibly used by subclasses -- TODO confirm).
    protected final int maxRetries;
    // Base reconnect back-off in milliseconds; getSleepTimeMs() multiplies it by the retry count.
    protected final int baseSleepMs;
    // Read from STORM_MESSAGING_NETTY_MAX_SLEEP_MS; not read anywhere else in this class.
    protected final int maxSleepMs;
    // Value of ConfigExtension.getNettyPendingBufferTimeout(conf); not read anywhere else in this class.
    protected final long timeoutMs;
    // Upper bound on in-flight writes; available() returns false once pendings reaches it.
    protected final int MAX_SEND_PENDING;
    // Created in start(); doReconnect() uses it to open connections to remoteAddr.
    protected ClientBootstrap bootstrap;
    // Remote endpoint built from the host and port passed to the constructor.
    protected final InetSocketAddress remoteAddr;
    // Channel factory passed to the constructor; only stored in this class.
    protected final ChannelFactory factory;
    // Value applied as the "sendBufferSize" bootstrap option in start().
    protected final int bufferSize;
    // Batch size handed to the MessageBuffer constructor.
    protected int messageBatchSize;
    // Buffer of messages waiting to be written; drained while holding writeLock.
    protected MessageBuffer messageBuffer;
    // Monitor held while draining messageBuffer and flushing the drained batch.
    protected Object writeLock;
    // Result of JStormServerUtils.getName(host, port).
    protected String address;
    // Send-time histogram; registered only when enableNettyMetrics is true.
    protected AsmHistogram sendTimer;
    // Send-speed meter; registered only when enableNettyMetrics is true, so it may be null.
    protected AsmMeter sendSpeed;
    // Receives this client via pushEvent(this) whenever reconnect() is called.
    protected ReconnectRunnable reconnector;
    // Factory given to the ClientBootstrap in start(). It is never assigned in this class,
    // so presumably a subclass sets it before start() runs -- TODO confirm.
    protected ChannelFactory clientChannelFactory;
    // Channels whose close() is still in progress; accessed under the channelClosing monitor.
    protected Set<Channel> closingChannel;
    // The configuration map passed to the constructor.
    protected Map stormConf;
    // True when the remote endpoint is this worker itself (see isConnectMyself); metrics are skipped then.
    protected boolean connectMyself;
    // Result of MetricUtils.isEnableNettyMetrics(conf); gates per-connection metric registration.
    protected boolean enableNettyMetrics;
    // Value of ConfigExtension.getNettyBufferThresholdSize(conf); refreshed after each successful connect.
    protected long BATCH_THREASHOLD_WARN;
    // Set while a connect attempt is outstanding so doReconnect() does not start a second one.
    protected AtomicBoolean isConnecting = new AtomicBoolean(false);
    // Monitor guarding closingChannel.
    protected final Object channelClosing = new Object();
    // Reconnect attempts since the last successful connect; setChannel() resets it for a non-null channel.
    protected AtomicInteger retries = new AtomicInteger(0);
    // Currently active channel, or null while disconnected.
    protected AtomicReference<Channel> channelRef = new AtomicReference<>(null);
    // Flipped to true exactly once by close(); backs isClosed().
    protected final AtomicBoolean beingClosed = new AtomicBoolean(false);
    // Number of writes issued by flushRequest() whose futures have not completed yet.
    protected AtomicLong pendings = new AtomicLong(0);
    // Client/server endpoint descriptor used to build metric and health-check names.
    protected NettyConnection nettyConnection = new NettyConnection();

    /* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClient$CacheGaugeHealthCheck.class */
    /**
     * Exposes the size of a {@link MessageBuffer} both as a gauge value and as a
     * worker health check.
     *
     * <p>The check reports unhealthy once the buffer size exceeds
     * {@code 8 * JStormUtils.SIZE_1_M}; otherwise it returns a cached healthy result.
     */
    public static class CacheGaugeHealthCheck extends HealthCheck implements Gauge<Double> {
        MessageBuffer messageBuffer;
        String name;
        HealthCheck.Result healthy = HealthCheck.Result.healthy();

        /**
         * @param messageBuffer buffer whose size is reported
         * @param str           name used as the prefix of the unhealthy message
         */
        public CacheGaugeHealthCheck(MessageBuffer messageBuffer, String str) {
            this.name = str;
            this.messageBuffer = messageBuffer;
        }

        /** Returns the current buffer size as a {@code Double}. */
        @Override // com.alibaba.jstorm.metrics.Gauge
        public Double getValue() {
            return Double.valueOf(this.messageBuffer.size());
        }

        /** Compares the current buffer size against the fixed threshold. */
        @Override // com.alibaba.jstorm.metrics.health.HealthCheck
        protected HealthCheck.Result check() throws Exception {
            double currentSize = getValue().doubleValue();
            double unhealthyAbove = (double) (8 * JStormUtils.SIZE_1_M);
            if (currentSize > unhealthyAbove) {
                return HealthCheck.Result.unhealthy(this.name + QueueGauge.QUEUE_IS_FULL);
            }
            return this.healthy;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client for the given remote endpoint and reads its tuning values from the
     * configuration map. No connection is opened here; that happens via {@link #start()}.
     *
     * <p>Unless the remote endpoint is this worker itself, per-connection metrics and health
     * checks are registered through {@link #registerMetrics()}.
     *
     * @param map               configuration map
     * @param channelFactory    channel factory, stored in {@code factory}
     * @param str               remote host
     * @param i                 remote port
     * @param reconnectRunnable receiver of reconnect events pushed by {@link #reconnect()}
     */
    public NettyClient(Map map, ChannelFactory channelFactory, String str, int i, ReconnectRunnable reconnectRunnable) {
        this.stormConf = map;
        this.factory = channelFactory;
        this.reconnector = reconnectRunnable;
        this.nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(map));
        this.nettyConnection.setServerPort(str, i);
        this.bufferSize = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        // The configured retry count is capped at 30.
        this.maxRetries = Math.min(30, Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)).intValue());
        this.baseSleepMs = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)).intValue();
        this.maxSleepMs = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)).intValue();
        this.timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(map);
        this.MAX_SEND_PENDING = (int) ConfigExtension.getNettyMaxSendPending(map);
        // Default batch size is 262144. It is spelled out here instead of borrowing the
        // numerically equal but unrelated Opcodes.ASM4 constant from the shaded ASM library.
        this.messageBatchSize = Utils.getInt(map.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), Integer.valueOf(256 * 1024)).intValue();
        this.messageBuffer = new MessageBuffer(this.messageBatchSize);
        this.writeLock = new Object();
        this.remoteAddr = new InetSocketAddress(str, i);
        this.name = this.remoteAddr.toString();
        this.connectMyself = isConnectMyself(this.stormConf, str, i);
        this.address = JStormServerUtils.getName(str, i);
        this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(map);
        NettyMetricInstance.register();
        LOG.info("* enable netty metrics: {}", Boolean.valueOf(this.enableNettyMetrics));
        if (!this.connectMyself) {
            registerMetrics();
        }
        this.closingChannel = new HashSet<>();
        this.BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(this.stormConf);
    }

    /**
     * Registers this connection's metrics and worker health checks.
     *
     * <p>When netty metrics are enabled, a send-time histogram, a send-speed meter, a cache-size
     * gauge with its health check, and a pending-writes gauge are registered. The connection
     * health check is registered regardless of that setting; it reports unhealthy whenever
     * {@link #isChannelReady()} returns {@code null}.
     */
    public void registerMetrics() {
        final String connectionName = this.nettyConnection.toString();
        final String cacheCheckName = "NettyCliCacheSize:" + connectionName;

        if (this.enableNettyMetrics) {
            this.sendTimer = (AsmHistogram) JStormMetrics.registerNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, this.nettyConnection), MetricType.HISTOGRAM),
                    new AsmHistogram());
            this.sendSpeed = (AsmMeter) JStormMetrics.registerNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, this.nettyConnection), MetricType.METER),
                    new AsmMeter());

            CacheGaugeHealthCheck cacheCheck = new CacheGaugeHealthCheck(this.messageBuffer, cacheCheckName);
            JStormMetrics.registerNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, this.nettyConnection), MetricType.GAUGE),
                    new AsmGauge(cacheCheck));

            // Gauge over the number of writes that have been issued but not completed.
            Gauge<Double> pendingGauge = new Gauge<Double>() {
                @Override // com.alibaba.jstorm.metrics.Gauge
                public Double getValue() {
                    return Double.valueOf((double) NettyClient.this.pendings.get());
                }
            };
            JStormMetrics.registerNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, this.nettyConnection), MetricType.GAUGE),
                    new AsmGauge(pendingGauge));

            JStormHealthCheck.registerWorkerHealthCheck(cacheCheckName, cacheCheck);
        }

        HealthCheck connectionCheck = new HealthCheck() {
            HealthCheck.Result healthy = HealthCheck.Result.healthy();
            HealthCheck.Result unhealthy =
                    HealthCheck.Result.unhealthy("NettyConnection " + NettyClient.this.nettyConnection.toString() + " is broken.");

            @Override // com.alibaba.jstorm.metrics.health.HealthCheck
            protected HealthCheck.Result check() throws Exception {
                if (NettyClient.this.isChannelReady() == null) {
                    return this.unhealthy;
                }
                return this.healthy;
            }
        };
        JStormHealthCheck.registerWorkerHealthCheck("NettyCliConnCheck:" + connectionName, connectionCheck);
    }

    /**
     * Builds the client bootstrap, applies the socket options and pipeline factory, and then
     * requests the first connection attempt through {@link #reconnect()}.
     */
    public void start() {
        ClientBootstrap clientBootstrap = new ClientBootstrap(this.clientChannelFactory);
        // Publish the bootstrap on the field before configuring it, as the original did.
        this.bootstrap = clientBootstrap;

        clientBootstrap.setOption("tcpNoDelay", Boolean.TRUE);
        clientBootstrap.setOption("reuseAddress", Boolean.TRUE);
        clientBootstrap.setOption("sendBufferSize", Integer.valueOf(this.bufferSize));
        clientBootstrap.setOption("keepAlive", Boolean.TRUE);
        clientBootstrap.setPipelineFactory(new StormClientPipelineFactory(this, this.stormConf));

        reconnect();
    }

    /**
     * Tells whether the given endpoint is this worker itself.
     *
     * @param map configuration map used to look up the local worker port
     * @param str remote host
     * @param i   remote port
     * @return {@code true} when the port equals the local worker port and the host resolves
     *         to the local IP
     */
    public boolean isConnectMyself(Map map, String str, int i) {
        // Port comparison first; the IP lookups only run when the ports match.
        if (ConfigExtension.getLocalWorkerPort(map) != i) {
            return false;
        }
        return NetWorkUtils.ip().equals(NetWorkUtils.host2Ip(str));
    }

    /**
     * Flushes buffered messages to the channel if it is currently writable.
     *
     * @param channel channel whose interest state changed
     */
    public void notifyInterestChanged(Channel channel) {
        synchronized (this.writeLock) {
            if (!channel.isWritable()) {
                return;
            }
            MessageBatch buffered = this.messageBuffer.drain();
            if (buffered != null) {
                flushRequest(channel, buffered);
            }
        }
    }

    /**
     * Performs one connect attempt to {@code remoteAddr}, unless a channel is already set, the
     * client is closed, or another attempt is still outstanding.
     *
     * <p>On failure the listener asks for another attempt via {@link #reconnect()}. On success
     * it installs the new channel and flushes any buffered messages. The calling thread then
     * sleeps for the back-off time computed by {@code getSleepTimeMs()}.
     */
    public void doReconnect() {
        if (this.channelRef.get() != null || isClosed()) {
            return;
        }
        if (this.isConnecting.getAndSet(true)) {
            LOG.info("Connect twice {}", name());
            return;
        }
        long sleepTimeMs = getSleepTimeMs();
        LOG.info("Reconnect ... [{}], {}, sleep {}ms", new Object[]{Integer.valueOf(this.retries.get()), this.name, Long.valueOf(sleepTimeMs)});
        this.bootstrap.connect(this.remoteAddr).addListener(new ChannelFutureListener() { // from class: com.alibaba.jstorm.message.netty.NettyClient.3
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                NettyClient.this.isConnecting.set(false);
                Channel channel = channelFuture.getChannel();
                if (!channelFuture.isSuccess()) {
                    NettyClient.LOG.info("Failed to reconnect ... [{}], {}, channel = {}, cause = {}", new Object[]{Integer.valueOf(NettyClient.this.retries.get()), NettyClient.this.name, channel, channelFuture.getCause()});
                    NettyClient.this.reconnect();
                    return;
                }
                NettyClient.LOG.info("Connection established, channel = :{}", channel);
                NettyClient.this.setChannel(channel);
                NettyClient.this.BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(NettyClient.this.stormConf);
                synchronized (NettyClient.this.writeLock) {
                    if (channel != null && NettyClient.this.messageBuffer.size() > 0) {
                        NettyClient.this.flushRequest(channel, NettyClient.this.messageBuffer.drain());
                    } else {
                        // Only reached when nothing was flushed; previously this warning was
                        // also emitted right after a successful flush.
                        NettyClient.LOG.warn("Failed to flush pending message after reconnecting, channel={}, messageBuffer.size={}", channel, Integer.valueOf(NettyClient.this.messageBuffer.size()));
                    }
                }
            }
        });
        // Back off on the calling thread before it can process the next reconnect event.
        JStormUtils.sleepMs(sleepTimeMs);
    }

    /**
     * Requests a reconnect by pushing this client to the {@code reconnector}.
     * The connect attempt itself is implemented in {@link #doReconnect()}.
     */
    public void reconnect() {
        this.reconnector.pushEvent(this);
    }

    /**
     * Increments the retry counter and returns the back-off for the next reconnect attempt:
     * {@code baseSleepMs * retries}, capped at 1000 ms.
     *
     * @return back-off time in milliseconds
     */
    private int getSleepTimeMs() {
        final long maxBackoffMs = 1000L;
        // Multiply as long: the int product could overflow to a negative sleep time after
        // many failed retries, before the cap is applied.
        long backoffMs = (long) this.baseSleepMs * this.retries.incrementAndGet();
        return (int) Math.min(backoffMs, maxBackoffMs);
    }

    /**
     * Placeholder implementation: only logs a warning and sends nothing.
     * The log text suggests subclasses are expected to override it.
     *
     * @param list messages to send (ignored here)
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(List<TaskMessage> list) {
        LOG.warn("Should be overload");
    }

    /**
     * Placeholder implementation: only logs a warning and sends nothing.
     * The log text suggests subclasses are expected to override it.
     *
     * @param taskMessage message to send (ignored here)
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(TaskMessage taskMessage) {
        LOG.warn("Should be overload");
    }

    /**
     * Returns the current channel when it exists and is writable.
     *
     * @return the writable channel, or {@code null} when there is no channel or it is not writable
     */
    Channel isChannelReady() {
        Channel current = this.channelRef.get();
        boolean usable = current != null && current.isWritable();
        return usable ? current : null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes a message to the channel and tracks it as pending until the write future completes.
     *
     * <p>Null or empty messages are ignored. The encoded length is recorded in the send-speed
     * metrics before the write. If the write fails, the failure is logged (unless the client is
     * closed) and the channel is handed to {@link #exceptionChannel(Channel)}.
     *
     * @param channel      channel to write to
     * @param nettyMessage message to write; may be {@code null}
     */
    public void flushRequest(Channel channel, NettyMessage nettyMessage) {
        if (nettyMessage == null || nettyMessage.isEmpty()) {
            return;
        }
        Long valueOf = Long.valueOf(nettyMessage.getEncodedLength());
        this.pendings.incrementAndGet();
        if (this.enableNettyMetrics && this.sendSpeed != null) {
            this.sendSpeed.update(valueOf);
        }
        NettyMetricInstance.totalSendSpeed.update(valueOf);
        if (MetricUtils.metricAccurateCal) {
            NettyMetricInstance.batchSizeWorkerHistogram.update(valueOf);
        }
        channel.write(nettyMessage).addListener(new ChannelFutureListener() { // from class: com.alibaba.jstorm.message.netty.NettyClient.4
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                NettyClient.this.pendings.decrementAndGet();
                if (channelFuture.isSuccess()) {
                    return;
                }
                Channel failedChannel = channelFuture.getChannel();
                if (!NettyClient.this.isClosed()) {
                    // Concatenate the channel directly instead of calling toString() on it, so a
                    // null channel is logged as "null" rather than throwing before the null check.
                    NettyClient.LOG.info("Failed to send requests to " + NettyClient.this.name + ": " + failedChannel + TMultiplexedProtocol.SEPARATOR, channelFuture.getCause());
                }
                if (failedChannel != null) {
                    NettyClient.this.exceptionChannel(failedChannel);
                }
            }
        });
    }

    /**
     * Removes the metrics and health checks that belong to this connection.
     *
     * <p>The per-connection netty metrics are removed only when netty metrics are enabled. The
     * two worker-level metrics and both health checks are removed regardless.
     */
    public void unregisterMetrics() {
        final String connectionName = this.nettyConnection.toString();

        if (this.enableNettyMetrics) {
            JStormMetrics.unregisterNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, this.nettyConnection), MetricType.HISTOGRAM));
            JStormMetrics.unregisterNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, this.nettyConnection), MetricType.GAUGE));
            JStormMetrics.unregisterNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, this.nettyConnection), MetricType.GAUGE));
            JStormMetrics.unregisterNettyMetric(
                    MetricUtils.nettyMetricName(
                            AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, this.nettyConnection), MetricType.METER));
        }

        JStormMetrics.unregisterWorkerMetric(
                MetricUtils.workerMetricName(MetricDef.NETTY_CLI_SEND_SPEED, MetricType.METER));
        JStormMetrics.unregisterWorkerMetric(
                MetricUtils.workerMetricName(MetricDef.NETTY_CLI_BATCH_SIZE, MetricType.HISTOGRAM));

        JStormHealthCheck.unregisterWorkerHealthCheck("NettyCliCacheSize:" + connectionName);
        JStormHealthCheck.unregisterWorkerHealthCheck("NettyCliConnCheck:" + connectionName);
    }

    /**
     * Closes this client. Only the first call has any effect; later calls just log and return.
     *
     * <p>Metrics are unregistered unless the remote endpoint is this worker itself. If a channel
     * is set, the method waits up to 10 seconds, polling once per second, for pending writes to
     * complete, and then releases the channel through {@link #close_n_release()}. An interrupt
     * ends the wait early.
     */
    @Override // backtype.storm.messaging.IConnection
    public void close() {
        LOG.info("Close netty connection to {}", name());
        if (!this.beingClosed.compareAndSet(false, true)) {
            LOG.info("Netty client has been closed.");
            return;
        }
        if (!this.connectMyself) {
            unregisterMetrics();
        }
        Channel channel = this.channelRef.get();
        if (channel == null) {
            LOG.info("Channel {} has been closed before", name());
            return;
        }

        final long closeTimeoutMs = 10000L;
        final long pollIntervalMs = 1000L;
        final long startMs = System.currentTimeMillis();
        LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}", Long.valueOf(closeTimeoutMs), Long.valueOf(this.pendings.get()));
        while (this.pendings.get() != 0) {
            if (System.currentTimeMillis() - startMs > closeTimeoutMs) {
                LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), Long.valueOf(this.pendings.get()));
                break;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Stop waiting, but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }
        close_n_release();
    }

    /**
     * Clears the current channel, if one is set, by calling {@code setChannel(null)}.
     */
    void close_n_release() {
        if (this.channelRef.get() == null) {
            return;
        }
        setChannel(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts closing the given channel unless a close for it is already in progress.
     *
     * <p>The channel is tracked in {@code closingChannel} until its close future completes.
     *
     * @param channel channel to close
     */
    public void closeChannel(final Channel channel) {
        synchronized (this.channelClosing) {
            // add() returns false when the channel is already tracked as closing.
            boolean newlyTracked = this.closingChannel.add(channel);
            if (!newlyTracked) {
                LOG.info(channel.toString() + " is already closed");
                return;
            }
            LOG.debug(channel.toString() + " begin to closed");
            ChannelFuture closeFuture = channel.close();
            closeFuture.addListener(new ChannelFutureListener() { // from class: com.alibaba.jstorm.message.netty.NettyClient.5
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    synchronized (NettyClient.this.channelClosing) {
                        NettyClient.this.closingChannel.remove(channel);
                    }
                    NettyClient.LOG.debug(channel.toString() + " finish closed");
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Intentionally empty in this class.
     *
     * @param channel the channel concerned (unused)
     */
    public void connectChannel(Channel channel) {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a disconnected channel. Does nothing once the client is closed.
     *
     * <p>If the channel is the current one, it is cleared and a reconnect is requested;
     * any other channel is simply closed.
     *
     * @param channel channel that was disconnected
     */
    public void disconnectChannel(Channel channel) {
        if (isClosed()) {
            return;
        }
        boolean isCurrent = (channel == this.channelRef.get());
        if (isCurrent) {
            setChannel(null);
            reconnect();
            return;
        }
        closeChannel(channel);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a channel that hit an error: the current channel is cleared via
     * {@code setChannel(null)}, any other channel is closed.
     *
     * @param channel channel on which the error occurred
     */
    public void exceptionChannel(Channel channel) {
        if (channel != this.channelRef.get()) {
            closeChannel(channel);
            return;
        }
        setChannel(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Atomically installs {@code channel} as the current channel and closes the one it replaced.
     *
     * <p>A non-null channel resets the retry counter. The previous channel is closed only when
     * it is non-null and differs from the new one.
     *
     * @param channel new current channel, or {@code null} to clear it
     */
    public void setChannel(Channel channel) {
        Channel previous = this.channelRef.getAndSet(channel);
        if (channel != null) {
            this.retries.set(0);
        }
        // getLocalAddress() may return null. String.valueOf keeps the logging null-safe so an
        // NPE here cannot skip closing the replaced channel below.
        String previousAddr = previous == null ? "null" : String.valueOf(previous.getLocalAddress());
        String currentAddr = channel == null ? "null" : String.valueOf(channel.getLocalAddress());
        LOG.info("Use new channel {} replace old channel {}", currentAddr, previousAddr);
        if (previous == channel || previous == null) {
            return;
        }
        closeChannel(previous);
        LOG.info("Successfully close old channel " + previousAddr);
    }

    /**
     * @return {@code true} once {@code close()} has set the {@code beingClosed} flag
     */
    @Override // backtype.storm.messaging.IConnection
    public boolean isClosed() {
        return this.beingClosed.get();
    }

    /**
     * @return the {@code beingClosed} flag object itself, not a copy
     */
    public AtomicBoolean getBeing_closed() {
        return this.beingClosed;
    }

    /**
     * @return the buffer size read from {@code STORM_MESSAGING_NETTY_BUFFER_SIZE} in the constructor
     */
    public int getBuffer_size() {
        return this.bufferSize;
    }

    /**
     * @return the remote address this client connects to
     */
    public SocketAddress getRemoteAddr() {
        return this.remoteAddr;
    }

    /**
     * @return the display name of this client (the remote address as a string)
     */
    public String name() {
        return this.name;
    }

    /**
     * Placeholder implementation: only logs a warning and ignores both arguments.
     * The log text suggests subclasses are expected to override it.
     *
     * @param channel channel the response arrived on (ignored here)
     * @param obj     response object (ignored here)
     */
    public void handleResponse(Channel channel, Object obj) {
        LOG.warn("Should be overload");
    }

    /**
     * Not supported by this client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // backtype.storm.messaging.IConnection
    public Object recv(Integer num, int i) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /**
     * Not supported by this client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void registerQueue(Integer num, DisruptorQueue disruptorQueue) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /**
     * Not supported by this client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void enqueue(TaskMessage taskMessage, Channel channel) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /**
     * Tells whether this client can accept more data right now.
     *
     * @return {@code true} when a writable channel exists and the number of pending writes is
     *         below {@code MAX_SEND_PENDING}
     */
    @Override // backtype.storm.messaging.IConnection
    public boolean available() {
        if (isChannelReady() == null) {
            return false;
        }
        long sendLimit = (long) this.MAX_SEND_PENDING;
        return this.pendings.get() < sendLimit;
    }
}
