package io.siddhi.extension.io.grpc.source;

import com.google.protobuf.Empty;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.stub.StreamObserver;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.extension.io.grpc.util.GrpcConstants;
import io.siddhi.extension.io.grpc.util.GrpcServerConfigs;
import io.siddhi.extension.io.grpc.util.SourceServerInterceptor;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.BindException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;
import org.wso2.grpc.EventServiceGrpc;

/* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/source/GrpcEventServiceServer.class */
/**
 * gRPC server that hosts the {@code EventService} ({@code consume} and {@code process} RPCs)
 * and hands every accepted request to the source subscribed under the request's
 * {@link GrpcConstants#STREAM_ID} header.
 *
 * <p>Sources register through {@link #subscribe} and are removed through {@link #unsubscribe}.
 * Accepted requests are wrapped in a {@link GrpcWorkerThread} and submitted to a fixed-size
 * thread pool backed by a bounded queue.
 *
 * <p>{@link #getState()} reports 0 until the server is started, 1 after a successful
 * {@link #connectServer}, and 2 after a successful {@link #disconnectServer}.
 *
 * <p>NOTE(review): the subscriber registries are plain {@link HashMap}s that are read from RPC
 * handlers and written from {@code subscribe}/{@code unsubscribe}; confirm whether callers
 * serialise those accesses. The worker pool is also never shut down by this class.
 */
public class GrpcEventServiceServer extends ServiceServer {
    private static final Logger logger = Logger.getLogger(GrpcEventServiceServer.class.getName());

    /**
     * Per-thread metadata passed to each {@link GrpcWorkerThread} and cleared once a request has
     * been handled. Presumably populated by {@link SourceServerInterceptor} - TODO confirm.
     */
    public static ThreadLocal<Map<String, String>> metaDataMap = new ThreadLocal<>();
    protected Server server;
    private NettyServerBuilder serverBuilder;
    private GrpcServerConfigs grpcServerConfigs;
    private SourceServerInterceptor serverInterceptor;
    /** Sources served by the {@code consume} RPC, keyed by stream id. */
    private Map<String, GrpcSource> subscribersForConsume = new HashMap<>();
    /** Sources served by the {@code process} RPC, keyed by stream id. */
    private Map<String, GrpcServiceSource> subscribersForProcess = new HashMap<>();
    /** 0 = not started, 1 = started, 2 = shut down. */
    private int state = 0;
    private ExecutorService executorService;

    /**
     * Creates the worker pool, configures the Netty server builder and builds (but does not
     * start) the server.
     *
     * @param grpcServerConfigs server configuration (port, TLS stores, pool sizes, limits)
     * @param siddhiAppContext  context whose name is used as prefix in log and error messages
     * @param str               second identifier used in log and error messages (presumably the
     *                          stream id - confirm against callers)
     */
    public GrpcEventServiceServer(GrpcServerConfigs grpcServerConfigs, SiddhiAppContext siddhiAppContext, String str) {
        this.serverInterceptor = new SourceServerInterceptor(grpcServerConfigs.getServiceConfigs().isDefaultService());
        this.grpcServerConfigs = grpcServerConfigs;
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.executorService = new ThreadPoolExecutor(
                grpcServerConfigs.getThreadPoolSize(),
                grpcServerConfigs.getThreadPoolSize(),
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(grpcServerConfigs.getThreadPoolBufferSize()));
        setServerPropertiesToBuilder(siddhiAppContext.getName(), str);
        addServicesAndBuildServer(siddhiAppContext.getName(), str);
    }

