/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.ControlMessage;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMeter;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.message.netty.MessageBatch;
import com.alibaba.jstorm.message.netty.NettyConnection;
import com.alibaba.jstorm.message.netty.ReconnectRunnable;
import com.alibaba.jstorm.message.netty.StormClientPipelineFactory;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.health.HealthCheck;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NettyClient
implements IConnection {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
    /** Display name used in log messages; the constructor sets it to {@code remote_addr.toString()}. */
    protected String name;
    /** Value of "storm.messaging.netty.max_retries", capped at 30 by the constructor. */
    protected final int max_retries;
    /** Value of "storm.messaging.netty.min_wait_ms"; multiplied by the retry count to derive the reconnect sleep. */
    protected final int base_sleep_ms;
    /** Value of "storm.messaging.netty.max_wait_ms"; not read anywhere else in this class. */
    protected final int max_sleep_ms;
    /** Value returned by {@code ConfigExtension.getNettyPendingBufferTimeout}. */
    protected final long timeoutMs;
    /** Value returned by {@code ConfigExtension.getNettyMaxSendPending}, narrowed to int. */
    protected final int MAX_SEND_PENDING;
    /** Reconnect attempt counter; incremented per attempt and reset to 0 when a new channel is installed. */
    protected AtomicInteger retries;
    /** The channel currently in use, or null while disconnected. */
    protected AtomicReference<Channel> channelRef;
    /** Netty client bootstrap; created by {@link #start()}. */
    protected ClientBootstrap bootstrap;
    /** Address of the remote server this client connects to. */
    protected final InetSocketAddress remote_addr;
    /** Channel factory handed to the constructor. */
    protected final ChannelFactory factory;
    /** Value of "storm.messaging.netty.buffer_size"; applied as the socket send buffer size. */
    protected final int buffer_size;
    /** Set to true by the first call to {@link #close()}. */
    protected final AtomicBoolean being_closed;
    /** Number of batch writes issued whose completion listener has not run yet. */
    protected AtomicLong pendings;
    /** Value of "storm.messaging.netty.transfer.batch.size" (default 262144). */
    protected int messageBatchSize;
    /** Batch waiting to be written; drained with getAndSet(null) when it is flushed. */
    protected AtomicReference<MessageBatch> messageBatchRef;
    /** Scheduler handed to the constructor. */
    protected ScheduledExecutorService scheduler;
    /** Result of {@code JStormServerUtils.getName(host, port)}. */
    protected String address;
    /** Per-connection send-time histogram; only registered when netty metrics are enabled. */
    protected AsmHistogram sendTimer;
    /** Per-connection send-speed meter; only registered when netty metrics are enabled. */
    protected AsmMeter sendSpeed;
    /** Worker-level send-speed meter shared by all instances (static). */
    protected static AsmMeter totalSendSpeed = (AsmMeter)JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName("NettyCliSendSpeed", MetricType.METER), new AsmMeter());
    /** Batch-size histogram registered under a worker-level name; note the field itself is per instance. */
    protected AsmHistogram batchSizeWorkerHistogram = (AsmHistogram)JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName("NettyCliSendBatchSize", MetricType.HISTOGRAM), new AsmHistogram());
    /** Receives this client through pushEvent whenever a (re)connect is requested. */
    protected ReconnectRunnable reconnector;
    /**
     * Factory used by {@link #start()} to build the bootstrap.
     * NOTE(review): never assigned in this class - presumably set by subclasses before start(); confirm.
     */
    protected ChannelFactory clientChannelFactory;
    /** Channels whose close() has been issued but has not completed; guarded by {@link #channelClosing}. */
    protected Set<Channel> closingChannel;
    /** True while a connect attempt started by {@link #doReconnect()} is in flight. */
    protected AtomicBoolean isConnecting = new AtomicBoolean(false);
    /** Client/server endpoint descriptor used to build metric and health-check names. */
    protected NettyConnection nettyConnection;
    /** Raw storm configuration map handed to the constructor. */
    protected Map stormConf;
    /** True when the remote endpoint is this worker's own ip and port. */
    protected boolean connectMyself;
    /** Monitor guarding {@link #closingChannel}. */
    protected final Object channelClosing = new Object();
    /** Result of {@code MetricUtils.isEnableNettyMetrics}; gates the per-connection metrics. */
    protected boolean enableNettyMetrics;

    /**
     * Creates a client for the server at {@code host:port}. No connection is
     * opened here; that happens once {@link #start()} is called.
     *
     * @param storm_conf  storm configuration map the netty settings are read from
     * @param factory     channel factory stored in {@link #factory}
     * @param scheduler   scheduler stored in {@link #scheduler}
     * @param host        remote host name or ip
     * @param port        remote port
     * @param reconnector receives this client whenever a (re)connect is requested
     */
    NettyClient(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
        this.stormConf = storm_conf;
        this.factory = factory;
        this.scheduler = scheduler;
        this.reconnector = reconnector;
        this.retries = new AtomicInteger(0);
        // The type argument must match the field's declared AtomicReference<Channel>.
        this.channelRef = new AtomicReference<Channel>(null);
        this.being_closed = new AtomicBoolean(false);
        this.pendings = new AtomicLong(0L);
        this.nettyConnection = new NettyConnection();
        this.nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(storm_conf));
        this.nettyConnection.setServerPort(host, port);
        this.buffer_size = Utils.getInt(storm_conf.get("storm.messaging.netty.buffer_size"));
        this.max_retries = Math.min(30, Utils.getInt(storm_conf.get("storm.messaging.netty.max_retries")));
        this.base_sleep_ms = Utils.getInt(storm_conf.get("storm.messaging.netty.min_wait_ms"));
        this.max_sleep_ms = Utils.getInt(storm_conf.get("storm.messaging.netty.max_wait_ms"));
        this.timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(storm_conf);
        this.MAX_SEND_PENDING = (int)ConfigExtension.getNettyMaxSendPending(storm_conf);
        this.messageBatchSize = Utils.getInt(storm_conf.get("storm.messaging.netty.transfer.batch.size"), 262144);
        this.messageBatchRef = new AtomicReference<MessageBatch>();
        this.remote_addr = new InetSocketAddress(host, port);
        this.name = this.remote_addr.toString();
        this.connectMyself = this.isConnectMyself(this.stormConf, host, port);
        this.address = JStormServerUtils.getName(host, port);
        this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(storm_conf);
        LOG.info("* enable netty metrics: {}", (Object)this.enableNettyMetrics);
        // Initialised before registerMetrics() because that method is public and
        // overridable; an override must not observe a null set.
        this.closingChannel = new HashSet<Channel>();
        // A connection to this very worker gets neither metrics nor health checks.
        if (!this.connectMyself) {
            this.registerMetrics();
        }
    }

    /**
     * Registers this connection's metrics and health checks.
     *
     * <p>When netty metrics are enabled this registers the send-time histogram,
     * the send-speed meter, a gauge over the cached batch size, a gauge over the
     * number of pending writes, and a health check on the cached batch size.
     * The connection health check is registered regardless of that flag.
     */
    public void registerMetrics() {
        if (enableNettyMetrics) {
            String cacheCheckName = "NettyCliCacheSize:" + nettyConnection.toString();

            String sendTimeName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendTime", nettyConnection), MetricType.HISTOGRAM);
            sendTimer = (AsmHistogram)JStormMetrics.registerNettyMetric(sendTimeName, new AsmHistogram());

            String sendSpeedName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendSpeed", nettyConnection), MetricType.METER);
            sendSpeed = (AsmMeter)JStormMetrics.registerNettyMetric(sendSpeedName, new AsmMeter());

            // One object serves both as the cache-size gauge and as its health check.
            CacheGaugeHealthCheck cacheCheck = new CacheGaugeHealthCheck(messageBatchRef, cacheCheckName);
            String cacheSizeName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliCacheSize", nettyConnection), MetricType.GAUGE);
            JStormMetrics.registerNettyMetric(cacheSizeName, new AsmGauge(cacheCheck));

            String pendingName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendPending", nettyConnection), MetricType.GAUGE);
            JStormMetrics.registerNettyMetric(pendingName, new AsmGauge(new Gauge<Double>(){

                @Override
                public Double getValue() {
                    return Double.valueOf((double)NettyClient.this.pendings.get());
                }
            }));

            JStormHealthCheck.registerWorkerHealthCheck(cacheCheckName, cacheCheck);
        }

        String connCheckName = "NettyCliConnCheck:" + nettyConnection.toString();
        JStormHealthCheck.registerWorkerHealthCheck(connCheckName, new HealthCheck(){
            private final HealthCheck.Result healthy = HealthCheck.Result.healthy();
            private final HealthCheck.Result unhealthy = HealthCheck.Result.unhealthy("NettyConnection " + NettyClient.this.nettyConnection.toString() + " is broken.");

            /** Healthy exactly when a writable channel is currently available. */
            @Override
            protected HealthCheck.Result check() throws Exception {
                return NettyClient.this.isChannelReady() != null ? this.healthy : this.unhealthy;
            }
        });
    }

    /**
     * Builds the client bootstrap from {@link #clientChannelFactory}, applies
     * the socket options, installs the pipeline factory and then requests the
     * first connection through {@link #reconnect()}.
     *
     * <p>NOTE(review): {@code clientChannelFactory} is not assigned anywhere in
     * this class - presumably a subclass sets it before calling this method.
     */
    public void start() {
        this.bootstrap = new ClientBootstrap(this.clientChannelFactory);
        this.bootstrap.setOption("tcpNoDelay", (Object)true);
        // The Netty option key is "reuseAddress"; the former "reuserAddress"
        // matched no option, so SO_REUSEADDR was never actually set.
        this.bootstrap.setOption("reuseAddress", (Object)true);
        this.bootstrap.setOption("sendBufferSize", (Object)this.buffer_size);
        this.bootstrap.setOption("keepAlive", (Object)true);
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)new StormClientPipelineFactory(this, this.stormConf));
        this.reconnect();
    }

    /**
     * Tells whether {@code host:port} designates this worker itself.
     *
     * @param conf configuration the local worker port is read from
     * @param host remote host, resolved to an ip before comparing
     * @param port remote port
     * @return true when both the port and the resolved ip equal the local ones
     */
    public boolean isConnectMyself(Map conf, String host, int port) {
        final String myIp = NetWorkUtils.ip();
        final String targetIp = NetWorkUtils.host2Ip(host);
        if (ConfigExtension.getLocalWorkerPort(conf) != port) {
            return false;
        }
        return myIp.equals(targetIp);
    }

    /**
     * Flushes the cached batch, if any, when {@code channel} is writable.
     *
     * @param channel the channel whose interest ops changed
     */
    public void notifyInterestChanged(Channel channel) {
        if (!channel.isWritable()) {
            return;
        }
        // Take ownership of the cached batch atomically so it is written once.
        MessageBatch cached = messageBatchRef.getAndSet(null);
        flushRequest(channel, cached);
    }

    /**
     * Starts one asynchronous connect attempt unless a channel already exists,
     * the client is closed, or another attempt is still in flight. On success
     * the new channel is installed; on failure another reconnect is requested.
     *
     * <p>After issuing the connect, the calling thread sleeps for the back-off
     * computed by {@link #getSleepTimeMs()}.
     */
    public void doReconnect() {
        if (this.channelRef.get() != null || this.isClosed()) {
            return;
        }
        if (this.isConnecting.getAndSet(true)) {
            LOG.info("Connect twice {}", (Object)this.name());
            return;
        }
        long sleepMs = this.getSleepTimeMs();
        LOG.info("Reconnect ... [{}], {}, sleep {}ms", new Object[]{this.retries.get(), this.name, sleepMs});
        ChannelFuture future;
        try {
            future = this.bootstrap.connect((SocketAddress)this.remote_addr);
        }
        catch (RuntimeException e) {
            // The listener that normally clears the flag was never attached;
            // clear it here, otherwise every later attempt is rejected as
            // "Connect twice" and the client can never reconnect.
            this.isConnecting.set(false);
            throw e;
        }
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                NettyClient.this.isConnecting.set(false);
                Channel channel = future.getChannel();
                if (future.isSuccess()) {
                    LOG.info("Connection established, channel = :{}", (Object)channel);
                    NettyClient.this.setChannel(channel);
                } else {
                    LOG.info("Failed to reconnect ... [{}], {}, channel = {}, cause = {}", new Object[]{NettyClient.this.retries.get(), NettyClient.this.name, channel, future.getCause()});
                    NettyClient.this.reconnect();
                }
            }
        });
        JStormUtils.sleepMs(sleepMs);
    }

    /** Requests a (re)connect by handing this client to the {@link #reconnector}. */
    public void reconnect() {
        reconnector.pushEvent(this);
    }

    /**
     * Increments the retry counter and returns the back-off for this attempt:
     * {@code base_sleep_ms * retries}, capped at 1000 ms. The cap is the
     * literal 1000, not {@link #max_sleep_ms}.
     *
     * @return milliseconds to sleep for the current attempt
     */
    private int getSleepTimeMs() {
        // Multiply in long: the retry counter grows without bound while the
        // peer is down, and an int product could wrap to a negative sleep.
        long sleepMs = (long)this.base_sleep_ms * (long)this.retries.incrementAndGet();
        if (sleepMs > 1000L) {
            sleepMs = 1000L;
        }
        return (int)sleepMs;
    }

    /**
     * Placeholder for sending a list of messages: this implementation only
     * logs a warning and sends nothing.
     *
     * @param messages ignored by this implementation
     */
    @Override
    public void send(List<TaskMessage> messages) {
        LOG.warn("Should be overload");
    }

    /**
     * Placeholder for sending a single message: this implementation only logs
     * a warning and sends nothing.
     *
     * @param message ignored by this implementation
     */
    @Override
    public void send(TaskMessage message) {
        LOG.warn("Should be overload");
    }

    /**
     * Returns the current channel when it can be written to.
     *
     * @return the current channel, or null when there is none or it is not writable
     */
    Channel isChannelReady() {
        Channel current = channelRef.get();
        boolean ready = current != null && current.isWritable();
        return ready ? current : null;
    }

    /**
     * Writes a control message to {@code channel}. If the write fails, the
     * failure is logged (unless the client is closed) and the channel is
     * passed to {@link #exceptionChannel(Channel)}.
     *
     * @param channel channel to write to
     * @param request control message to send; null is a no-op
     */
    protected synchronized void flushRequest(Channel channel, ControlMessage request) {
        if (request == null) {
            return;
        }
        ChannelFuture future = channel.write((Object)request);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                Channel failedChannel = future.getChannel();
                if (!NettyClient.this.isClosed()) {
                    // String concatenation tolerates a null channel; the earlier
                    // explicit toString() ran before the null check below.
                    LOG.info("Failed to send request to " + NettyClient.this.name + ": " + failedChannel + ":", future.getCause());
                }
                if (failedChannel != null) {
                    NettyClient.this.exceptionChannel(failedChannel);
                }
            }
        });
    }

    /**
     * Writes a message batch to {@code channel}, updating the send metrics and
     * counting the write as pending until its completion listener runs. If the
     * write fails, the failure is logged (unless the client is closed) and the
     * channel is passed to {@link #exceptionChannel(Channel)}.
     *
     * @param channel  channel to write to
     * @param requests batch to send; null or empty is a no-op
     */
    protected synchronized void flushRequest(Channel channel, MessageBatch requests) {
        if (requests == null || requests.isEmpty()) {
            return;
        }
        Long batchSize = requests.getEncoded_length();
        // Balanced by the decrement in the completion listener below.
        this.pendings.incrementAndGet();
        if (this.enableNettyMetrics && this.sendSpeed != null) {
            this.sendSpeed.update(batchSize);
        }
        totalSendSpeed.update(batchSize);
        this.batchSizeWorkerHistogram.update(batchSize);
        ChannelFuture future = channel.write((Object)requests);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                NettyClient.this.pendings.decrementAndGet();
                if (future.isSuccess()) {
                    return;
                }
                Channel failedChannel = future.getChannel();
                if (!NettyClient.this.isClosed()) {
                    // String concatenation tolerates a null channel; the earlier
                    // explicit toString() ran before the null check below.
                    LOG.info("Failed to send requests to " + NettyClient.this.name + ": " + failedChannel + ":", future.getCause());
                }
                if (failedChannel != null) {
                    NettyClient.this.exceptionChannel(failedChannel);
                }
            }
        });
    }

    /**
     * Removes the metrics and health checks associated with this client.
     *
     * <p>NOTE(review): besides the per-connection entries, this also removes
     * the worker-level "NettyCliSendSpeed" and "NettyCliSendBatchSize" metrics,
     * the first of which backs the static {@code totalSendSpeed} shared by all
     * instances - confirm this is intended while other clients remain open.
     */
    public void unregisterMetrics() {
        if (enableNettyMetrics) {
            String sendTimeName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendTime", nettyConnection), MetricType.HISTOGRAM);
            JStormMetrics.unregisterNettyMetric(sendTimeName);

            String pendingName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendPending", nettyConnection), MetricType.GAUGE);
            JStormMetrics.unregisterNettyMetric(pendingName);

            String cacheSizeName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliCacheSize", nettyConnection), MetricType.GAUGE);
            JStormMetrics.unregisterNettyMetric(cacheSizeName);

            String sendSpeedName = MetricUtils.nettyMetricName(AsmMetric.mkName("NettyCliSendSpeed", nettyConnection), MetricType.METER);
            JStormMetrics.unregisterNettyMetric(sendSpeedName);
        }

        JStormMetrics.unregisterWorkerMetric(MetricUtils.workerMetricName("NettyCliSendSpeed", MetricType.METER));
        JStormMetrics.unregisterWorkerMetric(MetricUtils.workerMetricName("NettyCliSendBatchSize", MetricType.HISTOGRAM));

        // Health checks are removed whether or not netty metrics were enabled.
        String cacheCheckName = "NettyCliCacheSize:" + nettyConnection.toString();
        JStormHealthCheck.unregisterWorkerHealthCheck(cacheCheckName);
        String connCheckName = "NettyCliConnCheck:" + nettyConnection.toString();
        JStormHealthCheck.unregisterWorkerHealthCheck(connCheckName);
    }

    /**
     * Closes this client. Only the first call does any work: it unregisters
     * the metrics (unless this is a self-connection), flushes the cached batch
     * if the channel is writable, waits up to 10 seconds for pending batch
     * writes to complete, and finally releases the channel.
     *
     * <p>If the waiting thread is interrupted, the wait ends early and the
     * thread's interrupt status is restored before the channel is released.
     */
    @Override
    public void close() {
        LOG.info("Close netty connection to {}", (Object)this.name());
        if (!this.being_closed.compareAndSet(false, true)) {
            LOG.info("Netty client has been closed.");
            return;
        }
        if (!this.connectMyself) {
            this.unregisterMetrics();
        }
        Channel channel = this.channelRef.get();
        if (channel == null) {
            LOG.info("Channel {} has been closed before", (Object)this.name());
            return;
        }
        if (channel.isWritable()) {
            MessageBatch toBeFlushed = this.messageBatchRef.getAndSet(null);
            this.flushRequest(channel, toBeFlushed);
        }
        final long timeoutMilliSeconds = 10000L;
        final long start = System.currentTimeMillis();
        LOG.info("Waiting for pending batchs to be sent with " + this.name() + "..., timeout: {}ms, pendings: {}", (Object)timeoutMilliSeconds, (Object)this.pendings.get());
        while (this.pendings.get() != 0L) {
            long delta = System.currentTimeMillis() - start;
            if (delta > timeoutMilliSeconds) {
                LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", (Object)this.name(), (Object)this.pendings.get());
                break;
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Stop waiting, but keep the interrupt visible to the caller
                // instead of silently swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        this.close_n_release();
    }

    /** Clears the current channel, if there is one, which also closes it. */
    void close_n_release() {
        if (channelRef.get() == null) {
            return;
        }
        setChannel(null);
    }

    /**
     * Asynchronously closes {@code channel} unless a close is already in
     * progress for it. The channel is tracked in {@link #closingChannel} until
     * the close future completes.
     *
     * @param channel the channel to close
     */
    void closeChannel(final Channel channel) {
        synchronized (channelClosing) {
            // Set.add reports false when the channel was already being tracked.
            if (!closingChannel.add(channel)) {
                LOG.info(channel.toString() + " is already closed");
                return;
            }
        }
        LOG.debug(channel.toString() + " begin to closed");
        channel.close().addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                synchronized (NettyClient.this.channelClosing) {
                    NettyClient.this.closingChannel.remove(channel);
                }
                LOG.debug(channel.toString() + " finish closed");
            }
        });
    }

    /**
     * Handles a disconnected channel. Does nothing once the client is closed.
     * If {@code channel} is the current one, it is cleared and a reconnect is
     * requested; any other channel is simply closed.
     *
     * @param channel the channel that was disconnected
     */
    void disconnectChannel(Channel channel) {
        if (isClosed()) {
            return;
        }
        if (channel != channelRef.get()) {
            closeChannel(channel);
            return;
        }
        setChannel(null);
        reconnect();
    }

    /**
     * Handles a channel that raised an error: the current channel is cleared
     * (no reconnect is requested here), any other channel is closed.
     *
     * @param channel the channel that failed
     */
    void exceptionChannel(Channel channel) {
        if (channel != channelRef.get()) {
            closeChannel(channel);
            return;
        }
        setChannel(null);
    }

    /**
     * Installs {@code newChannel} as the current channel and closes the one it
     * replaces. Installing a non-null channel resets the retry counter.
     *
     * @param newChannel the channel to use from now on, or null to clear it
     */
    void setChannel(Channel newChannel) {
        Channel oldChannel = this.channelRef.getAndSet(newChannel);
        if (newChannel != null) {
            this.retries.set(0);
        }
        String oldLocalAddres = NettyClient.describeLocalAddress(oldChannel);
        String newLocalAddress = NettyClient.describeLocalAddress(newChannel);
        LOG.info("Use new channel {} replace old channel {}", (Object)newLocalAddress, (Object)oldLocalAddres);
        if (oldChannel != newChannel && oldChannel != null) {
            this.closeChannel(oldChannel);
            LOG.info("Successfully close old channel " + oldLocalAddres);
        }
    }

    /**
     * Renders a channel's local address for logging. Both a null channel and a
     * channel without a local address (Netty returns null for a channel that is
     * not bound) yield "null", so logging can never throw after the channel
     * reference has been swapped and thereby skip closing the old channel.
     */
    private static String describeLocalAddress(Channel channel) {
        if (channel == null) {
            return "null";
        }
        SocketAddress local = channel.getLocalAddress();
        return local == null ? "null" : local.toString();
    }

    /** @return true once {@link #close()} has been called */
    @Override
    public boolean isClosed() {
        return being_closed.get();
    }

    /** @return the live flag that {@link #close()} sets to true, not a copy */
    public AtomicBoolean getBeing_closed() {
        return being_closed;
    }

    /** @return the configured "storm.messaging.netty.buffer_size" value */
    public int getBuffer_size() {
        return buffer_size;
    }

    /** @return the remote server address this client connects to */
    public SocketAddress getRemoteAddr() {
        return remote_addr;
    }

    /** @return the display name used in log messages */
    public String name() {
        return name;
    }

    /** Placeholder response hook: this implementation only logs a warning. */
    public void handleResponse() {
        LOG.warn("Should be overload");
    }

    /**
     * Not supported on a client connection.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object recv(Integer taskId, int flags) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /**
     * Not supported on a client connection.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void registerQueue(Integer taskId, DisruptorQueue recvQueu) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /**
     * Not supported on a client connection.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void enqueue(TaskMessage message) {
        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
    }

    /** @return true when a writable channel is currently available */
    @Override
    public boolean available() {
        Channel ready = isChannelReady();
        return ready != null;
    }

    public static class CacheGaugeHealthCheck
    extends HealthCheck
    implements Gauge<Double> {
        AtomicReference<MessageBatch> messageBatchRef;
        String name;
        HealthCheck.Result healthy;

        public CacheGaugeHealthCheck(AtomicReference<MessageBatch> messageBatchRef, String name) {
            this.messageBatchRef = messageBatchRef;
            this.name = name;
            this.healthy = HealthCheck.Result.healthy();
        }

        public Double getValue() {
            MessageBatch messageBatch = this.messageBatchRef.get();
            if (messageBatch == null) {
                return 0.0;
            }
            return messageBatch.getEncoded_length();
        }

        protected HealthCheck.Result check() throws Exception {
            Double size = this.getValue();
            if (size > (double)(8L * JStormUtils.SIZE_1_M)) {
                return HealthCheck.Result.unhealthy((String)(this.name + " is full"));
            }
            return this.healthy;
        }
    }
}

