/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.message.netty.MessageBatch;
import com.alibaba.jstorm.message.netty.NettyClient;
import com.alibaba.jstorm.message.netty.ReconnectRunnable;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NettyClientAsync
extends NettyClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
    public static final String PREFIX = "Netty-Client-";
    protected long BATCH_THREASHOLD_WARN;
    protected final boolean directlySend;
    protected AtomicBoolean flush_later;
    protected int flushCheckInterval;
    protected final boolean blockSend;

    boolean isDirectSend(Map conf) {
        if (JStormServerUtils.isOnePending(conf)) {
            return true;
        }
        return !ConfigExtension.isNettyTransferAsyncBatch(conf);
    }

    boolean isBlockSend(Map storm_conf) {
        if (!ConfigExtension.isTopologyContainAcker(storm_conf)) {
            return false;
        }
        return ConfigExtension.isNettyASyncBlock(storm_conf);
    }

    NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
        super(storm_conf, factory, scheduler, host, port, reconnector);
        this.BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
        this.blockSend = this.isBlockSend(storm_conf);
        this.directlySend = this.isDirectSend(storm_conf);
        this.flush_later = new AtomicBoolean(false);
        this.flushCheckInterval = Utils.getInt(storm_conf.get("storm.messaging.netty.flush.check.interval.ms"), 10);
        Runnable flusher = new Runnable(){

            @Override
            public void run() {
                NettyClientAsync.this.flush();
            }
        };
        long initialDelay = Math.min(1000, this.max_sleep_ms * this.max_retries);
        scheduler.scheduleAtFixedRate(flusher, initialDelay, this.flushCheckInterval, TimeUnit.MILLISECONDS);
        this.clientChannelFactory = factory;
        this.start();
        LOG.info(this.toString());
    }

    @Override
    public synchronized void send(List<TaskMessage> messages) {
        if (this.isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }
        long start = this.enableNettyMetrics ? this.sendTimer.getTime() : 0L;
        try {
            this.pushBatch(messages);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            if (this.sendTimer != null && this.enableNettyMetrics) {
                this.sendTimer.updateTime(start);
            }
        }
    }

    @Override
    public synchronized void send(TaskMessage message) {
        if (this.isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }
        long start = this.enableNettyMetrics ? this.sendTimer.getTime() : 0L;
        try {
            this.pushBatch(message);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            if (this.sendTimer != null && this.enableNettyMetrics) {
                this.sendTimer.updateTime(start);
            }
        }
    }

    void waitChannelReady(long cachedSize, long sleepMs) {
        long begin = System.currentTimeMillis();
        boolean changeThreadhold = false;
        IntervalCheck oneSecond = new IntervalCheck();
        IntervalCheck timeoutIntervalCheck = new IntervalCheck();
        timeoutIntervalCheck.setIntervalMs(this.timeoutMs);
        long l = sleepMs = sleepMs < this.timeoutMs ? sleepMs : this.timeoutMs;
        while (this.isChannelReady() == null) {
            long now = System.currentTimeMillis();
            long delt = now - begin;
            if (oneSecond.check()) {
                LOG.warn("Target server  {} is unavailable, pending {}, bufferSize {}, block sending {}ms", new Object[]{this.name, this.pendings.get(), cachedSize, delt});
            }
            if (timeoutIntervalCheck.check()) {
                if (this.messageBatchRef.get() != null) {
                    LOG.warn("Target server  {} is unavailable, wait too much time, throw timeout message", (Object)this.name);
                    this.messageBatchRef.set(null);
                }
                this.setChannel(null);
                LOG.warn("Reset channel as null");
                if (!this.blockSend) {
                    this.reconnect();
                    break;
                }
            }
            this.reconnect();
            JStormUtils.sleepMs(sleepMs);
            if (delt > 2L * this.timeoutMs * 1000L && !changeThreadhold && this.channelRef.get() != null && this.BATCH_THREASHOLD_WARN >= (long)(2 * this.messageBatchSize)) {
                this.BATCH_THREASHOLD_WARN /= 2L;
                LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", (Object)this.BATCH_THREASHOLD_WARN);
                changeThreadhold = true;
            }
            if (!this.isClosed()) continue;
            LOG.info("Channel has been closed " + this.name());
            break;
        }
    }

    long getDelaySec(long cachedSize) {
        long count = cachedSize / this.BATCH_THREASHOLD_WARN;
        long sleepMs = (long)(Math.pow(2.0, count) * 10.0);
        if (sleepMs > 1000L) {
            sleepMs = 1000L;
        }
        return sleepMs;
    }

    void handleFailedChannel(MessageBatch messageBatch) {
        this.messageBatchRef.set(messageBatch);
        this.flush_later.set(true);
        long cachedSize = messageBatch.getEncoded_length();
        if (cachedSize > this.BATCH_THREASHOLD_WARN) {
            long sleepMs = this.getDelaySec(cachedSize);
            this.waitChannelReady(cachedSize, sleepMs);
        }
    }

    void pushBatch(List<TaskMessage> messages) {
        if (messages.isEmpty()) {
            return;
        }
        MessageBatch messageBatch = this.messageBatchRef.getAndSet(null);
        if (null == messageBatch) {
            messageBatch = new MessageBatch(this.messageBatchSize);
        }
        for (TaskMessage message : messages) {
            Channel channel;
            if (TaskMessage.isEmpty(message)) continue;
            messageBatch.add(message);
            if (!messageBatch.isFull() || (channel = this.isChannelReady()) == null) continue;
            this.flushRequest(channel, messageBatch);
            messageBatch = new MessageBatch(this.messageBatchSize);
        }
        Channel channel = this.isChannelReady();
        if (channel == null) {
            this.handleFailedChannel(messageBatch);
        } else if (!messageBatch.isEmpty()) {
            this.flushRequest(channel, messageBatch);
        }
    }

    void pushBatch(TaskMessage message) {
        if (TaskMessage.isEmpty(message)) {
            return;
        }
        MessageBatch messageBatch = this.messageBatchRef.getAndSet(null);
        if (null == messageBatch) {
            messageBatch = new MessageBatch(this.messageBatchSize);
        }
        messageBatch.add(message);
        Channel channel = this.isChannelReady();
        if (channel == null) {
            this.handleFailedChannel(messageBatch);
            return;
        }
        if (messageBatch.isFull()) {
            this.flushRequest(channel, messageBatch);
            return;
        }
        if (this.directlySend) {
            this.flushRequest(channel, messageBatch);
        } else if (this.messageBatchRef.compareAndSet(null, messageBatch)) {
            this.flush_later.set(true);
        } else {
            LOG.error("MessageBatch will be lost. This should not happen.");
        }
    }

    void flush() {
        if (this.isClosed()) {
            return;
        }
        if (!this.flush_later.get()) {
            return;
        }
        Channel channel = this.isChannelReady();
        if (channel == null) {
            return;
        }
        this.flush_later.set(false);
        MessageBatch toBeFlushed = this.messageBatchRef.getAndSet(null);
        this.flushRequest(channel, toBeFlushed);
    }

    @Override
    Channel isChannelReady() {
        Channel channel = super.isChannelReady();
        if (channel == null) {
            return null;
        }
        if (this.blockSend && this.pendings.get() >= (long)this.MAX_SEND_PENDING) {
            return null;
        }
        return channel;
    }

    @Override
    public void handleResponse() {
    }

    public String toString() {
        return ToStringBuilder.reflectionToString((Object)this, (ToStringStyle)ToStringStyle.SHORT_PREFIX_STYLE);
    }
}