    /**
     * Creates the server builder for the configured port and applies the optional TLS context
     * and inbound message/metadata size limits (a limit of -1 means "leave the default").
     *
     * @param str  Siddhi app name used in error messages
     * @param str2 second identifier used in error messages
     * @throws SiddhiAppCreationException if the SSL context cannot be created
     */
    @Override
    protected void setServerPropertiesToBuilder(String str, String str2) {
        this.serverBuilder = NettyServerBuilder.forPort(this.grpcServerConfigs.getServiceConfigs().getPort());
        if (this.grpcServerConfigs.getServiceConfigs().getKeystoreFilePath() != null) {
            try {
                SslContextBuilder sslContextBuilder = getSslContextBuilder(
                        this.grpcServerConfigs.getServiceConfigs().getKeystoreFilePath(),
                        this.grpcServerConfigs.getServiceConfigs().getKeystorePassword(),
                        this.grpcServerConfigs.getServiceConfigs().getKeystoreAlgorithm(),
                        this.grpcServerConfigs.getServiceConfigs().getTlsStoreType(),
                        str,
                        str2);
                if (this.grpcServerConfigs.getServiceConfigs().getTruststoreFilePath() != null) {
                    // A configured truststore switches the server to mutual TLS.
                    sslContextBuilder = addTrustStore(
                            this.grpcServerConfigs.getServiceConfigs().getTruststoreFilePath(),
                            this.grpcServerConfigs.getServiceConfigs().getTruststorePassword(),
                            this.grpcServerConfigs.getServiceConfigs().getTruststoreAlgorithm(),
                            sslContextBuilder,
                            this.grpcServerConfigs.getServiceConfigs().getTlsStoreType(),
                            str,
                            str2).clientAuth(ClientAuth.REQUIRE);
                }
                this.serverBuilder.sslContext(sslContextBuilder.build());
            } catch (IOException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException
                    | CertificateException e) {
                throw new SiddhiAppCreationException(
                        str + ": " + str2 + ": Error while creating SslContext. " + e.getMessage(), e);
            }
        }
        if (this.grpcServerConfigs.getMaxInboundMessageSize() != -1) {
            this.serverBuilder.maxInboundMessageSize(this.grpcServerConfigs.getMaxInboundMessageSize());
        }
        if (this.grpcServerConfigs.getMaxInboundMetadataSize() != -1) {
            this.serverBuilder.maxInboundMetadataSize(this.grpcServerConfigs.getMaxInboundMetadataSize());
        }
    }

