/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.grpc.source;

import com.google.protobuf.Empty;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.stub.StreamObserver;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.extension.io.grpc.source.AbstractGrpcSource;
import io.siddhi.extension.io.grpc.source.GrpcServiceSource;
import io.siddhi.extension.io.grpc.source.GrpcSource;
import io.siddhi.extension.io.grpc.source.GrpcWorkerThread;
import io.siddhi.extension.io.grpc.source.ServiceServer;
import io.siddhi.extension.io.grpc.util.GrpcServerConfigs;
import io.siddhi.extension.io.grpc.util.SourceServerInterceptor;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.BindException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.grpc.Event;
import org.wso2.grpc.EventServiceGrpc;

public class GrpcEventServiceServer
extends ServiceServer {
    /** Class-wide logger; the RPC handlers also pass it to {@code handlePause}. */
    private static final Logger logger = LogManager.getLogger(GrpcEventServiceServer.class);
    /**
     * Per-thread metadata map. The RPC handlers read it with {@code get()} when creating a
     * {@link GrpcWorkerThread} and clear it with {@code remove()} once the request is handled.
     * NOTE(review): presumably populated by {@link SourceServerInterceptor} - confirm.
     */
    public static ThreadLocal<Map<String, String>> metaDataMap = new ThreadLocal();
    /** The gRPC server; built in {@code addServicesAndBuildServer}, started in {@code connectServer}. */
    protected Server server;
    /** Netty builder created and configured in {@code setServerPropertiesToBuilder}. */
    private NettyServerBuilder serverBuilder;
    /** Server configuration supplied to the constructor. */
    private GrpcServerConfigs grpcServerConfigs;
    /** Interceptor wrapped around the event service when the server is built. */
    private SourceServerInterceptor serverInterceptor;
    /** Sources registered for the {@code consume} RPC, keyed by stream id (matched against the "stream.id" header). */
    private Map<String, GrpcSource> subscribersForConsume = new HashMap<String, GrpcSource>();
    /** Sources registered for the {@code process} RPC, keyed by stream id (matched against the "stream.id" header). */
    private Map<String, GrpcServiceSource> subscribersForProcess = new HashMap<String, GrpcServiceSource>();
    /** Lifecycle marker: 0 initially, set to 1 by {@code connectServer} after start, set to 2 by {@code disconnectServer}. */
    private int state = 0;
    /** Thread pool with a bounded queue on which incoming events are handed to {@link GrpcWorkerThread} tasks. */
    private ExecutorService executorService;

    public GrpcEventServiceServer(GrpcServerConfigs grpcServerConfigs, SiddhiAppContext siddhiAppContext, String streamID) {
        this.serverInterceptor = new SourceServerInterceptor(grpcServerConfigs.getServiceConfigs().isDefaultService());
        this.grpcServerConfigs = grpcServerConfigs;
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.executorService = new ThreadPoolExecutor(grpcServerConfigs.getThreadPoolSize(), grpcServerConfigs.getThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(grpcServerConfigs.getThreadPoolBufferSize()));
        this.setServerPropertiesToBuilder(siddhiAppContext.getName(), streamID);
        this.addServicesAndBuildServer(siddhiAppContext.getName(), streamID);
    }

    /**
     * Creates the Netty server builder for the configured port and applies the optional
     * settings: TLS (when a keystore path is configured), mutual TLS (when a truststore
     * path is also configured) and the inbound message / metadata size limits
     * (when they are not {@code -1}).
     *
     * @param siddhiAppName name of the Siddhi app, used in error messages
     * @param streamID      id of the stream, used in error messages
     * @throws SiddhiAppCreationException if the SSL context cannot be created
     */
    @Override
    protected void setServerPropertiesToBuilder(String siddhiAppName, String streamID) {
        serverBuilder = NettyServerBuilder.forPort(grpcServerConfigs.getServiceConfigs().getPort());

        String keystorePath = grpcServerConfigs.getServiceConfigs().getKeystoreFilePath();
        if (keystorePath != null) {
            try {
                SslContextBuilder tlsBuilder = getSslContextBuilder(
                        keystorePath,
                        grpcServerConfigs.getServiceConfigs().getKeystorePassword(),
                        grpcServerConfigs.getServiceConfigs().getKeystoreAlgorithm(),
                        grpcServerConfigs.getServiceConfigs().getTlsStoreType(),
                        siddhiAppName,
                        streamID);
                String truststorePath = grpcServerConfigs.getServiceConfigs().getTruststoreFilePath();
                if (truststorePath != null) {
                    // A truststore means clients must present a certificate.
                    SslContextBuilder withTrust = addTrustStore(
                            truststorePath,
                            grpcServerConfigs.getServiceConfigs().getTruststorePassword(),
                            grpcServerConfigs.getServiceConfigs().getTruststoreAlgorithm(),
                            tlsBuilder,
                            grpcServerConfigs.getServiceConfigs().getTlsStoreType(),
                            siddhiAppName,
                            streamID);
                    tlsBuilder = withTrust.clientAuth(ClientAuth.REQUIRE);
                }
                serverBuilder.sslContext(tlsBuilder.build());
            } catch (IOException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException sslFailure) {
                throw new SiddhiAppCreationException(siddhiAppName + ": " + streamID + ": Error while creating SslContext. " + sslFailure.getMessage(), sslFailure);
            }
        }

        int maxMessageSize = grpcServerConfigs.getMaxInboundMessageSize();
        if (maxMessageSize != -1) {
            serverBuilder.maxInboundMessageSize(maxMessageSize);
        }

        int maxMetadataSize = grpcServerConfigs.getMaxInboundMetadataSize();
        if (maxMetadataSize != -1) {
            serverBuilder.maxInboundMetadataSize(maxMetadataSize);
        }
    }

    /**
     * Registers the {@code EventService} implementation (wrapped by the server interceptor)
     * on the Netty builder and builds {@link #server}. The server is not started here.
     *
     * <p>Both RPCs reject a request with {@link Status#DATA_LOSS} when the payload is missing,
     * when the "stream.id" header is missing, or when no source is subscribed under that
     * stream id. Accepted events are handed to the worker pool as {@link GrpcWorkerThread} tasks.
     *
     * @param siddhiAppName name of the Siddhi app, used in log messages
     * @param streamID      id of the stream this server was created for, used in log messages
     */
    @Override
    protected void addServicesAndBuildServer(final String siddhiAppName, final String streamID) {
        this.server = ((NettyServerBuilder)this.serverBuilder.addService(ServerInterceptors.intercept((BindableService)new EventServiceGrpc.EventServiceImplBase(){

            /**
             * Client-streaming RPC: each received event is dispatched to the subscribed
             * {@link GrpcSource}; an {@link Empty} response is sent and the call completed
             * once an event has been accepted.
             */
            @Override
            public StreamObserver<Event> consume(final StreamObserver<Empty> responseObserver) {
                GrpcEventServiceServer.this.handlePause(logger);
                return new StreamObserver<Event>(){

                    @Override
                    public void onNext(Event request) {
                        Map<String, String> headers = request.getHeadersMap();
                        if (request.getPayload() == null) {
                            logger.error(siddhiAppName + ":" + streamID + ": Dropping request due to missing payload ");
                            responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                        } else if (!headers.containsKey("stream.id")) {
                            logger.error(siddhiAppName + ":" + streamID + ": Dropping request due to missing stream.id ");
                            responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                        } else if (!GrpcEventServiceServer.this.subscribersForConsume.containsKey(headers.get("stream.id"))) {
                            // Report the header that was actually looked up ("stream.id"), so the
                            // message names the requested stream instead of printing null.
                            logger.error(siddhiAppName + ":" + streamID + ": Dropping request because requested stream with stream.id " + headers.get("stream.id") + " not subcribed to the gRPC server on port " + GrpcEventServiceServer.this.grpcServerConfigs.getServiceConfigs().getPort());
                            responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                        } else {
                            try {
                                GrpcSource relevantSource = GrpcEventServiceServer.this.subscribersForConsume.get(headers.get("stream.id"));
                                GrpcEventServiceServer.this.executorService.execute(new GrpcWorkerThread(relevantSource, request.getPayload(), headers, metaDataMap.get()));
                                responseObserver.onNext(Empty.getDefaultInstance());
                                responseObserver.onCompleted();
                            }
                            catch (SiddhiAppRuntimeException e) {
                                // Keep the cause in the log, as the process RPC does.
                                logger.error(siddhiAppName + ":" + streamID + ": Dropping request. " + e.getMessage(), e);
                                responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                            }
                            finally {
                                // Always clear the per-thread metadata so it cannot leak into the next call.
                                metaDataMap.remove();
                            }
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Nothing to clean up for a failed client stream; record it for diagnosis only.
                        if (logger.isDebugEnabled()) {
                            logger.debug(siddhiAppName + ":" + streamID + ": Client stream terminated with an error. " + t.getMessage(), t);
                        }
                    }

                    @Override
                    public void onCompleted() {
                        // No action needed when the client finishes sending.
                    }
                };
            }

            /**
             * Unary RPC: the event is dispatched to the subscribed {@link GrpcServiceSource}
             * under a freshly generated "message.id"; the response observer is registered with
             * the source under that id and a service timeout is scheduled for it.
             */
            @Override
            public void process(Event request, StreamObserver<Event> responseObserver) {
                GrpcEventServiceServer.this.handlePause(logger);
                Map<String, String> headers = request.getHeadersMap();
                if (request.getPayload() == null) {
                    logger.error(siddhiAppName + ":" + streamID + ": Dropping request due to missing payload ");
                    responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                } else if (!headers.containsKey("stream.id")) {
                    logger.error(siddhiAppName + ":" + streamID + ": Dropping request due to missing stream.id ");
                    responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                } else if (!GrpcEventServiceServer.this.subscribersForProcess.containsKey(headers.get("stream.id"))) {
                    logger.error(siddhiAppName + ":" + streamID + ": Dropping request because requested stream with stream.id " + headers.get("stream.id") + " not subcribed to the gRPC server on port " + GrpcEventServiceServer.this.grpcServerConfigs.getServiceConfigs().getPort());
                    responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                } else {
                    String messageId = UUID.randomUUID().toString();
                    // Request headers are copied after the generated id, so a client-supplied
                    // "message.id" header overrides the generated value in the transport properties.
                    HashMap<String, String> transportPropertyMap = new HashMap<String, String>();
                    transportPropertyMap.put("message.id", messageId);
                    transportPropertyMap.putAll(headers);
                    try {
                        GrpcServiceSource relevantSource = GrpcEventServiceServer.this.subscribersForProcess.get(headers.get("stream.id"));
                        // NOTE(review): the worker is submitted before the observer is registered;
                        // confirm the response path cannot look up the observer before putStreamObserver runs.
                        GrpcEventServiceServer.this.executorService.execute(new GrpcWorkerThread(relevantSource, request.getPayload(), transportPropertyMap, metaDataMap.get()));
                        relevantSource.putStreamObserver(messageId, responseObserver);
                        relevantSource.scheduleServiceTimeout(messageId);
                    }
                    catch (SiddhiAppRuntimeException e) {
                        logger.error(siddhiAppName + ":" + streamID + ": Dropping request. " + e.getMessage(), e);
                        responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                    }
                    finally {
                        // Always clear the per-thread metadata so it cannot leak into the next call.
                        metaDataMap.remove();
                    }
                }
            }
        }, this.serverInterceptor))).build();
    }

    /**
     * Starts the gRPC server and marks the state as started (1).
     *
     * @param logger             logger used for the debug message
     * @param connectionCallback notified when the server fails to start for a reason other than a port clash
     * @param siddhiAppName      name of the Siddhi app, used in messages
     * @param streamID           id of the stream, used in messages
     * @throws SiddhiAppValidationException if the start failure was caused by a {@link BindException}
     * @throws SiddhiAppRuntimeException    for any other I/O failure while starting
     */
    @Override
    protected void connectServer(Logger logger, Source.ConnectionCallback connectionCallback, String siddhiAppName, String streamID) {
        try {
            server.start();
        } catch (IOException startFailure) {
            boolean portInUse = startFailure.getCause() instanceof BindException;
            if (portInUse) {
                throw new SiddhiAppValidationException(siddhiAppName + ":" + streamID + ": Another server is already running on the port " + grpcServerConfigs.getServiceConfigs().getPort() + ". Please provide a different port");
            }
            String failureMessage = siddhiAppName + ":" + streamID + ": Error when starting the server. " + startFailure.getMessage();
            connectionCallback.onError(new ConnectionUnavailableException(failureMessage, startFailure));
            throw new SiddhiAppRuntimeException(siddhiAppName + ": " + streamID + ": ", startFailure);
        }

        // Only reached when start() succeeded; the catch block above always throws.
        state = 1;
        if (logger.isDebugEnabled()) {
            logger.debug(siddhiAppName + ":" + streamID + ": gRPC Server started");
        }
    }

    /**
     * Shuts the gRPC server down and marks the state as stopped (2).
     *
     * <p>When a positive shutdown waiting time is configured, the method waits that long for a
     * graceful termination, then forces {@code shutdownNow()} and waits once more. Without a
     * waiting time the shutdown is only initiated.
     *
     * <p>NOTE(review): the worker {@code executorService} is not shut down here; confirm whether
     * its threads are released elsewhere.
     *
     * @param logger        logger used for debug messages
     * @param siddhiAppName name of the Siddhi app, used in messages
     * @param streamID      id of the stream, used in messages
     * @throws SiddhiAppRuntimeException if the server does not terminate within the waiting
     *                                   time even after a forced shutdown, or if the waiting
     *                                   thread is interrupted
     */
    @Override
    public void disconnectServer(Logger logger, String siddhiAppName, String streamID) {
        if (this.server == null) {
            if (logger.isDebugEnabled()) {
                logger.debug(siddhiAppName + ":" + streamID + ": Illegal state. Server already stopped.");
            }
            return;
        }
        this.server.shutdown();
        // Record the stopped state as soon as shutdown is initiated, so every exit path below
        // (graceful, forced, or failed termination) reports it consistently via getState().
        this.state = 2;

        long waitMillis = this.getGrpcServerConfigs().getServerShutdownWaitingTimeInMillis();
        if (waitMillis <= 0L) {
            return;
        }
        try {
            if (this.server.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(siddhiAppName + ": " + streamID + ": Server stopped");
                }
                return;
            }
            // Graceful shutdown timed out: cancel in-flight calls and wait once more.
            this.server.shutdownNow();
            if (this.server.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new SiddhiAppRuntimeException(siddhiAppName + ": " + streamID + ": " + e.getMessage(), (Throwable)e);
        }
        throw new SiddhiAppRuntimeException(siddhiAppName + ":" + streamID + ": Unable to shutdown server");
    }

    /**
     * Loads the keystore at {@code filePath} and returns a server-side SSL context builder,
     * configured for gRPC, that uses its key material.
     *
     * @param filePath      path of the keystore file
     * @param password      password used both to open the keystore and to initialise the key managers
     * @param algorithm     key manager factory algorithm
     * @param storeType     keystore type passed to {@link KeyStore#getInstance(String)}
     * @param siddhiAppName name of the Siddhi app, used in error messages
     * @param streamID      id of the stream, used in error messages
     * @return the configured builder
     * @throws SiddhiAppCreationException if the keystore file cannot be read
     */
    @Override
    protected SslContextBuilder getSslContextBuilder(String filePath, String password, String algorithm, String storeType, String siddhiAppName, String streamID) throws KeyStoreException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
        final char[] storeSecret = password.toCharArray();
        final KeyStore identityStore = KeyStore.getInstance(storeType);
        try (FileInputStream storeStream = new FileInputStream(filePath)) {
            identityStore.load(storeStream, storeSecret);
        } catch (IOException readFailure) {
            throw new SiddhiAppCreationException(siddhiAppName + ": " + streamID + ": " + readFailure.getMessage(), readFailure);
        }

        final KeyManagerFactory keyManagers = KeyManagerFactory.getInstance(algorithm);
        keyManagers.init(identityStore, storeSecret);
        return GrpcSslContexts.configure(SslContextBuilder.forServer(keyManagers));
    }

    /**
     * Loads the truststore at {@code filePath}, installs it as the trust manager of the given
     * builder and requires client authentication.
     *
     * @param filePath          path of the truststore file
     * @param password          password used to open the truststore
     * @param algorithm         trust manager factory algorithm
     * @param sslContextBuilder builder to extend
     * @param storeType         keystore type passed to {@link KeyStore#getInstance(String)}
     * @param siddhiAppName     name of the Siddhi app, used in error messages
     * @param streamID          id of the stream, used in error messages
     * @return the builder with trust manager and {@code ClientAuth.REQUIRE} applied
     * @throws SiddhiAppCreationException if the truststore file cannot be read
     */
    @Override
    protected SslContextBuilder addTrustStore(String filePath, String password, String algorithm, SslContextBuilder sslContextBuilder, String storeType, String siddhiAppName, String streamID) throws NoSuchAlgorithmException, KeyStoreException, CertificateException {
        final char[] storeSecret = password.toCharArray();
        final KeyStore trustStore = KeyStore.getInstance(storeType);
        try (FileInputStream storeStream = new FileInputStream(filePath)) {
            trustStore.load(storeStream, storeSecret);
        } catch (IOException readFailure) {
            throw new SiddhiAppCreationException(siddhiAppName + ": " + streamID + ": " + readFailure.getMessage(), readFailure);
        }

        final TrustManagerFactory trustManagers = TrustManagerFactory.getInstance(algorithm);
        trustManagers.init(trustStore);
        SslContextBuilder withTrust = sslContextBuilder.trustManager(trustManagers);
        return withTrust.clientAuth(ClientAuth.REQUIRE);
    }

    /**
     * @return the server configuration this instance was constructed with
     */
    public GrpcServerConfigs getGrpcServerConfigs() {
        return grpcServerConfigs;
    }

    /**
     * Registers a source under {@code streamID} for the given RPC method, unless one is
     * already registered under that id. A source whose type does not match the method
     * ({@link GrpcSource} for consume, {@link GrpcServiceSource} for process) is ignored.
     *
     * @param streamID         stream id to register under
     * @param source           source to register
     * @param methodName       "consume" or "process" (case-insensitive)
     * @param siddhiAppContext context whose name is used in the error message
     * @throws SiddhiAppValidationException if {@code methodName} is neither consume nor process
     */
    public void subscribe(String streamID, AbstractGrpcSource source, String methodName, SiddhiAppContext siddhiAppContext) {
        final boolean forConsume = methodName.equalsIgnoreCase("consume");
        if (!forConsume && !methodName.equalsIgnoreCase("process")) {
            throw new SiddhiAppValidationException(siddhiAppContext.getName() + ": " + streamID + ": method name should be either process or consume but given as " + methodName);
        }

        if (forConsume) {
            if (source instanceof GrpcSource) {
                subscribersForConsume.putIfAbsent(streamID, (GrpcSource)source);
            }
            return;
        }
        if (source instanceof GrpcServiceSource) {
            subscribersForProcess.putIfAbsent(streamID, (GrpcServiceSource)source);
        }
    }

    /**
     * Removes the source registered under {@code streamID} for the given RPC method.
     *
     * @param streamID         stream id to remove
     * @param methodName       "consume" or "process" (case-insensitive)
     * @param siddhiAppContext context whose name is used in the error message
     * @throws SiddhiAppValidationException if {@code methodName} is neither consume nor process
     */
    public void unsubscribe(String streamID, String methodName, SiddhiAppContext siddhiAppContext) {
        if (methodName.equalsIgnoreCase("consume")) {
            subscribersForConsume.remove(streamID);
            return;
        }
        if (methodName.equalsIgnoreCase("process")) {
            subscribersForProcess.remove(streamID);
            return;
        }
        throw new SiddhiAppValidationException(siddhiAppContext.getName() + ": " + streamID + ": method name should be either process or consume but given as " + methodName);
    }

    /**
     * @return the result of {@code isShutdown()} on the underlying gRPC server
     */
    public boolean isShutDown() {
        final Server grpcServer = server;
        return grpcServer.isShutdown();
    }

    /**
     * @return the lifecycle marker: 0 initially, 1 once the server has been started,
     *         2 once {@code disconnectServer} has recorded the shutdown
     */
    public int getState() {
        return state;
    }

    /**
     * @return the total number of sources registered for the consume and process RPCs
     */
    public int getNumSubscribers() {
        final int consumeCount = subscribersForConsume.size();
        final int processCount = subscribersForProcess.size();
        return consumeCount + processCount;
    }
}

