package org.wso2.carbon.stream.processor.core.ha.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.wso2.carbon.stream.processor.core.event.queue.EventListMapManager;
import org.wso2.carbon.stream.processor.core.ha.transport.handlers.MessageDecoder;
import org.wso2.carbon.stream.processor.core.ha.util.HAConstants;
import org.wso2.carbon.stream.processor.core.internal.beans.DeploymentConfig;
import org.wso2.carbon.stream.processor.core.internal.beans.EventSyncServerConfig;
import org.wso2.carbon.stream.processor.core.util.BinaryMessageConverterUtil;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/tcp/EventSyncServer.class */
public class EventSyncServer {
    private ServerBootstrap bootstrap;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ChannelFuture channelFuture;
    private String hostAndPort;
    private EventSyncServerConfig serverConfig;
    private static final Logger log = Logger.getLogger(EventSyncServer.class);
    private BlockingQueue<ByteBuffer> eventByteBufferQueue;
    private EventListMapManager eventListMapManager = new EventListMapManager();
    private ExecutorService eventBufferExtractorExecutorService = Executors.newFixedThreadPool(5);
    private EventBufferExtractor eventBufferExtractor = new EventBufferExtractor();

    /* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/tcp/EventSyncServer$EventBufferExtractor.class */
    private class EventBufferExtractor implements Runnable {
        volatile boolean run;

        private EventBufferExtractor() {
            this.run = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.run) {
                try {
                    ByteBuffer byteBuffer = (ByteBuffer) EventSyncServer.this.eventByteBufferQueue.take();
                    try {
                        String string = BinaryMessageConverterUtil.getString(byteBuffer, byteBuffer.getInt());
                        byte[] bArr = new byte[byteBuffer.getInt()];
                        byteBuffer.get(bArr);
                        if (string.equals(HAConstants.CHANNEL_ID_CONTROL_MESSAGE)) {
                            if (EventSyncServer.log.isDebugEnabled()) {
                                EventSyncServer.log.debug("Received a control message");
                            }
                            EventSyncServer.this.eventListMapManager.parseControlMessage(bArr);
                        } else if (string.equals(HAConstants.CHANNEL_ID_MESSAGE)) {
                            if (EventSyncServer.log.isDebugEnabled()) {
                                EventSyncServer.log.debug("Received a event message");
                            }
                            EventSyncServer.this.eventListMapManager.parseMessage(bArr);
                        }
                    } catch (UnsupportedEncodingException e) {
                        EventSyncServer.log.warn("Error when converting bytes " + e.getMessage(), e);
                    } catch (Throwable th) {
                        EventSyncServer.log.error("Error occurred while processing eventByteBufferQueue " + th.getMessage(), th);
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    if (this.run) {
                        return;
                    }
                    EventSyncServer.log.error("EventSyncServer EventBufferExtractor Job is interrupted");
                    return;
                }
            }
        }
    }

    public void start(DeploymentConfig deploymentConfig) {
        this.eventByteBufferQueue = new LinkedBlockingQueue(deploymentConfig.getEventByteBufferQueueCapacity());
        this.serverConfig = deploymentConfig.eventSyncServerConfigs();
        this.bossGroup = new NioEventLoopGroup(this.serverConfig.getBossThreads());
        this.workerGroup = new NioEventLoopGroup(this.serverConfig.getWorkerThreads());
        this.hostAndPort = this.serverConfig.getHost() + ":" + this.serverConfig.getPort();
        this.bootstrap = new ServerBootstrap();
        this.bootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { // from class: org.wso2.carbon.stream.processor.core.ha.tcp.EventSyncServer.1
            protected void initChannel(Channel channel) throws Exception {
                channel.pipeline().addLast(new ChannelHandler[]{new MessageDecoder(EventSyncServer.this.eventByteBufferQueue)});
            }
        }).option(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            this.channelFuture = this.bootstrap.bind(this.serverConfig.getHost(), this.serverConfig.getPort()).sync();
            for (int i = 0; i < 5; i++) {
                this.eventBufferExtractorExecutorService.submit(this.eventBufferExtractor);
            }
            log.info("EventSyncServer started in " + this.hostAndPort + "");
        } catch (InterruptedException e) {
            log.error("Error when booting up EventSyncServer on '" + this.hostAndPort + "' " + e.getMessage(), e);
        }
    }

    public void shutdownGracefully() {
        this.channelFuture.channel().close();
        try {
            this.channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Error when shutdowning the EventSyncServer " + e.getMessage(), e);
        }
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        log.info("EventSyncServer running on '" + this.hostAndPort + "' stopped.");
        this.workerGroup = null;
        this.bossGroup = null;
    }

    public void clearResources() {
        this.eventBufferExtractor.run = false;
        this.eventBufferExtractorExecutorService.shutdownNow();
        this.eventByteBufferQueue.clear();
    }

    public BlockingQueue<ByteBuffer> getEventByteBufferQueue() {
        return this.eventByteBufferQueue;
    }
}
