/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.message.netty.NettyClientAsync;
import com.alibaba.jstorm.message.netty.NettyClientSync;
import com.alibaba.jstorm.message.netty.NettyRenameThreadFactory;
import com.alibaba.jstorm.message.netty.NettyServer;
import com.alibaba.jstorm.message.netty.ReconnectRunnable;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyContext
implements IContext {
    private static final Logger LOG = LoggerFactory.getLogger(NettyContext.class);
    private Map storm_conf;
    private NioClientSocketChannelFactory clientChannelFactory;
    private ScheduledExecutorService clientScheduleService;
    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 5;
    private ReconnectRunnable reconnector;
    private boolean isSyncMode = false;

    @Override
    public void prepare(Map storm_conf) {
        this.storm_conf = storm_conf;
        int maxWorkers = Utils.getInt(storm_conf.get("storm.messaging.netty.client_worker_threads"));
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory("NettyCliboss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory("NettyCliworker");
        this.clientChannelFactory = maxWorkers > 0 ? new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory), maxWorkers) : new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory));
        int otherWorkers = Utils.getInt(storm_conf.get("topology.workers"), 1) - 1;
        int poolSize = Math.min(Math.max(1, otherWorkers), 5);
        this.clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
        this.reconnector = new ReconnectRunnable();
        new AsyncLoopThread(this.reconnector, true, 1, true);
        this.isSyncMode = ConfigExtension.isNettySyncMode(storm_conf);
    }

    @Override
    public IConnection bind(String topology_id, int port, ConcurrentHashMap<Integer, DisruptorQueue> deserializedQueue, DisruptorQueue recvControlQueue) {
        NettyServer retConnection = null;
        try {
            retConnection = new NettyServer(this.storm_conf, port, this.isSyncMode, deserializedQueue, recvControlQueue);
        }
        catch (Throwable e) {
            LOG.error("Failed to instance NettyServer", e.getCause());
            JStormUtils.halt_process(-1, "Failed to bind " + port);
        }
        return retConnection;
    }

    @Override
    public IConnection connect(String topology_id, String host, int port) {
        if (this.isSyncMode) {
            return new NettyClientSync(this.storm_conf, (ChannelFactory)this.clientChannelFactory, this.clientScheduleService, host, port, this.reconnector);
        }
        return new NettyClientAsync(this.storm_conf, (ChannelFactory)this.clientChannelFactory, this.clientScheduleService, host, port, this.reconnector);
    }

    @Override
    public void term() {
        this.clientScheduleService.shutdown();
        try {
            this.clientScheduleService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.error("Error when shutting down client scheduler", (Throwable)e);
        }
        this.clientChannelFactory.releaseExternalResources();
        this.reconnector.shutdown();
    }
}

