package com.alibaba.jstorm.message.netty;

import backtype.storm.Config;
import backtype.storm.messaging.NettyMessage;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.Flusher;
import com.alibaba.jstorm.utils.JStormUtils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.org.apache.commons.lang.builder.ToStringBuilder;
import shade.storm.org.apache.commons.lang.builder.ToStringStyle;

/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClientAsync.class */
class NettyClientAsync extends NettyClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
    // Name prefix constant; not referenced anywhere else in this class.
    public static final String PREFIX = "Netty-Client-";
    // Read from Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS in the constructor (fallback 5) and
    // handed to the NettyClientFlush instance created there.
    protected int flushCheckInterval;
    // Target task id -> Condition created when a "start flow control" response arrives for that
    // task; the entry is removed (and the Condition signalled) when flow control is released.
    private HashMap<Integer, Condition> targetTasksUnderFlowCtrl;
    // Target task id -> batch of messages held back while that task is under flow control.
    private Map<Integer, MessageBatch> targetTasksCache;
    // Remote address string -> ids of the tasks named in flow-control responses from that address.
    private ConcurrentHashMap<String, Set<Integer>> remoteAddrToTasks;
    // Lock that owns every Condition stored in targetTasksUnderFlowCtrl (await/signalAll need it).
    private ReentrantLock lock;
    // Taken around reads and writes of targetTasksUnderFlowCtrl.
    private ReentrantLock flowCtrlLock;
    // Length of a single Condition.await(...) slice, interpreted as milliseconds.
    private int flowCtrlAwaitTime;
    // Size argument for each per-task MessageBatch cache; falls back to messageBatchSize when the
    // flow-control cache size is not configured.
    private int cacheSize;

    /* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClientAsync$NettyClientFlush.class */
    /**
     * Flusher task that drains the enclosing client's message buffer into the current channel
     * whenever that channel exists, is writable and the buffer is non-empty.
     */
    private class NettyClientFlush extends Flusher {
        /** Re-entrancy guard: only the caller that flips it from false to true performs a flush. */
        private final AtomicBoolean _isFlushing = new AtomicBoolean(false);

        /**
         * @param j value stored in the inherited {@code _flushIntervalMs} field
         */
        public NettyClientFlush(long j) {
            this._flushIntervalMs = j;
        }

        /**
         * Performs one flush attempt. A run that overlaps another in-progress run returns
         * immediately without doing anything.
         */
        @Override
        public void run() {
            if (!this._isFlushing.compareAndSet(false, true)) {
                return;
            }
            try {
                synchronized (NettyClientAsync.this.writeLock) {
                    Channel channel = NettyClientAsync.this.channelRef.get();
                    if (channel != null && channel.isWritable() && NettyClientAsync.this.messageBuffer.size() > 0) {
                        NettyClientAsync.this.flushRequest(channel, NettyClientAsync.this.messageBuffer.drain());
                    }
                }
            } finally {
                // Always clear the guard; otherwise one failed flush would turn every
                // subsequent run into a no-op.
                this._isFlushing.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyClientAsync(Map map, ChannelFactory channelFactory, String str, int i, ReconnectRunnable reconnectRunnable) {
        super(map, channelFactory, str, i, reconnectRunnable);
        this.clientChannelFactory = channelFactory;
        this.targetTasksUnderFlowCtrl = new HashMap<>();
        this.targetTasksCache = new HashMap();
        this.remoteAddrToTasks = new ConcurrentHashMap<>();
        this.lock = new ReentrantLock();
        this.flowCtrlLock = new ReentrantLock();
        this.flowCtrlAwaitTime = ConfigExtension.getNettyFlowCtrlWaitTime(map);
        this.cacheSize = ConfigExtension.getNettyFlowCtrlCacheSize(map) != null ? ConfigExtension.getNettyFlowCtrlCacheSize(map).intValue() : this.messageBatchSize;
        this.flushCheckInterval = Utils.getInt(map.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 5).intValue();
        new NettyClientFlush(this.flushCheckInterval).start();
        start();
    }

    /**
     * Sends every message of the list in order, applying per-task flow control to each one.
     * Does nothing except log a warning when the client is already closed.
     *
     * @param list messages to send
     * @throws RuntimeException wrapping any exception raised while sending
     */
    @Override
    public void send(List<TaskMessage> list) {
        if (isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }

        // Only sample the timer when metrics are on and a timer exists.
        long startTime = 0L;
        if (this.enableNettyMetrics && this.sendTimer != null) {
            startTime = this.sendTimer.getTime();
        }

        try {
            for (TaskMessage message : list) {
                waitforFlowCtrlAndSend(message);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (this.sendTimer != null && this.enableNettyMetrics) {
                this.sendTimer.updateTime(startTime);
            }
        }
    }

    /**
     * Sends a single message, applying per-task flow control. Does nothing except log a warning
     * when the client is already closed.
     *
     * <p>The send timer (when metrics are enabled) is updated exactly once, whether the send
     * succeeds or fails, mirroring {@link #send(List)}.
     *
     * @param taskMessage message to send
     * @throws RuntimeException wrapping any exception raised while sending
     */
    @Override
    public void send(TaskMessage taskMessage) {
        if (isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }

        // Only sample the timer when metrics are on and a timer exists.
        long startTime = 0L;
        if (this.enableNettyMetrics && this.sendTimer != null) {
            startTime = this.sendTimer.getTime();
        }

        try {
            waitforFlowCtrlAndSend(taskMessage);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (this.sendTimer != null && this.enableNettyMetrics) {
                this.sendTimer.updateTime(startTime);
            }
        }
    }

    /**
     * Adds a message (or batch) to the outgoing buffer under {@code writeLock}, flushing right
     * away when the channel is writable and the buffer hands back a batch. Null or empty input
     * is ignored. When the buffer reaches {@code BATCH_THREASHOLD_WARN}, the call blocks in
     * {@link #waitForChannelReady()} while still holding {@code writeLock}.
     *
     * @param nettyMessage message or batch to enqueue
     */
    void pushBatch(NettyMessage nettyMessage) {
        if (nettyMessage == null || nettyMessage.isEmpty()) {
            return;
        }
        synchronized (this.writeLock) {
            final Channel current = this.channelRef.get();
            if (current != null && current.isWritable()) {
                // Writable channel: the buffer returns a batch when one is ready to go out.
                MessageBatch ready = this.messageBuffer.add(nettyMessage);
                if (ready != null) {
                    flushRequest(current, ready);
                }
            } else {
                // No channel, or channel not writable: just park the message.
                this.messageBuffer.add(nettyMessage, false);
                if (current == null) {
                    LOG.debug("Pending requested message, the size is {}, because channel is not ready.", Integer.valueOf(this.messageBuffer.size()));
                }
            }

            boolean bufferTooLarge = this.messageBuffer.size() >= this.BATCH_THREASHOLD_WARN;
            if (bufferTooLarge) {
                waitForChannelReady();
            }
        }
    }

    /**
     * Polls once per {@code JStormUtils.sleepMs(1L)} call until the current channel is writable,
     * then flushes everything buffered. If {@code timeoutMs} is not -1 and that many polls have
     * elapsed, the buffered messages are discarded instead. Progress is logged every 30000 polls.
     */
    public void waitForChannelReady() {
        long waited = 0;
        Channel current = this.channelRef.get();
        while (current == null || !current.isWritable()) {
            JStormUtils.sleepMs(1L);
            waited++;

            boolean timeoutEnabled = this.timeoutMs != -1;
            if (timeoutEnabled && waited >= this.timeoutMs) {
                LOG.warn("Discard message due to pending message timeout({}ms), messageSize={}", Long.valueOf(this.timeoutMs), Integer.valueOf(this.messageBuffer.size()));
                this.messageBuffer.clear();
                return;
            }

            if (waited % 30000 == 0) {
                // Report on the channel instance that was examined in this iteration.
                Boolean writable = null;
                Object remoteAddress = null;
                if (current != null) {
                    writable = Boolean.valueOf(current.isWritable());
                    remoteAddress = current.getRemoteAddress();
                }
                LOG.info("Pending total time={}, channel.isWritable={}, remoteAddress={}", new Object[] {Long.valueOf(waited), writable, remoteAddress});
            }

            // The channel reference may have been replaced (e.g. after a reconnect).
            current = this.channelRef.get();
        }
        flushRequest(current, this.messageBuffer.drain());
    }

    /**
     * Processes a flow-control response received on {@code channel}.
     *
     * <p>The payload is read as one flag byte (1 = start flow control, anything else = release)
     * followed by a big-endian int holding the target task id. The task id is recorded against
     * the channel's remote address. On "start" a fresh {@link Condition} is registered for the
     * task; on "release" the registered condition (if any) is removed and signalled, and any
     * messages cached for the task are pushed out.
     *
     * @param channel channel the response arrived on
     * @param obj     the response; ignored when null, otherwise cast to {@code TaskMessage}
     */
    @Override
    public void handleResponse(Channel channel, Object obj) {
        if (obj == null) {
            return;
        }
        TaskMessage response = (TaskMessage) obj;
        short type = response.get_type();
        // NOTE(review): 2 is presumably the back-pressure/flow-control message type - confirm
        // against TaskMessage's constants.
        if (type != 2) {
            LOG.warn("Unexpected message (type={}) was received from task {}", Short.valueOf(type), Integer.valueOf(response.task()));
            return;
        }

        // Wrap the payload instead of copying it into a fixed 33-byte buffer: the copy overflowed
        // on longer payloads, and a payload shorter than flag + int cannot be decoded at all.
        final int requiredLength = 1 + 4;
        byte[] payload = response.message();
        if (payload == null || payload.length < requiredLength) {
            LOG.warn("Malformed flow control message ({} bytes) was received from task {}", Integer.valueOf(payload == null ? 0 : payload.length), Integer.valueOf(response.task()));
            return;
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        boolean startFlowCtrl = buffer.get() == 1;
        int targetTaskId = buffer.getInt();
        Integer taskKey = Integer.valueOf(targetTaskId);

        // Remember which tasks live behind this remote address.
        String remoteAddr = channel.getRemoteAddress().toString();
        Set<Integer> tasks = this.remoteAddrToTasks.get(remoteAddr);
        if (tasks == null) {
            LOG.warn("TargetTasks set was not initialized correctly!");
            Set<Integer> created = new HashSet<Integer>();
            // putIfAbsent keeps a set that another thread may have registered in the meantime.
            tasks = this.remoteAddrToTasks.putIfAbsent(remoteAddr, created);
            if (tasks == null) {
                tasks = created;
            }
        }
        synchronized (tasks) {
            tasks.add(taskKey);
        }

        this.flowCtrlLock.lock();
        try {
            if (startFlowCtrl) {
                this.targetTasksUnderFlowCtrl.put(taskKey, this.lock.newCondition());
                return;
            }

            Condition released = this.targetTasksUnderFlowCtrl.remove(taskKey);
            if (released != null) {
                // signalAll requires ownership of the lock the condition was created from.
                this.lock.lock();
                try {
                    released.signalAll();
                } finally {
                    this.lock.unlock();
                }
            }

            MessageBatch cached;
            synchronized (this.targetTasksCache) {
                cached = this.targetTasksCache.remove(taskKey);
            }
            // Pushed while flowCtrlLock is still held, exactly as before, so senders that queue
            // behind this lock cannot get ahead of the cached messages.
            if (cached != null) {
                pushBatch(cached);
            }
        } finally {
            this.flowCtrlLock.unlock();
        }
    }

    /**
     * @return a reflection-based dump of this object's fields in
     *         {@code ToStringStyle.SHORT_PREFIX_STYLE}
     */
    @Override
    public String toString() {
        final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
        return ToStringBuilder.reflectionToString(this, style);
    }

    /**
     * Sends one message, honouring flow control on its target task.
     *
     * <ul>
     *   <li>Target not under flow control: the message is pushed immediately.</li>
     *   <li>Target under flow control and its cache is not yet full: the message is parked in
     *       the per-task cache and the method returns.</li>
     *   <li>Target under flow control and the cache just became full: the full cache is taken
     *       out and the caller blocks, in slices of {@code flowCtrlAwaitTime} ms, until flow
     *       control is released; the cache is then pushed. If {@code timeoutMs} is not -1 and
     *       the accumulated wait reaches it, the task's flow-control entry is removed and the
     *       cache is dropped.</li>
     * </ul>
     *
     * <p>Lock discipline: {@code flowCtrlLock} is only held for short map accesses and is never
     * held while awaiting, because {@code handleResponse} needs it to release flow control.
     * {@code lock} and {@code flowCtrlLock} are never acquired in the order lock -> flowCtrlLock,
     * which is the reverse of what {@code handleResponse} does.
     *
     * @param taskMessage message to send
     */
    private void waitforFlowCtrlAndSend(TaskMessage taskMessage) {
        final int task = taskMessage.task();
        final Integer taskKey = Integer.valueOf(task);

        Condition condition;
        MessageBatch fullCache = null;

        // Phase 1: decide, under flowCtrlLock, whether to send, park or wait.
        this.flowCtrlLock.lock();
        try {
            condition = this.targetTasksUnderFlowCtrl.get(taskKey);
            if (condition != null) {
                MessageBatch cache = this.targetTasksCache.get(taskKey);
                if (cache == null) {
                    cache = new MessageBatch(this.cacheSize);
                    this.targetTasksCache.put(taskKey, cache);
                }
                cache.add(taskMessage);
                if (!cache.isFull()) {
                    // Parked; it goes out when flow control is released or the cache fills.
                    return;
                }
                fullCache = this.targetTasksCache.remove(taskKey);
            }
        } finally {
            this.flowCtrlLock.unlock();
        }

        if (fullCache == null) {
            pushBatch(taskMessage);
            return;
        }

        // Phase 2: the cache is full; wait for flow control on this task to be released.
        long pendingMs = 0;
        boolean signalled = false;
        while (condition != null && !signalled) {
            this.lock.lock();
            try {
                signalled = condition.await(this.flowCtrlAwaitTime, TimeUnit.MILLISECONDS);
                pendingMs += this.flowCtrlAwaitTime;
            } catch (InterruptedException e) {
                // As before, an interrupt is only logged and the wait continues.
                // NOTE(review): the interrupt status is not restored; confirm callers do not
                // rely on it before changing this.
                LOG.info("flow control was interrupted! targetTask-{}", Integer.valueOf(task));
            } finally {
                this.lock.unlock();
            }

            if (this.timeoutMs != -1 && pendingMs >= this.timeoutMs) {
                LOG.warn("Discard message under flow ctrl due to pending message timeout({}ms), messageSize={}", Long.valueOf(this.timeoutMs), Integer.valueOf(fullCache.getEncodedLength()));
                this.flowCtrlLock.lock();
                try {
                    this.targetTasksUnderFlowCtrl.remove(taskKey);
                } finally {
                    this.flowCtrlLock.unlock();
                }
                return;
            }
            if (pendingMs % 30000 == 0) {
                LOG.info("Pending total time={} since target task-{} is under flow control ", Long.valueOf(pendingMs), Integer.valueOf(task));
            }

            // Re-read the entry: it is gone once flow control was released, which also covers a
            // signal that fired before this thread started awaiting.
            this.flowCtrlLock.lock();
            try {
                condition = this.targetTasksUnderFlowCtrl.get(taskKey);
            } finally {
                this.flowCtrlLock.unlock();
            }
        }
        pushBatch(fullCache);
    }

    /**
     * Registers an empty task-id set for the remote address of a newly connected channel,
     * replacing any set previously stored under that address.
     *
     * @param channel the channel that was connected
     */
    @Override
    public void connectChannel(Channel channel) {
        String remoteAddr = channel.getRemoteAddress().toString();
        Set<Integer> noTasksYet = new HashSet<Integer>();
        this.remoteAddrToTasks.put(remoteAddr, noTasksYet);
    }

    /**
     * Lifts flow control for every task recorded against the given remote address: each task's
     * flow-control entry is removed and all threads waiting on its condition are signalled.
     * Does nothing when no task set is registered for the address.
     *
     * @param str remote address string, as used for the keys of {@code remoteAddrToTasks}
     */
    private void releaseFlowCtrlsForRemoteAddr(String str) {
        Set<Integer> tasks = this.remoteAddrToTasks.get(str);
        if (tasks == null) {
            return;
        }

        // handleResponse adds to this set under the set's own monitor, so take a snapshot under
        // the same monitor rather than iterating the live set.
        Set<Integer> snapshot;
        synchronized (tasks) {
            snapshot = new HashSet<Integer>(tasks);
        }

        this.flowCtrlLock.lock();
        try {
            for (Integer taskId : snapshot) {
                Condition released = this.targetTasksUnderFlowCtrl.remove(taskId);
                if (released == null) {
                    continue;
                }
                // signalAll requires ownership of the lock the condition was created from.
                this.lock.lock();
                try {
                    released.signalAll();
                } finally {
                    this.lock.unlock();
                }
            }
        } finally {
            this.flowCtrlLock.unlock();
        }
    }

    /**
     * Reacts to a channel being disconnected; ignored once the client is closed.
     *
     * <p>If the channel is the one currently in use, the current-channel reference is cleared,
     * flow control for the channel's remote address is released and a reconnect is triggered.
     * Otherwise flow control is released and only that channel is closed.
     *
     * @param channel the channel that was disconnected
     */
    @Override
    public void disconnectChannel(Channel channel) {
        if (isClosed()) {
            return;
        }

        boolean isCurrentChannel = (channel == this.channelRef.get());
        if (isCurrentChannel) {
            setChannel(null);
            releaseFlowCtrlsForRemoteAddr(channel.getRemoteAddress().toString());
            reconnect();
            return;
        }

        // A stale channel: release its waiters and close it without reconnecting.
        releaseFlowCtrlsForRemoteAddr(channel.getRemoteAddress().toString());
        closeChannel(channel);
    }
}
