package com.alibaba.jstorm.message.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.BackpressureCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.JStormUtils;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyServer.class */
public class NettyServer implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
    Map stormConf;
    int port;
    volatile StormChannelGroup allChannels = new StormChannelGroup("jstorm-server");
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
    private DisruptorQueue recvControlQueue;
    private Lock lock;
    private Map<Integer, HashSet<String>> remoteClientsUnderFlowCtrl;
    private boolean isBackpressureEnable;
    private float lowMark;
    private float highMark;
    private volatile boolean bstartRec;
    private final Set<Integer> workerTasks;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyServer(Map map, int i, ConcurrentHashMap<Integer, DisruptorQueue> concurrentHashMap, DisruptorQueue disruptorQueue, boolean z, Set<Integer> set) {
        this.stormConf = map;
        this.port = i;
        this.deserializeQueues = concurrentHashMap;
        this.recvControlQueue = disruptorQueue;
        this.bstartRec = z;
        this.workerTasks = set;
        int intValue = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_RECEIVE_BUFFER_SIZE)).intValue();
        int intValue2 = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)).intValue();
        NettyRenameThreadFactory nettyRenameThreadFactory = new NettyRenameThreadFactory("server-boss");
        NettyRenameThreadFactory nettyRenameThreadFactory2 = new NettyRenameThreadFactory("server-worker");
        if (intValue2 > 0) {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(nettyRenameThreadFactory), Executors.newCachedThreadPool(nettyRenameThreadFactory2), intValue2);
        } else {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(nettyRenameThreadFactory), Executors.newCachedThreadPool(nettyRenameThreadFactory2));
        }
        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("reuserAddress", true);
        this.bootstrap.setOption("child.tcpNoDelay", true);
        this.bootstrap.setOption("child.receiveBufferSize", Integer.valueOf(intValue));
        this.bootstrap.setOption("child.keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormServerPipelineFactory(this, map));
        this.allChannels.add(this.bootstrap.bind(new InetSocketAddress(i)));
        LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", new Object[]{Integer.valueOf(i), Integer.valueOf(intValue), Integer.valueOf(intValue2)});
        this.lock = new ReentrantLock();
        this.remoteClientsUnderFlowCtrl = new HashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            this.remoteClientsUnderFlowCtrl.put(it.next(), new HashSet<>());
        }
        this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(map);
        this.highMark = (float) ConfigExtension.getBackpressureWaterMarkHigh(map);
        this.lowMark = (float) ConfigExtension.getBackpressureWaterMarkLow(map);
        LOG.info("isBackpressureEnable: {}, highMark: {}, lowMark: {}", new Object[]{Boolean.valueOf(this.isBackpressureEnable), Float.valueOf(this.highMark), Float.valueOf(this.lowMark)});
    }

    @Override // backtype.storm.messaging.IConnection
    public void registerQueue(Integer num, DisruptorQueue disruptorQueue) {
        this.deserializeQueues.put(num, disruptorQueue);
    }

    private void sendFlowCtrlResp(Channel channel, int i) {
        ByteBuffer allocate = ByteBuffer.allocate(33);
        allocate.put((byte) 1);
        allocate.putInt(i);
        channel.write(new TaskMessage((short) 2, 1, allocate.array()));
    }

    private void flowCtrl(Channel channel, String str, DisruptorQueue disruptorQueue, int i, byte[] bArr) {
        boolean z = false;
        boolean z2 = false;
        if (disruptorQueue.pctFull() > this.lowMark || disruptorQueue.cacheSize() > 0) {
            HashSet<String> hashSet = this.remoteClientsUnderFlowCtrl.get(Integer.valueOf(i));
            synchronized (hashSet) {
                if (hashSet.isEmpty()) {
                    if (disruptorQueue.pctFull() >= this.highMark) {
                        hashSet.add(str);
                        z2 = true;
                        sendFlowCtrlResp(channel, i);
                        z = true;
                    }
                } else if (hashSet.contains(str)) {
                    z = true;
                } else {
                    hashSet.add(str);
                    sendFlowCtrlResp(channel, i);
                    z = true;
                }
            }
        }
        if (!z) {
            disruptorQueue.publish(bArr);
            return;
        }
        disruptorQueue.publishCache(bArr);
        if (z2) {
            disruptorQueue.publishCallback(new BackpressureCallback(this.allChannels, disruptorQueue, this.lowMark, i, this.remoteClientsUnderFlowCtrl));
        }
    }

    @Override // backtype.storm.messaging.IConnection
    public void enqueue(TaskMessage taskMessage, Channel channel) {
        while (!this.bstartRec) {
            LOG.info("check deserializeQueues have already been created");
            boolean z = true;
            Iterator<Integer> it = this.workerTasks.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (this.deserializeQueues.get(it.next()) == null) {
                    z = false;
                    JStormUtils.sleepMs(10L);
                    break;
                }
            }
            if (z) {
                this.bstartRec = z;
            }
        }
        short s = taskMessage.get_type();
        if (s != 0) {
            if (s != 1) {
                LOG.warn("Unexpected message (type={}) was received from task {}", Short.valueOf(s), Integer.valueOf(taskMessage.task()));
                return;
            } else if (this.recvControlQueue == null) {
                LOG.info("Can not find the recvControlQueue. So, dropping this control message");
                return;
            } else {
                this.recvControlQueue.publish(taskMessage);
                return;
            }
        }
        int task = taskMessage.task();
        DisruptorQueue disruptorQueue = this.deserializeQueues.get(Integer.valueOf(task));
        if (disruptorQueue == null) {
            LOG.warn("Received invalid message directed at task {}. Dropping...", Integer.valueOf(task));
            LOG.debug("Message data: {}", JStormUtils.toPrintableString(taskMessage.message()));
        } else if (this.isBackpressureEnable) {
            flowCtrl(channel, channel.getRemoteAddress().toString(), disruptorQueue, task, taskMessage.message());
        } else {
            disruptorQueue.publish(taskMessage.message());
        }
    }

    @Override // backtype.storm.messaging.IConnection
    public Object recv(Integer num, int i) {
        try {
            DisruptorQueue disruptorQueue = this.deserializeQueues.get(num);
            return (i & 1) == 1 ? disruptorQueue.poll() : disruptorQueue.take();
        } catch (Exception e) {
            LOG.warn("Occur unexception ", e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addChannel(Channel channel) {
        this.allChannels.add(channel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeChannel(Channel channel) {
        MessageDecoder.removeTransmitHistogram(channel);
        channel.close().awaitUninterruptibly();
        this.allChannels.remove(channel);
    }

    @Override // backtype.storm.messaging.IConnection
    public void close() {
        LOG.info("Begin to shutdown NettyServer");
        if (this.allChannels != null) {
            new Thread(new Runnable() { // from class: com.alibaba.jstorm.message.netty.NettyServer.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        NettyServer.this.allChannels.close().await(1L, TimeUnit.SECONDS);
                        NettyServer.LOG.info("Successfully close all channel");
                        NettyServer.this.factory.releaseExternalResources();
                    } catch (Exception e) {
                    }
                    NettyServer.this.allChannels = null;
                }
            }).start();
            JStormUtils.sleepMs(1000L);
        }
        LOG.info("Successfully shutdown NettyServer");
    }

    @Override // backtype.storm.messaging.IConnection
    public void send(List<TaskMessage> list) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    @Override // backtype.storm.messaging.IConnection
    public void send(TaskMessage taskMessage) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    @Override // backtype.storm.messaging.IConnection
    public boolean isClosed() {
        return false;
    }

    @Override // backtype.storm.messaging.IConnection
    public boolean available() {
        return true;
    }

    public StormChannelGroup getChannelGroup() {
        return this.allChannels;
    }
}
