package io.ballerina.messaging.broker.amqp;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannelFactory;
import io.ballerina.messaging.broker.amqp.codec.auth.AuthenticationStrategyFactory;
import io.ballerina.messaging.broker.amqp.codec.frames.AmqMethodRegistryFactory;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpConnectionHandler;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpDecoder;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpEncoder;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpMessageWriter;
import io.ballerina.messaging.broker.amqp.codec.handlers.BlockingTaskHandler;
import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager;
import io.ballerina.messaging.broker.amqp.metrics.DefaultAmqpMetricManager;
import io.ballerina.messaging.broker.amqp.metrics.NullAmqpMetricManager;
import io.ballerina.messaging.broker.amqp.rest.api.ConnectionsApi;
import io.ballerina.messaging.broker.auth.AuthManager;
import io.ballerina.messaging.broker.auth.authorization.AuthorizationHandler;
import io.ballerina.messaging.broker.auth.authorization.authorizer.empty.NoOpAuthorizer;
import io.ballerina.messaging.broker.common.StartupContext;
import io.ballerina.messaging.broker.common.config.BrokerConfigProvider;
import io.ballerina.messaging.broker.coordination.BasicHaListener;
import io.ballerina.messaging.broker.coordination.HaListener;
import io.ballerina.messaging.broker.coordination.HaStrategy;
import io.ballerina.messaging.broker.core.Broker;
import io.ballerina.messaging.broker.core.DefaultBrokerFactory;
import io.ballerina.messaging.broker.core.SecureBrokerFactory;
import io.ballerina.messaging.broker.rest.BrokerServiceRunner;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.metrics.core.MetricService;

/* loaded from: input_file:io/ballerina/messaging/broker/amqp/Server.class */
public class Server {
    private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
    private static final int BLOCKING_TASK_EXECUTOR_THREADS = 32;
    private final AmqpServerConfiguration configuration;
    private final AmqpMetricManager metricManager;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private EventExecutorGroup ioExecutors;
    private Channel plainServerChannel;
    private Channel sslServerChannel;
    private AmqpConnectionManager connectionManager;
    private HaStrategy haStrategy;
    private ServerHelper serverHelper;
    private AmqMethodRegistryFactory amqMethodRegistryFactory;
    private AmqpChannelFactory amqpChannelFactory;

    /* loaded from: input_file:io/ballerina/messaging/broker/amqp/Server$HaEnabledServerHelper.class */
    private class HaEnabledServerHelper extends ServerHelper implements HaListener {
        private BasicHaListener basicHaListener;

        HaEnabledServerHelper(AmqpServerConfiguration amqpServerConfiguration) {
            super(amqpServerConfiguration);
            this.basicHaListener = new BasicHaListener(this);
            Server.this.haStrategy.registerListener(this.basicHaListener, 2);
        }

        @Override // io.ballerina.messaging.broker.amqp.Server.ServerHelper
        public synchronized void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
            this.basicHaListener.setStartCalled();
            if (this.basicHaListener.isActive()) {
                super.start();
            }
        }

        @Override // io.ballerina.messaging.broker.amqp.Server.ServerHelper
        public void shutdown() {
            Server.this.haStrategy.unregisterListener(this.basicHaListener);
            super.shutdown();
        }

        public void activate() {
            startOnBecomingActive();
            Server.LOGGER.info("AMQP Transport mode changed from PASSIVE to ACTIVE");
        }

        public void deactivate() {
            Server.LOGGER.info("AMQP Transport mode changed from ACTIVE to PASSIVE");
            Server.this.closeChannels();
        }

