/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.message.netty.MessageDecoder;
import com.alibaba.jstorm.message.netty.NettyRenameThreadFactory;
import com.alibaba.jstorm.message.netty.StormServerPipelineFactory;
import com.alibaba.jstorm.utils.JStormUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server side of a Netty-based {@link IConnection}.
 *
 * <p>The constructor binds a {@link ServerBootstrap} to the given port. Messages handed to
 * {@link #enqueue(TaskMessage)} are published either to the {@link DisruptorQueue} registered
 * for the message's task id or to the control queue, depending on the message type.
 * Sending is not supported on this connection.
 */
class NettyServer
implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
    Map storm_conf;
    int port;
    /** All channels tracked by this server; set to {@code null} by the shutdown thread started in {@link #close()}. */
    volatile ChannelGroup allChannels = new DefaultChannelGroup("jstorm-server");
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private final boolean isSyncMode;
    /** Receive queues keyed by task id. */
    private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
    /** Queue for type-1 messages; may be {@code null}, in which case such messages are dropped. */
    private DisruptorQueue recvControlQueue;

    /**
     * Creates the server and binds it to {@code port}.
     *
     * @param storm_conf        configuration map; must contain
     *                          {@code storm.messaging.netty.buffer_size} and
     *                          {@code storm.messaging.netty.server_worker_threads}
     * @param port              port to bind
     * @param isSyncMode        value reported by {@link #isSyncMode()}
     * @param deserializeQueues per-task receive queues, keyed by task id
     * @param recvControlQueue  queue for type-1 messages, may be {@code null}
     */
    NettyServer(Map storm_conf, int port, boolean isSyncMode, ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues, DisruptorQueue recvControlQueue) {
        this.storm_conf = storm_conf;
        this.port = port;
        this.isSyncMode = isSyncMode;
        this.deserializeQueues = deserializeQueues;
        this.recvControlQueue = recvControlQueue;
        int buffer_size = Utils.getInt(storm_conf.get("storm.messaging.netty.buffer_size"));
        int maxWorkers = Utils.getInt(storm_conf.get("storm.messaging.netty.server_worker_threads"));
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker");
        // Only pass an explicit worker count when one is configured; otherwise use the factory default.
        this.factory = maxWorkers > 0 ? new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory), maxWorkers) : new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory));
        this.bootstrap = new ServerBootstrap(this.factory);
        // The option key is "reuseAddress"; the previous spelling "reuserAddress" was not a
        // recognized option name, so the setting never took effect.
        this.bootstrap.setOption("reuseAddress", (Object)true);
        this.bootstrap.setOption("child.tcpNoDelay", (Object)true);
        this.bootstrap.setOption("child.receiveBufferSize", (Object)buffer_size);
        this.bootstrap.setOption("child.keepAlive", (Object)true);
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)new StormServerPipelineFactory(this, storm_conf));
        Channel channel = this.bootstrap.bind((SocketAddress)new InetSocketAddress(port));
        this.allChannels.add((Object)channel);
        LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", new Object[]{port, buffer_size, maxWorkers});
    }

    /**
     * Registers (or replaces) the receive queue for a task.
     *
     * @param taskId   task id used as the lookup key in {@link #enqueue(TaskMessage)} and
     *                 {@link #recv(Integer, int)}
     * @param recvQueu queue that receives the task's message payloads
     */
    @Override
    public void registerQueue(Integer taskId, DisruptorQueue recvQueu) {
        this.deserializeQueues.put(taskId, recvQueu);
    }

    /**
     * Routes an incoming message by its type: type 0 publishes the payload to the queue
     * registered for the message's task, type 1 publishes the whole message to the control
     * queue. Messages of any other type are ignored; messages with no target queue are
     * logged and dropped.
     *
     * @param message the received message
     */
    @Override
    public void enqueue(TaskMessage message) {
        short type = message.get_type();
        if (type == 0) {
            int task = message.task();
            DisruptorQueue queue = this.deserializeQueues.get(task);
            if (queue == null) {
                LOG.warn("Received invalid message directed at port " + task + ". Dropping...");
                return;
            }
            queue.publish(message.message());
        } else if (type == 1) {
            if (this.recvControlQueue == null) {
                LOG.info("this worker's recvControlQueue is null, so  Dropping this control message");
                return;
            }
            this.recvControlQueue.publish(message);
        }
    }

    /**
     * Fetches the next item from the queue registered for {@code taskId}.
     *
     * @param taskId task whose queue is read
     * @param flags  if bit 0 is set the queue is read with {@code poll()}, otherwise with
     *               {@code take()}
     * @return the item obtained from the queue, or {@code null} if no queue is registered
     *         for the task or the read throws
     */
    @Override
    public Object recv(Integer taskId, int flags) {
        try {
            DisruptorQueue recvQueue = this.deserializeQueues.get(taskId);
            if (recvQueue == null) {
                // Explicit check instead of relying on a caught NullPointerException.
                LOG.warn("No receive queue registered for task {}", taskId);
                return null;
            }
            if ((flags & 1) == 1) {
                return recvQueue.poll();
            }
            return recvQueue.take();
        }
        catch (Exception e) {
            LOG.warn("Occur unexception ", (Throwable)e);
            return null;
        }
    }

    /**
     * Adds a channel to the tracked group. If the group has already been cleared by
     * {@link #close()}, the channel is closed instead of being tracked.
     *
     * @param channel channel to track
     */
    protected void addChannel(Channel channel) {
        // Read the volatile field once; the shutdown thread may null it concurrently.
        ChannelGroup channels = this.allChannels;
        if (channels == null) {
            LOG.warn("NettyServer is already shut down, closing channel {}", channel);
            channel.close();
            return;
        }
        channels.add((Object)channel);
    }

    /**
     * Removes the channel's transmit histogram, closes the channel (waiting for the close to
     * complete) and stops tracking it.
     *
     * @param channel channel to close
     */
    protected void closeChannel(Channel channel) {
        MessageDecoder.removeTransmitHistogram(channel);
        channel.close().awaitUninterruptibly();
        // Read the volatile field once; the shutdown thread may null it concurrently.
        ChannelGroup channels = this.allChannels;
        if (channels != null) {
            channels.remove((Object)channel);
        }
    }

    /**
     * Shuts the server down. A helper thread closes all tracked channels (waiting up to one
     * second), releases the channel factory's external resources and clears the channel
     * group; the calling thread sleeps for one second before returning. Calling this again
     * after the group has been cleared only logs.
     */
    @Override
    public synchronized void close() {
        LOG.info("Begin to shutdown NettyServer");
        // Capture the group so the helper thread never dereferences a field that an
        // earlier shutdown thread has already set to null.
        final ChannelGroup channels = this.allChannels;
        if (channels != null) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        channels.close().await(1L, TimeUnit.SECONDS);
                        LOG.info("Successfully close all channel");
                        NettyServer.this.factory.releaseExternalResources();
                    }
                    catch (InterruptedException e) {
                        // Preserve the interrupt status instead of silently discarding it.
                        Thread.currentThread().interrupt();
                        LOG.warn("Interrupted while shutting down NettyServer", (Throwable)e);
                    }
                    catch (Exception e) {
                        // Shutdown stays best-effort, but failures are no longer hidden.
                        LOG.warn("Failed to cleanly shut down NettyServer", (Throwable)e);
                    }
                    NettyServer.this.allChannels = null;
                }
            }).start();
            JStormUtils.sleepMs(1000L);
        }
        LOG.info("Successfully shutdown NettyServer");
    }

    /**
     * Not supported on the server side.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void send(List<TaskMessage> messages) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * Not supported on the server side.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void send(TaskMessage message) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * @return always {@code false}; this implementation does not report its closed state
     */
    @Override
    public boolean isClosed() {
        return false;
    }

    /**
     * @return the sync-mode flag supplied to the constructor
     */
    public boolean isSyncMode() {
        return this.isSyncMode;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean available() {
        return true;
    }
}