    /**
     * Registers the {@code EventService} implementation (wrapped by the server interceptor) on
     * the builder and builds the server.
     *
     * <p>Both RPCs reject a request with {@link Status#DATA_LOSS} when it cannot be routed to a
     * subscribed source, and always clear {@link #metaDataMap} for the handling thread before
     * returning.
     *
     * @param str  Siddhi app name used in log messages
     * @param str2 second identifier used in log messages
     */
    @Override
    protected void addServicesAndBuildServer(final String str, final String str2) {
        final String logPrefix = str + GrpcConstants.SEMI_COLON_STRING + str2;
        this.server = this.serverBuilder.addService(ServerInterceptors.intercept(new EventServiceGrpc.EventServiceImplBase() {
            @Override
            public StreamObserver<Event> consume(final StreamObserver<Empty> streamObserver) {
                GrpcEventServiceServer.this.handlePause(GrpcEventServiceServer.logger);
                return new StreamObserver<Event>() {
                    @Override
                    public void onNext(Event event) {
                        try {
                            GrpcSource source = GrpcEventServiceServer.this.resolveSubscriber(
                                    event, GrpcEventServiceServer.this.subscribersForConsume, streamObserver, logPrefix);
                            if (source == null) {
                                // Already logged and answered with DATA_LOSS.
                                return;
                            }
                            GrpcEventServiceServer.this.executorService.execute(new GrpcWorkerThread(
                                    (AbstractGrpcSource) source,
                                    event.getPayload(),
                                    event.getHeadersMap(),
                                    GrpcEventServiceServer.metaDataMap.get()));
                            // The event is acknowledged as soon as it has been queued.
                            streamObserver.onNext(Empty.getDefaultInstance());
                            streamObserver.onCompleted();
                        } catch (SiddhiAppRuntimeException e) {
                            GrpcEventServiceServer.logger.error(logPrefix + ": Dropping request. " + e.getMessage(), e);
                            streamObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                        } finally {
                            GrpcEventServiceServer.metaDataMap.remove();
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Intentionally empty: client-side stream errors are ignored.
                    }

                    @Override
                    public void onCompleted() {
                        // Intentionally empty: the response is completed from onNext.
                    }
                };
            }

            @Override
            public void process(Event event, StreamObserver<Event> streamObserver) {
                GrpcEventServiceServer.this.handlePause(GrpcEventServiceServer.logger);
                try {
                    GrpcServiceSource source = GrpcEventServiceServer.this.resolveSubscriber(
                            event, GrpcEventServiceServer.this.subscribersForProcess, streamObserver, logPrefix);
                    if (source == null) {
                        // Already logged and answered with DATA_LOSS.
                        return;
                    }
                    // A fresh message id links the queued request to its response observer.
                    String messageId = UUID.randomUUID().toString();
                    Map<String, String> headers = new HashMap<>();
                    headers.put(GrpcConstants.MESSAGE_ID, messageId);
                    // Request headers are copied last, so they win on a key clash.
                    headers.putAll(event.getHeadersMap());
                    // NOTE(review): the worker is queued before the observer is registered; confirm
                    // that a response cannot be produced before putStreamObserver has run.
                    GrpcEventServiceServer.this.executorService.execute(new GrpcWorkerThread(
                            (AbstractGrpcSource) source,
                            event.getPayload(),
                            headers,
                            GrpcEventServiceServer.metaDataMap.get()));
                    source.putStreamObserver(messageId, streamObserver);
                    source.scheduleServiceTimeout(messageId);
                } catch (SiddhiAppRuntimeException e) {
                    GrpcEventServiceServer.logger.error(logPrefix + ": Dropping request. " + e.getMessage(), e);
                    streamObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
                } finally {
                    GrpcEventServiceServer.metaDataMap.remove();
                }
            }
        }, this.serverInterceptor)).build();
    }

    /**
     * Validates an incoming event and looks up the source subscribed for its stream id.
     *
     * <p>When the event has no payload, no stream id header, or no subscriber for that stream
     * id, the reason is logged, {@code responseObserver} is failed with
     * {@link Status#DATA_LOSS} and {@code null} is returned. The subscriber is fetched with a
     * single map lookup so it cannot vanish between the check and the use.
     *
     * @param event            incoming request
     * @param subscribers      registry to resolve the stream id against
     * @param responseObserver observer to fail when the request is rejected
     * @param logPrefix        prefix for the log message
     * @param <T>              subscriber type held by the registry
     * @return the subscribed source, or {@code null} if the request was rejected
     */
    private <T> T resolveSubscriber(Event event, Map<String, T> subscribers,
                                    StreamObserver<?> responseObserver, String logPrefix) {
        String failure = null;
        T subscriber = null;
        // NOTE(review): protobuf getters normally never return null, so this check is probably
        // dead and an unset payload arrives as an empty value - confirm the intended behaviour.
        if (event.getPayload() == null) {
            failure = ": Dropping request due to missing payload ";
        } else if (!event.getHeadersMap().containsKey(GrpcConstants.STREAM_ID)) {
            failure = ": Dropping request due to missing stream.id ";
        } else {
            String streamId = event.getHeadersMap().get(GrpcConstants.STREAM_ID);
            subscriber = subscribers.get(streamId);
            if (subscriber == null) {
                failure = ": Dropping request because requested stream with stream.id " + streamId
                        + " not subcribed to the gRPC server on port "
                        + this.grpcServerConfigs.getServiceConfigs().getPort();
            }
        }
        if (failure == null) {
            return subscriber;
        }
        logger.error(logPrefix + failure);
        responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
        return null;
    }

    /**
     * Starts the server and moves the state to 1.
     *
     * @param logger2            logger used for the debug message
     * @param connectionCallback notified when the server fails to start for a reason other than
     *                           the port being in use
     * @param str                Siddhi app name used in messages
     * @param str2               second identifier used in messages
     * @throws SiddhiAppValidationException if the port is already bound
     * @throws SiddhiAppRuntimeException    if the server fails to start for any other I/O reason
     */
    @Override
    public void connectServer(Logger logger2, Source.ConnectionCallback connectionCallback, String str, String str2) {
        try {
            this.server.start();
            this.state = 1;
            if (logger2.isDebugEnabled()) {
                logger2.debug(str + GrpcConstants.SEMI_COLON_STRING + str2 + ": gRPC Server started");
            }
        } catch (IOException e) {
            if (e.getCause() instanceof BindException) {
                throw new SiddhiAppValidationException(str + GrpcConstants.SEMI_COLON_STRING + str2
                        + ": Another server is already running on the port "
                        + this.grpcServerConfigs.getServiceConfigs().getPort()
                        + ". Please provide a different port");
            }
            connectionCallback.onError(new ConnectionUnavailableException(
                    str + GrpcConstants.SEMI_COLON_STRING + str2 + ": Error when starting the server. "
                            + e.getMessage(), e));
            throw new SiddhiAppRuntimeException(str + ": " + str2 + ": ", e);
        }
    }

    /**
     * Shuts the server down and moves the state to 2.
     *
     * <p>With a positive configured waiting time the method waits for termination, escalates to
     * {@code shutdownNow()} if the wait expires, and waits once more. With a non-positive
     * waiting time it returns right after requesting the shutdown.
     *
     * @param logger2 logger used for debug messages
     * @param str     Siddhi app name used in messages
     * @param str2    second identifier used in messages
     * @throws SiddhiAppRuntimeException if the server does not terminate in time, or if the
     *                                   calling thread is interrupted while waiting (the
     *                                   interrupt flag is restored before throwing)
     */
    @Override
    public void disconnectServer(Logger logger2, String str, String str2) {
        if (this.server == null) {
            if (logger2.isDebugEnabled()) {
                logger2.debug(str + GrpcConstants.SEMI_COLON_STRING + str2 + ": Illegal state. Server already stopped.");
            }
            return;
        }
        long waitMillis = getGrpcServerConfigs().getServerShutdownWaitingTimeInMillis();
        try {
            this.server.shutdown();
            if (waitMillis > 0) {
                if (this.server.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                    if (logger2.isDebugEnabled()) {
                        logger2.debug(str + ": " + str2 + ": Server stopped");
                    }
                } else {
                    // Graceful shutdown timed out: cancel in-flight calls and wait once more.
                    this.server.shutdownNow();
                    if (!this.server.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                        throw new SiddhiAppRuntimeException(
                                str + GrpcConstants.SEMI_COLON_STRING + str2 + ": Unable to shutdown server");
                    }
                }
            }
            // Record the shutdown on every successful path, not only when no wait is configured.
            this.state = 2;
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new SiddhiAppRuntimeException(str + ": " + str2 + ": " + e.getMessage(), e);
        }
    }

    /**
     * Builds a server-side SSL context builder from a keystore file.
     *
     * @param str  keystore file path
     * @param str2 keystore password (also used as the key password)
     * @param str3 key manager factory algorithm
     * @param str4 keystore type
     * @param str5 Siddhi app name used in error messages
     * @param str6 second identifier used in error messages
     * @return gRPC-configured SSL context builder for the server
     * @throws SiddhiAppCreationException if the keystore file cannot be read
     */
    @Override
    protected SslContextBuilder getSslContextBuilder(String str, String str2, String str3, String str4, String str5, String str6) throws KeyStoreException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
        char[] password = str2.toCharArray();
        KeyStore keyStore = KeyStore.getInstance(str4);
        // try-with-resources closes the stream even when loading fails.
        try (FileInputStream keyStoreStream = new FileInputStream(str)) {
            keyStore.load(keyStoreStream, password);
        } catch (IOException e) {
            throw new SiddhiAppCreationException(str5 + ": " + str6 + ": " + e.getMessage(), e);
        }
        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(str3);
        keyManagerFactory.init(keyStore, password);
        return GrpcSslContexts.configure(SslContextBuilder.forServer(keyManagerFactory));
    }