        private synchronized void startOnBecomingActive() {
            if (this.basicHaListener.isStartCalled()) {
                try {
                    start();
                } catch (IOException | InterruptedException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException e) {
                    Server.LOGGER.error("Error while starting the AMQP Transport ", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/amqp/Server$ServerHelper.class */
    public class ServerHelper {
        private final int socketBufferSize;

        private ServerHelper(AmqpServerConfiguration amqpServerConfiguration) {
            this.socketBufferSize = amqpServerConfiguration.getSocketBufferSize();
        }

        public void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
            ChannelFuture bindToPlainSocket = bindToPlainSocket();
            Server.this.plainServerChannel = bindToPlainSocket.channel();
            if (Server.this.configuration.getSsl().isEnabled()) {
                ChannelFuture bindToSslSocket = bindToSslSocket();
                Server.this.sslServerChannel = bindToSslSocket.channel();
            }
        }

        private ChannelFuture bindToPlainSocket() throws InterruptedException {
            String hostName = Server.this.configuration.getHostName();
            int parseInt = Integer.parseInt(Server.this.configuration.getPlain().getPort());
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(Server.this.bossGroup, Server.this.workerGroup).channel(NioServerSocketChannel.class).childHandler(new SocketChannelInitializer(Server.this.ioExecutors)).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.socketBufferSize)).childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.socketBufferSize)).childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture sync = serverBootstrap.bind(hostName, parseInt).sync();
            Server.LOGGER.info("Listening AMQP on {}:{}", hostName, Integer.valueOf(parseInt));
            return sync;
        }

