/*
 * Decompiled with CFR 0.152.
 */
package com.eureka2.shading.reactivex.netty.server;

import com.eureka2.shading.reactivex.netty.channel.ConnectionHandler;
import com.eureka2.shading.reactivex.netty.channel.UnpooledConnectionFactory;
import com.eureka2.shading.reactivex.netty.metrics.MetricEventsListener;
import com.eureka2.shading.reactivex.netty.metrics.MetricEventsPublisher;
import com.eureka2.shading.reactivex.netty.metrics.MetricEventsSubject;
import com.eureka2.shading.reactivex.netty.pipeline.PipelineConfigurator;
import com.eureka2.shading.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import com.eureka2.shading.reactivex.netty.server.ErrorHandler;
import com.eureka2.shading.reactivex.netty.server.ServerChannelMetricEventProvider;
import com.eureka2.shading.reactivex.netty.server.ServerMetricsEvent;
import com.eureka2.shading.reactivex.netty.server.ServerRequiredConfigurator;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;

public class AbstractServer<I, O, B extends AbstractBootstrap<B, C>, C extends Channel, S extends AbstractServer>
implements MetricEventsPublisher<ServerMetricsEvent<?>> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
    protected final UnpooledConnectionFactory<I, O> connectionFactory;
    protected final B bootstrap;
    protected final int port;
    protected final AtomicReference<ServerState> serverStateRef;
    protected final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;
    protected ErrorHandler errorHandler;
    private ChannelFuture bindFuture;

    public AbstractServer(B bootstrap, int port) {
        if (null == bootstrap) {
            throw new NullPointerException("Bootstrap can not be null.");
        }
        this.serverStateRef = new AtomicReference<ServerState>(ServerState.Created);
        this.bootstrap = bootstrap;
        this.port = port;
        this.eventsSubject = new MetricEventsSubject();
        this.connectionFactory = UnpooledConnectionFactory.from(this.eventsSubject, ServerChannelMetricEventProvider.INSTANCE);
    }

    public void startAndWait() {
        this.start();
        try {
            this.waitTillShutdown();
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    public S start() {
        if (!this.serverStateRef.compareAndSet(ServerState.Created, ServerState.Starting)) {
            throw new IllegalStateException("Server already started");
        }
        try {
            this.bindFuture = this.bootstrap.bind(this.port).sync();
            if (!this.bindFuture.isSuccess()) {
                throw new RuntimeException(this.bindFuture.cause());
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        this.serverStateRef.set(ServerState.Started);
        logger.info("Rx server started at port: " + this.getServerPort());
        return this.returnServer();
    }

    public S withErrorHandler(ErrorHandler errorHandler) {
        if (this.serverStateRef.get() == ServerState.Started) {
            throw new IllegalStateException("Error handler can not be set after starting the server.");
        }
        this.errorHandler = errorHandler;
        return this.returnServer();
    }

    public void shutdown() throws InterruptedException {
        if (!this.serverStateRef.compareAndSet(ServerState.Started, ServerState.Shutdown)) {
            throw new IllegalStateException("The server is already shutdown.");
        }
        this.bindFuture.channel().close().sync();
    }

    public void waitTillShutdown() throws InterruptedException {
        ServerState serverState = this.serverStateRef.get();
        switch (serverState) {
            case Created: 
            case Starting: {
                throw new IllegalStateException("Server not started yet.");
            }
            case Started: {
                this.bindFuture.channel().closeFuture().await();
                break;
            }
        }
    }

    public void waitTillShutdown(long duration, TimeUnit timeUnit) throws InterruptedException {
        ServerState serverState = this.serverStateRef.get();
        switch (serverState) {
            case Created: 
            case Starting: {
                throw new IllegalStateException("Server not started yet.");
            }
            case Started: {
                this.bindFuture.channel().closeFuture().await(duration, timeUnit);
                break;
            }
        }
    }

    public int getServerPort() {
        SocketAddress localAddress;
        if (null != this.bindFuture && this.bindFuture.isDone() && (localAddress = this.bindFuture.channel().localAddress()) instanceof InetSocketAddress) {
            return ((InetSocketAddress)localAddress).getPort();
        }
        return this.port;
    }

    public MetricEventsSubject<ServerMetricsEvent<?>> getEventsSubject() {
        return this.eventsSubject;
    }

    @Override
    public Subscription subscribe(MetricEventsListener<? extends ServerMetricsEvent<?>> listener) {
        return this.eventsSubject.subscribe(listener);
    }

    protected S returnServer() {
        return (S)this;
    }

    protected ChannelInitializer<Channel> newChannelInitializer(final PipelineConfigurator<I, O> pipelineConfigurator, final ConnectionHandler<I, O> connectionHandler, final EventExecutorGroup connHandlingExecutor) {
        return new ChannelInitializer<Channel>(){

            protected void initChannel(Channel ch) throws Exception {
                ServerRequiredConfigurator requiredConfigurator = new ServerRequiredConfigurator(connectionHandler, AbstractServer.this.connectionFactory, AbstractServer.this.errorHandler, AbstractServer.this.eventsSubject, connHandlingExecutor);
                PipelineConfigurator configurator = null == pipelineConfigurator ? requiredConfigurator : new PipelineConfiguratorComposite(pipelineConfigurator, requiredConfigurator);
                configurator.configureNewPipeline(ch.pipeline());
            }
        };
    }

    protected static enum ServerState {
        Created,
        Starting,
        Started,
        Shutdown;

    }
}

