/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.ControlMessage;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.message.netty.MessageBatch;
import com.alibaba.jstorm.message.netty.NettyClient;
import com.alibaba.jstorm.message.netty.NettyRenameThreadFactory;
import com.alibaba.jstorm.message.netty.ReconnectRunnable;
import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty client that sends one {@link MessageBatch} at a time.
 *
 * <p>Outgoing messages are published to a {@link DisruptorQueue}; {@link #onEvent} groups them
 * into {@link MessageBatch} instances that are parked in {@code batchQueue}. {@link #sendData()}
 * flushes a single batch, and {@link #handleResponse()} triggers the next {@link #sendData()}
 * call. A periodic watchdog ({@link #trigger()}) writes {@link ControlMessage#EOB_MESSAGE} to the
 * channel when no response has reset {@code emitTs} within {@code timeoutMs}.
 */
class NettyClientSync extends NettyClient implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientSync.class);

    /** Batches that are ready to be written, or that could not be written yet. */
    private final ConcurrentLinkedQueue<MessageBatch> batchQueue;
    /** Buffer for individual messages handed to {@code send(...)}. */
    private final DisruptorQueue disruptorQueue;
    private final ExecutorService bossExecutor;
    private final ExecutorService workerExecutor;
    /** Handle of the periodic watchdog task so that it can be cancelled in {@link #close()}. */
    private final ScheduledFuture<?> triggerFuture;
    /**
     * Wall-clock millis of the last {@link #flushBatch} call; reset to 0 by
     * {@link #handleResponse()}. A value of 0 tells the watchdog that nothing is outstanding.
     */
    private final AtomicLong emitTs = new AtomicLong(0L);

    /**
     * Creates the client, registers its metrics (unless {@code connectMyself} is set), schedules
     * the watchdog (first run after 10 seconds, then every second), builds a dedicated channel
     * factory backed by two cached thread pools and finally calls {@code start()}.
     *
     * @param storm_conf  configuration; {@code "topology.disruptor.wait.strategy"} must name the
     *                    {@link WaitStrategy} class to instantiate
     * @param factory     forwarded to the super constructor
     * @param scheduler   scheduler used for the watchdog task (and forwarded to the super class)
     * @param host        remote host
     * @param port        remote port
     * @param reconnector forwarded to the super constructor
     */
    NettyClientSync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
        super(storm_conf, factory, scheduler, host, port, reconnector);
        this.batchQueue = new ConcurrentLinkedQueue<MessageBatch>();
        WaitStrategy waitStrategy = (WaitStrategy)Utils.newInstance((String)storm_conf.get("topology.disruptor.wait.strategy"));
        this.disruptorQueue = DisruptorQueue.mkInstance(this.name, ProducerType.MULTI, this.MAX_SEND_PENDING * 8, waitStrategy);
        if (!this.connectMyself) {
            this.registerSyncMetrics();
        }
        Runnable watchdog = new Runnable(){

            @Override
            public void run() {
                NettyClientSync.this.trigger();
            }
        };
        // Keep the future: the scheduler is supplied by the caller, so without cancelling the
        // task in close() it would keep running (and keep this client reachable) indefinitely.
        this.triggerFuture = scheduler.scheduleAtFixedRate(watchdog, 10L, 1L, TimeUnit.SECONDS);
        String connectionName = JStormServerUtils.getName(host, port);
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory("NettyCli" + connectionName + "-boss");
        this.bossExecutor = Executors.newCachedThreadPool(bossFactory);
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory("NettyCli" + connectionName + "-worker");
        this.workerExecutor = Executors.newCachedThreadPool(workerFactory);
        this.clientChannelFactory = new NioClientSocketChannelFactory((Executor)this.bossExecutor, (Executor)this.workerExecutor, 1);
        this.start();
        LOG.info(this.toString());
    }

    /**
     * Registers a gauge for the batch queue size, a gauge for the disruptor queue and a worker
     * health check for the disruptor queue. Does nothing when netty metrics are disabled.
     */
    public void registerSyncMetrics() {
        if (!this.enableNettyMetrics) {
            return;
        }
        String connection = this.nettyConnection.toString();
        JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName("NettyCliSyncBatchQ" + connection, MetricType.GAUGE), new AsmGauge(new Gauge<Double>(){

            @Override
            public Double getValue() {
                // size() is an int; it must be widened explicitly before boxing to Double.
                return (double)NettyClientSync.this.batchQueue.size();
            }
        }));
        QueueGauge cacheQueueGauge = new QueueGauge(this.disruptorQueue, "NettyCliSyncDisrQ", connection);
        JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName("NettyCliSyncDisrQ" + connection, MetricType.GAUGE), new AsmGauge(cacheQueueGauge));
        JStormHealthCheck.registerWorkerHealthCheck("NettyCliSyncDisrQ:" + connection, cacheQueueGauge);
    }

    /**
     * Publishes every message of the list to the disruptor queue, in iteration order.
     *
     * @param messages messages to enqueue
     */
    @Override
    public void send(List<TaskMessage> messages) {
        for (TaskMessage msg : messages) {
            this.disruptorQueue.publish(msg);
        }
    }

    /**
     * Publishes a single message to the disruptor queue.
     *
     * @param message message to enqueue
     */
    @Override
    public void send(TaskMessage message) {
        this.disruptorQueue.publish(message);
    }

    /**
     * Writes one batch to the channel, or puts it back on {@code batchQueue} when the channel is
     * missing or not writable. The emit timestamp is refreshed in every case, including when
     * nothing is written.
     *
     * @param batch   batch to write; {@code null} is only logged
     * @param channel target channel; may be {@code null}
     */
    public void flushBatch(MessageBatch batch, Channel channel) {
        this.emitTs.set(System.currentTimeMillis());
        if (batch == null) {
            LOG.warn("Handle no data to {}, this shouldn't occur", (Object)this.name);
            return;
        }
        if (channel == null || !channel.isWritable()) {
            LOG.warn("Channel occur exception, during batch messages {}", (Object)this.name);
            this.batchQueue.offer(batch);
            return;
        }
        this.flushRequest(channel, batch);
    }

    /**
     * Sends the next batch. When {@code batchQueue} is empty, the disruptor queue is consumed via
     * {@code consumeBatchWhenAvailable} first and the queue is polled again. Any
     * {@link Throwable} halts the process.
     */
    public void sendData() {
        long start = this.startSendTimer();
        try {
            MessageBatch batch = this.batchQueue.poll();
            if (batch == null) {
                this.disruptorQueue.consumeBatchWhenAvailable(this);
                batch = this.batchQueue.poll();
            }
            Channel channel = (Channel)this.channelRef.get();
            this.flushBatch(batch, channel);
        }
        catch (Throwable e) {
            this.haltOnFailure(e);
        }
        finally {
            this.stopSendTimer(start);
        }
    }

    /**
     * Consumes whatever is currently in the disruptor queue and flushes queued batches until the
     * queue is empty or the channel is missing / not writable. Any {@link Throwable} halts the
     * process.
     */
    public void sendAllData() {
        long start = this.startSendTimer();
        try {
            this.disruptorQueue.consumeBatch(this);
            MessageBatch batch = this.batchQueue.poll();
            while (batch != null) {
                Channel channel = (Channel)this.channelRef.get();
                // NOTE: on either early return the batch polled above is not re-queued.
                if (channel == null) {
                    LOG.info("No channel {} to flush all data", (Object)this.name);
                    return;
                }
                if (!channel.isWritable()) {
                    LOG.info("Channel {} is no writable", (Object)this.name);
                    return;
                }
                this.flushBatch(batch, channel);
                batch = this.batchQueue.poll();
            }
        }
        catch (Throwable e) {
            this.haltOnFailure(e);
        }
        finally {
            this.stopSendTimer(start);
        }
    }

    /** Returns the send timer's start value, or 0 when metrics are off or no timer exists. */
    private long startSendTimer() {
        return this.enableNettyMetrics && this.sendTimer != null ? this.sendTimer.getTime() : 0L;
    }

    /** Records the elapsed send time under the same guard as {@link #startSendTimer()}. */
    private void stopSendTimer(long start) {
        if (this.sendTimer != null && this.enableNettyMetrics) {
            this.sendTimer.updateTime(start);
        }
    }

    /** Logs the failure and halts the process with exit status -1. */
    private void haltOnFailure(Throwable e) {
        LOG.error("Occur e", e);
        String err = this.name + " nettyclient occur unknow exception";
        JStormUtils.halt_process(-1, err);
    }

    /** Clears the emit timestamp (nothing outstanding) and sends the next batch. */
    @Override
    public void handleResponse() {
        this.emitTs.set(0L);
        this.sendData();
    }

    /**
     * Disruptor callback: appends the event to the batch under construction and moves that batch
     * to {@code batchQueue} once it is full or the disruptor signals the end of its batch;
     * otherwise the partial batch is stored back in {@code messageBatchRef}.
     *
     * @param event      the published message; {@code null} is ignored
     * @param sequence   disruptor sequence number (unused)
     * @param endOfBatch whether this is the last event of the current disruptor batch
     */
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        if (event == null) {
            return;
        }
        MessageBatch messageBatch = this.messageBatchRef.getAndSet(null);
        if (messageBatch == null) {
            messageBatch = new MessageBatch(this.messageBatchSize);
        }
        messageBatch.add(event);
        if (messageBatch.isFull() || endOfBatch) {
            this.batchQueue.offer(messageBatch);
        } else {
            this.messageBatchRef.set(messageBatch);
        }
    }

    /**
     * Watchdog body: if a flush happened at least {@code timeoutMs} ago without
     * {@link #handleResponse()} resetting the timestamp, writes
     * {@link ControlMessage#EOB_MESSAGE} to the current channel (when there is one).
     */
    void trigger() {
        if (this.isClosed()) {
            return;
        }
        long emitTime = this.emitTs.get();
        if (emitTime == 0L) {
            return;
        }
        long delt = System.currentTimeMillis() - emitTime;
        if (delt < this.timeoutMs) {
            return;
        }
        Channel channel = (Channel)this.channelRef.get();
        if (channel != null) {
            LOG.info("Long time no response of {}, {}s", (Object)this.name, (Object)(delt / 1000L));
            channel.write((Object)ControlMessage.EOB_MESSAGE);
        }
    }

    /**
     * Stops both executor pools, waits up to one second for each to terminate and releases the
     * channel factory's external resources.
     */
    protected void shutdownPool() {
        this.bossExecutor.shutdownNow();
        this.workerExecutor.shutdownNow();
        try {
            this.bossExecutor.awaitTermination(1L, TimeUnit.SECONDS);
            this.workerExecutor.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.error("Error when shutting down client scheduler", (Throwable)e);
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        this.clientChannelFactory.releaseExternalResources();
    }

    /** Removes everything registered by {@link #registerSyncMetrics()}. */
    public void unregisterSyncMetrics() {
        if (!this.enableNettyMetrics) {
            return;
        }
        String connection = this.nettyConnection.toString();
        JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName("NettyCliSyncBatchQ" + connection, MetricType.GAUGE));
        JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName("NettyCliSyncDisrQ" + connection, MetricType.GAUGE));
        JStormHealthCheck.unregisterWorkerHealthCheck("NettyCliSyncDisrQ:" + connection);
    }

    /**
     * Flushes pending data, halts the disruptor queue, unregisters metrics (unless
     * {@code connectMyself} is set), cancels the watchdog task, closes the underlying client and
     * shuts down the executor pools.
     */
    @Override
    public void close() {
        LOG.info("Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", new Object[]{this.name, this.batchQueue.size(), this.disruptorQueue.population()});
        this.sendAllData();
        this.disruptorQueue.haltWithInterrupt();
        if (!this.connectMyself) {
            this.unregisterSyncMetrics();
        }
        // Stop the periodic watchdog; a run already in progress is allowed to finish.
        this.triggerFuture.cancel(false);
        super.close();
        this.shutdownPool();
    }

    /** Reflection-based dump of this client's fields in short-prefix style. */
    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString((Object)this, (ToStringStyle)ToStringStyle.SHORT_PREFIX_STYLE);
    }
}