    /**
     * Adds a truststore to the given SSL context builder and requires client authentication.
     *
     * @param str               truststore file path
     * @param str2              truststore password
     * @param str3              trust manager factory algorithm
     * @param sslContextBuilder builder to extend
     * @param str4              truststore type
     * @param str5              Siddhi app name used in error messages
     * @param str6              second identifier used in error messages
     * @return the builder with the trust manager and {@link ClientAuth#REQUIRE} applied
     * @throws SiddhiAppCreationException if the truststore file cannot be read
     */
    @Override
    protected SslContextBuilder addTrustStore(String str, String str2, String str3, SslContextBuilder sslContextBuilder, String str4, String str5, String str6) throws NoSuchAlgorithmException, KeyStoreException, CertificateException {
        char[] password = str2.toCharArray();
        KeyStore trustStore = KeyStore.getInstance(str4);
        // try-with-resources closes the stream even when loading fails.
        try (FileInputStream trustStoreStream = new FileInputStream(str)) {
            trustStore.load(trustStoreStream, password);
        } catch (IOException e) {
            throw new SiddhiAppCreationException(str5 + ": " + str6 + ": " + e.getMessage(), e);
        }
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(str3);
        trustManagerFactory.init(trustStore);
        return sslContextBuilder.trustManager(trustManagerFactory).clientAuth(ClientAuth.REQUIRE);
    }

