/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.broker.amqp;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.broker.amqp.AmqpServerConfiguration;
import org.wso2.broker.amqp.SslHandlerFactory;
import org.wso2.broker.amqp.codec.handlers.AmqpConnectionHandler;
import org.wso2.broker.amqp.codec.handlers.AmqpDecoder;
import org.wso2.broker.amqp.codec.handlers.AmqpEncoder;
import org.wso2.broker.amqp.codec.handlers.AmqpMessageWriter;
import org.wso2.broker.amqp.codec.handlers.BlockingTaskHandler;
import org.wso2.broker.amqp.metrics.AmqpMetricManager;
import org.wso2.broker.amqp.metrics.DefaultAmqpMetricManager;
import org.wso2.broker.amqp.metrics.NullAmqpMetricManager;
import org.wso2.broker.common.BrokerConfigProvider;
import org.wso2.broker.common.StartupContext;
import org.wso2.broker.coordination.BasicHaListener;
import org.wso2.broker.coordination.HaListener;
import org.wso2.broker.coordination.HaStrategy;
import org.wso2.broker.core.Broker;
import org.wso2.carbon.metrics.core.MetricService;

/**
 * Netty based AMQP transport server.
 *
 * <p>The server always binds a plain socket and, when SSL is enabled in the
 * {@link AmqpServerConfiguration}, an additional SSL socket. When a {@link HaStrategy} service is
 * available in the {@link StartupContext}, binding is deferred until the HA listener reports this
 * node as active (see {@link HaEnabledServerHelper}).
 */
public class Server {
    private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

    /** Number of threads in the executor group that runs the message writer and blocking task handlers. */
    private static final int BLOCKING_TASK_EXECUTOR_THREADS = 32;

    /** Value used for {@code SO_BACKLOG} on both listening sockets. */
    private static final int SOCKET_BACKLOG = 128;

    private final Broker broker;
    private final AmqpServerConfiguration configuration;
    private final AmqpMetricManager metricManager;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final EventExecutorGroup ioExecutors;
    private final HaStrategy haStrategy;
    private final ServerHelper serverHelper;

    // Set by ServerHelper#start() once the corresponding socket is bound; null until then.
    private Channel plainServerChannel;
    private Channel sslServerChannel;

    /**
     * Creates the server from the services registered in the startup context. No socket is bound
     * here; call {@link #start()} for that.
     *
     * @param startupContext source of the {@link MetricService} (optional), {@link BrokerConfigProvider},
     *                       {@link Broker} (required) and {@link HaStrategy} (optional) services
     * @throws RuntimeException if the configuration provider or the broker service is missing
     * @throws Exception        if reading the AMQP transport configuration fails
     */
    public Server(StartupContext startupContext) throws Exception {
        MetricService metrics = (MetricService) startupContext.getService(MetricService.class);
        // Metrics are optional: fall back to the null manager when no metric service is registered.
        this.metricManager = Objects.nonNull(metrics)
                ? new DefaultAmqpMetricManager(metrics)
                : new NullAmqpMetricManager();

        BrokerConfigProvider configProvider =
                (BrokerConfigProvider) startupContext.getService(BrokerConfigProvider.class);
        if (configProvider == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new RuntimeException("Could not find the broker configuration provider to initialize AMQP server");
        }
        this.configuration = (AmqpServerConfiguration) configProvider.getConfigurationObject(
                "wso2.broker.transport.amqp", AmqpServerConfiguration.class);

        this.broker = (Broker) startupContext.getService(Broker.class);
        if (this.broker == null) {
            throw new RuntimeException("Could not find the broker class to initialize AMQP server");
        }

        this.bossGroup = new NioEventLoopGroup();
        this.workerGroup = new NioEventLoopGroup();
        ThreadFactory blockingTaskThreadFactory =
                new ThreadFactoryBuilder().setNameFormat("NettyBlockingTaskThread-%d").build();
        this.ioExecutors = new DefaultEventExecutorGroup(BLOCKING_TASK_EXECUTOR_THREADS, blockingTaskThreadFactory);

        this.haStrategy = (HaStrategy) startupContext.getService(HaStrategy.class);
        if (this.haStrategy == null) {
            this.serverHelper = new ServerHelper();
        } else {
            LOGGER.info("AMQP Transport is in PASSIVE mode");
            this.serverHelper = new HaEnabledServerHelper();
        }
    }

