package org.wso2.siddhi.tcp.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.log4j.Logger;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.tcp.transport.callback.LogStreamListener;
import org.wso2.siddhi.tcp.transport.callback.StreamListener;
import org.wso2.siddhi.tcp.transport.config.ServerConfig;
import org.wso2.siddhi.tcp.transport.handlers.EventDecoder;
import org.wso2.siddhi.tcp.transport.utils.StreamTypeHolder;

/* loaded from: input_file:org/wso2/siddhi/tcp/transport/TCPNettyServer.class */
public class TCPNettyServer {
    private static final Logger log = Logger.getLogger(TCPNettyServer.class);
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private StreamTypeHolder streamInfoHolder = new StreamTypeHolder();
    private ChannelFuture channelFuture;
    private String hostAndPort;
    private FlowController flowController;

    public static void main(String[] strArr) {
        StreamDefinition attribute = StreamDefinition.id("StockStream").attribute("symbol", Attribute.Type.STRING).attribute("price", Attribute.Type.INT).attribute("volume", Attribute.Type.INT);
        TCPNettyServer tCPNettyServer = new TCPNettyServer();
        tCPNettyServer.addStreamListener(new LogStreamListener(attribute));
        tCPNettyServer.bootServer(new ServerConfig());
        try {
            Thread.sleep(10000L);
            tCPNettyServer.shutdownGracefully();
        } catch (InterruptedException e) {
            tCPNettyServer.shutdownGracefully();
        } catch (Throwable th) {
            tCPNettyServer.shutdownGracefully();
            throw th;
        }
    }

    public void bootServer(ServerConfig serverConfig) {
        this.bossGroup = new NioEventLoopGroup(serverConfig.getReceiverThreads());
        this.workerGroup = new NioEventLoopGroup(serverConfig.getWorkerThreads());
        this.hostAndPort = serverConfig.getHost() + ":" + serverConfig.getPort();
        try {
            this.flowController = new FlowController(serverConfig.getQueueSizeOfTcpTransport());
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.AUTO_READ, false).childHandler(new ChannelInitializer() { // from class: org.wso2.siddhi.tcp.transport.TCPNettyServer.1
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new ChannelHandler[]{TCPNettyServer.this.flowController});
                    pipeline.addLast(new ChannelHandler[]{new EventDecoder(TCPNettyServer.this.streamInfoHolder)});
                }
            });
            this.channelFuture = serverBootstrap.bind(serverConfig.getHost(), serverConfig.getPort()).sync();
            log.info("Tcp Server started in " + this.hostAndPort + "");
        } catch (InterruptedException e) {
            log.error("Error when booting up tcp server on '" + this.hostAndPort + "' " + e.getMessage(), e);
        }
    }

    public void shutdownGracefully() {
        this.channelFuture.channel().close();
        try {
            this.channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Error when shutdowning the tcp server " + e.getMessage(), e);
        }
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        log.info("Tcp Server running on '" + this.hostAndPort + "' stopped.");
        this.workerGroup = null;
        this.bossGroup = null;
    }

    public synchronized void addStreamListener(StreamListener streamListener) {
        this.streamInfoHolder.putStreamCallback(streamListener);
    }

    public synchronized void removeStreamListener(String str) {
        this.streamInfoHolder.removeStreamCallback(str);
    }

    public synchronized int getNoOfRegisteredStreamListeners() {
        return this.streamInfoHolder.getNoOfRegisteredStreamListeners();
    }

    public void isPaused(boolean z) {
        this.flowController.isPaused(z);
    }
}