    /**
     * @return the configuration this server was created with
     */
    public GrpcServerConfigs getGrpcServerConfigs() {
        return this.grpcServerConfigs;
    }

    /**
     * Registers a source for a stream id under the given RPC method. An existing registration
     * for the same stream id is kept, and a source whose type does not match the method is
     * silently ignored.
     *
     * @param str                stream id to register under
     * @param abstractGrpcSource source to register
     * @param str2               RPC method name, compared case-insensitively against the
     *                           consume/process method name constants
     * @param siddhiAppContext   context whose name is used in the error message
     * @throws SiddhiAppValidationException if the method name is neither of the two
     */
    public void subscribe(String str, AbstractGrpcSource abstractGrpcSource, String str2, SiddhiAppContext siddhiAppContext) {
        if (str2.equalsIgnoreCase(GrpcConstants.DEFAULT_METHOD_NAME_WITHOUT_RESPONSE)) {
            if (abstractGrpcSource instanceof GrpcSource) {
                this.subscribersForConsume.putIfAbsent(str, (GrpcSource) abstractGrpcSource);
            }
            return;
        }
        if (!str2.equalsIgnoreCase(GrpcConstants.DEFAULT_METHOD_NAME_WITH_RESPONSE)) {
            throw new SiddhiAppValidationException(siddhiAppContext.getName() + ": " + str
                    + ": method name should be either process or consume but given as " + str2);
        }
        if (abstractGrpcSource instanceof GrpcServiceSource) {
            this.subscribersForProcess.putIfAbsent(str, (GrpcServiceSource) abstractGrpcSource);
        }
    }

    /**
     * Removes the source registered for a stream id under the given RPC method.
     *
     * @param str              stream id to remove
     * @param str2             RPC method name, compared case-insensitively against the
     *                         consume/process method name constants
     * @param siddhiAppContext context whose name is used in the error message
     * @throws SiddhiAppValidationException if the method name is neither of the two
     */
    public void unsubscribe(String str, String str2, SiddhiAppContext siddhiAppContext) {
        if (str2.equalsIgnoreCase(GrpcConstants.DEFAULT_METHOD_NAME_WITHOUT_RESPONSE)) {
            this.subscribersForConsume.remove(str);
            return;
        }
        if (!str2.equalsIgnoreCase(GrpcConstants.DEFAULT_METHOD_NAME_WITH_RESPONSE)) {
            throw new SiddhiAppValidationException(siddhiAppContext.getName() + ": " + str
                    + ": method name should be either process or consume but given as " + str2);
        }
        this.subscribersForProcess.remove(str);
    }

    /**
     * @return whether the underlying gRPC server reports that shutdown has been requested
     */
    public boolean isShutDown() {
        return this.server.isShutdown();
    }

    /**
     * @return 0 before the server is started, 1 once started, 2 once shut down
     */
    public int getState() {
        return this.state;
    }

    /**
     * @return total number of sources registered for both RPC methods
     */
    public int getNumSubscribers() {
        return this.subscribersForConsume.size() + this.subscribersForProcess.size();
    }
}