    /**
     * Requests a graceful shutdown of the worker, boss and blocking-task executor groups.
     * The returned shutdown futures are not awaited.
     */
    private void shutdownExecutors() {
        LOGGER.info("Shutting down Netty Executors for AMQP transport");
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        this.ioExecutors.shutdownGracefully();
    }

    /**
     * Builds a server bootstrap with the shared event loop groups and socket options, binds it to
     * the configured host name on the given port and waits for the bind to complete.
     *
     * @param initializer pipeline initializer for accepted connections
     * @param hostname    host name to bind to
     * @param port        port to bind to
     * @return the completed bind future
     * @throws InterruptedException if interrupted while waiting for the bind to finish
     */
    private ChannelFuture bind(ChannelInitializer<SocketChannel> initializer, String hostname, int port)
            throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer)
                .option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        return bootstrap.bind(hostname, port).sync();
    }

    /**
     * Binds the plain (non-SSL) AMQP socket on the configured host and plain port.
     *
     * @return the completed bind future
     * @throws InterruptedException if interrupted while waiting for the bind to finish
     */
    private ChannelFuture bindToPlainSocket() throws InterruptedException {
        String hostname = this.configuration.getHostName();
        int port = Integer.parseInt(this.configuration.getPlain().getPort());
        ChannelFuture future = bind(new SocketChannelInitializer(this.ioExecutors), hostname, port);
        LOGGER.info("Listening AMQP on {}:{}", hostname, port);
        return future;
    }

    /**
     * Binds the SSL AMQP socket on the configured host and SSL port. The checked security and I/O
     * exceptions are those declared for creating the {@link SslHandlerFactory}.
     *
     * @return the completed bind future
     * @throws InterruptedException if interrupted while waiting for the bind to finish
     */
    private ChannelFuture bindToSslSocket() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        String hostname = this.configuration.getHostName();
        int port = Integer.parseInt(this.configuration.getSsl().getPort());
        SslSocketChannelInitializer initializer =
                new SslSocketChannelInitializer(this.ioExecutors, new SslHandlerFactory(this.configuration));
        ChannelFuture future = bind(initializer, hostname, port);
        LOGGER.info("Listening AMQP/{} on {}:{}", this.configuration.getSsl().getProtocol(), hostname, port);
        return future;
    }

    /**
     * Installs the AMQP handler chain shared by plain and SSL connections at the end of the
     * channel's pipeline: decoder, encoder, connection handler, then the message writer and
     * blocking task handler, the last two being registered with the given executor group.
     *
     * @param socketChannel channel whose pipeline is populated
     * @param executors     executor group for the message writer and blocking task handler
     */
    private void addAmqpHandlers(SocketChannel socketChannel, EventExecutorGroup executors) {
        socketChannel.pipeline()
                .addLast(new AmqpDecoder())
                .addLast(new AmqpEncoder())
                .addLast(new AmqpConnectionHandler(this.configuration, this.broker, this.metricManager))
                .addLast(executors, new AmqpMessageWriter())
                .addLast(executors, new BlockingTaskHandler());
    }

    /**
     * Starts the transport through the configured helper. Without HA the sockets are bound
     * immediately; with HA they are bound only if the HA listener reports the node as active.
     */
    public void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        this.serverHelper.start();
    }

    /**
     * Blocks until every server channel that has been bound so far is closed. Returns immediately
     * if no channel has been bound.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void awaitServerClose() throws InterruptedException {
        if (this.plainServerChannel != null) {
            this.plainServerChannel.closeFuture().sync();
        }
        if (this.sslServerChannel != null) {
            this.sslServerChannel.closeFuture().sync();
        }
    }

    /**
     * Closes the server channels and then shuts down the Netty executors. The executors are shut
     * down even if closing the channels fails.
     */
    public void stop() throws InterruptedException {
        try {
            this.closeChannels();
        } finally {
            this.shutdownExecutors();
        }
    }

    /**
     * Shuts the transport down through the configured helper, which ends up calling
     * {@link #stop()}; in HA mode the HA listener is unregistered first.
     */
    public void shutdown() throws InterruptedException {
        this.serverHelper.shutdown();
    }

    /**
     * Requests closing of whichever server channels are currently bound. The close futures are
     * not awaited.
     */
    private void closeChannels() throws InterruptedException {
        if (this.plainServerChannel != null) {
            this.plainServerChannel.close();
        }
        if (this.sslServerChannel != null) {
            this.sslServerChannel.close();
        }
    }

    /**
     * Server helper used when an {@link HaStrategy} is present. It registers a
     * {@link BasicHaListener} wrapping itself with the strategy and only binds the sockets while
     * that listener reports the node as active.
     */
    private class HaEnabledServerHelper
    extends ServerHelper
    implements HaListener {
        private final BasicHaListener basicHaListener;

        HaEnabledServerHelper() {
            this.basicHaListener = new BasicHaListener(this);
            // NOTE(review): the second argument (2) is presumably the listener's priority/order —
            // confirm against HaStrategy#registerListener.
            Server.this.haStrategy.registerListener((HaListener) this.basicHaListener, 2);
        }

        /**
         * Marks start as called on the HA listener and binds the sockets only if the listener
         * reports the node as active; otherwise returns without binding.
         */
        @Override
        public synchronized void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
            this.basicHaListener.setStartCalled();
            if (!this.basicHaListener.isActive()) {
                return;
            }
            super.start();
        }

        /** Unregisters the HA listener and then stops the server. */
        @Override
        public void shutdown() throws InterruptedException {
            Server.this.haStrategy.unregisterListener((HaListener) this.basicHaListener);
            super.shutdown();
        }

        /** HA callback: attempts to start the transport (if start was already requested) and logs the mode change. */
        public void activate() {
            this.startOnBecomingActive();
            LOGGER.info("AMQP Transport mode changed from PASSIVE to ACTIVE");
        }

        /** HA callback: logs the mode change and closes the server channels; executors stay alive. */
        public void deactivate() {
            LOGGER.info("AMQP Transport mode changed from ACTIVE to PASSIVE");
            try {
                Server.this.closeChannels();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the calling thread; this callback cannot rethrow.
                Thread.currentThread().interrupt();
                LOGGER.error("Error while stopping the AMQP transport ", e);
            }
        }

        /**
         * Starts the transport if {@link #start()} has already been called on this helper.
         * Failures are logged rather than propagated because the caller is an HA callback.
         */
        private synchronized void startOnBecomingActive() {
            if (!this.basicHaListener.isStartCalled()) {
                return;
            }
            try {
                this.start();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the calling thread; this callback cannot rethrow.
                Thread.currentThread().interrupt();
                LOGGER.error("Error while starting the AMQP Transport ", e);
            } catch (IOException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException e) {
                LOGGER.error("Error while starting the AMQP Transport ", e);
            }
        }
    }

    /** Default helper: binds the sockets immediately on start and stops the server on shutdown. */
    private class ServerHelper {

        /**
         * Binds the plain socket and, if SSL is enabled in the configuration, the SSL socket,
         * recording the resulting channels on the enclosing server.
         */
        public void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
            Server.this.plainServerChannel = Server.this.bindToPlainSocket().channel();
            if (Server.this.configuration.getSsl().isEnabled()) {
                Server.this.sslServerChannel = Server.this.bindToSslSocket().channel();
            }
        }

        /** Stops the enclosing server. */
        public void shutdown() throws InterruptedException {
            Server.this.stop();
        }
    }

    /** Pipeline initializer for SSL connections: an SSL handler followed by the shared AMQP handlers. */
    private class SslSocketChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final EventExecutorGroup ioExecutors;
        private final SslHandlerFactory sslHandlerFactory;

        SslSocketChannelInitializer(EventExecutorGroup ioExecutors, SslHandlerFactory sslHandlerFactory) {
            this.ioExecutors = ioExecutors;
            this.sslHandlerFactory = sslHandlerFactory;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) {
            // The SSL handler must come first so the AMQP codec sees decrypted bytes.
            socketChannel.pipeline().addLast(this.sslHandlerFactory.create());
            Server.this.addAmqpHandlers(socketChannel, this.ioExecutors);
        }
    }

    /** Pipeline initializer for plain connections: only the shared AMQP handlers. */
    private class SocketChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final EventExecutorGroup ioExecutors;

        SocketChannelInitializer(EventExecutorGroup ioExecutors) {
            this.ioExecutors = ioExecutors;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) {
            Server.this.addAmqpHandlers(socketChannel, this.ioExecutors);
        }
    }
}