        private ChannelFuture bindToSslSocket() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
            String hostName = Server.this.configuration.getHostName();
            int parseInt = Integer.parseInt(Server.this.configuration.getSsl().getPort());
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(Server.this.bossGroup, Server.this.workerGroup).channel(NioServerSocketChannel.class).childHandler(new SslSocketChannelInitializer(Server.this.ioExecutors, new SslHandlerFactory(Server.this.configuration))).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.socketBufferSize)).childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.socketBufferSize)).childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture sync = serverBootstrap.bind(hostName, parseInt).sync();
            Server.LOGGER.info("Listening AMQP/{} on {}:{}", new Object[]{Server.this.configuration.getSsl().getProtocol(), hostName, Integer.valueOf(parseInt)});
            return sync;
        }

        public void shutdown() {
            Server.this.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/amqp/Server$SocketChannelInitializer.class */
    public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final EventExecutorGroup ioExecutors;

        SocketChannelInitializer(EventExecutorGroup eventExecutorGroup) {
            this.ioExecutors = eventExecutorGroup;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) {
            socketChannel.pipeline().addLast(new ChannelHandler[]{new AmqpDecoder(Server.this.amqMethodRegistryFactory.newInstance())}).addLast(new ChannelHandler[]{new AmqpEncoder()}).addLast(new ChannelHandler[]{new AmqpConnectionHandler(Server.this.metricManager, Server.this.amqpChannelFactory, Server.this.connectionManager)}).addLast(this.ioExecutors, new ChannelHandler[]{new AmqpMessageWriter()}).addLast(this.ioExecutors, new ChannelHandler[]{new BlockingTaskHandler()});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ballerina/messaging/broker/amqp/Server$SslSocketChannelInitializer.class */
    public class SslSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final EventExecutorGroup ioExecutors;
        private final SslHandlerFactory sslHandlerFactory;

        SslSocketChannelInitializer(EventExecutorGroup eventExecutorGroup, SslHandlerFactory sslHandlerFactory) {
            this.ioExecutors = eventExecutorGroup;
            this.sslHandlerFactory = sslHandlerFactory;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) {
            socketChannel.pipeline().addLast(new ChannelHandler[]{this.sslHandlerFactory.create()}).addLast(new ChannelHandler[]{new AmqpDecoder(Server.this.amqMethodRegistryFactory.newInstance())}).addLast(new ChannelHandler[]{new AmqpEncoder()}).addLast(new ChannelHandler[]{new AmqpConnectionHandler(Server.this.metricManager, Server.this.amqpChannelFactory, Server.this.connectionManager)}).addLast(this.ioExecutors, new ChannelHandler[]{new AmqpMessageWriter()}).addLast(this.ioExecutors, new ChannelHandler[]{new BlockingTaskHandler()});
        }
    }

    public Server(StartupContext startupContext) throws Exception {
        MetricService metricService = (MetricService) startupContext.getService(MetricService.class);
        if (Objects.nonNull(metricService)) {
            this.metricManager = new DefaultAmqpMetricManager(metricService);
        } else {
            this.metricManager = new NullAmqpMetricManager();
        }
        this.configuration = (AmqpServerConfiguration) ((BrokerConfigProvider) startupContext.getService(BrokerConfigProvider.class)).getConfigurationObject(AmqpServerConfiguration.NAMESPACE, AmqpServerConfiguration.class);
        if (((Broker) startupContext.getService(Broker.class)) == null) {
            throw new RuntimeException("Could not find the broker class to initialize AMQP server");
        }
        AuthManager authManager = (AuthManager) startupContext.getService(AuthManager.class);
        SecureBrokerFactory secureBrokerFactory = (null != authManager && authManager.isAuthenticationEnabled() && authManager.isAuthorizationEnabled()) ? new SecureBrokerFactory(startupContext) : new DefaultBrokerFactory(startupContext);
        this.bossGroup = new NioEventLoopGroup();
        this.workerGroup = new NioEventLoopGroup();
        this.ioExecutors = new DefaultEventExecutorGroup(BLOCKING_TASK_EXECUTOR_THREADS, new ThreadFactoryBuilder().setNameFormat("NettyBlockingTaskThread-%d").build());
        this.haStrategy = (HaStrategy) startupContext.getService(HaStrategy.class);
        if (this.haStrategy == null) {
            this.serverHelper = new ServerHelper(this.configuration);
        } else {
            LOGGER.info("AMQP Transport is in PASSIVE mode");
            this.serverHelper = new HaEnabledServerHelper(this.configuration);
        }
        this.amqMethodRegistryFactory = new AmqMethodRegistryFactory(AuthenticationStrategyFactory.getStrategy((AuthManager) startupContext.getService(AuthManager.class), secureBrokerFactory, this.configuration));
        this.amqpChannelFactory = new AmqpChannelFactory(this.configuration, this.metricManager);
        initConnectionsRestApi(startupContext);
    }

    private void shutdownExecutors() {
        LOGGER.info("Shutting down Netty Executors for AMQP transport");
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        this.ioExecutors.shutdownGracefully();
    }

    public void start() throws InterruptedException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        this.serverHelper.start();
    }

    public void awaitServerClose() throws InterruptedException {
        if (this.plainServerChannel != null) {
            this.plainServerChannel.closeFuture().sync();
        }
        if (this.sslServerChannel != null) {
            this.sslServerChannel.closeFuture().sync();
        }
    }

    public void stop() {
        try {
            closeChannels();
        } finally {
            shutdownExecutors();
        }
    }

    public void shutdown() {
        this.serverHelper.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeChannels() {
        if (this.plainServerChannel != null) {
            this.plainServerChannel.close();
        }
        if (this.sslServerChannel != null) {
            this.sslServerChannel.close();
        }
    }

    private void initConnectionsRestApi(StartupContext startupContext) {
        BrokerServiceRunner brokerServiceRunner = (BrokerServiceRunner) startupContext.getService(BrokerServiceRunner.class);
        this.connectionManager = new AmqpConnectionManager();
        if (Objects.nonNull(brokerServiceRunner)) {
            AuthManager authManager = (AuthManager) startupContext.getService(AuthManager.class);
            brokerServiceRunner.deploy(new Object[]{new ConnectionsApi(this.connectionManager, new AuthorizationHandler((null != authManager && authManager.isAuthenticationEnabled() && authManager.isAuthorizationEnabled()) ? authManager.getAuthorizer() : new NoOpAuthorizer()))});
        }
    }
}
