package io.siddhi.extension.io.grpc.source;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.util.GenericService;
import io.siddhi.extension.io.grpc.util.GrpcConstants;
import io.siddhi.extension.io.grpc.util.GrpcSourceRegistry;
import io.siddhi.extension.io.grpc.util.GrpcUtils;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;

@Extension(name = "grpc-service", namespace = "source", description = "This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. Please find the proto definition [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/EventService.proto) In the default mode this will use the EventService process method. If we want to use our custom gRPC services, we have to  pack auto-generated gRPC service classes and  protobuf classes into a jar file and add it into the project classpath (or to the `jars` folder in the `siddhi-tooling` folder if we use it with `siddhi-tooling`). Please find the custom protobuf definition that uses in examples [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/sample.proto). This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.", parameters = {@Parameter(name = GrpcConstants.RECEIVER_URL, description = "The url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. 
`grpc://0.0.0.0:9763/<serviceName>/<methodName>`\nFor example:\ngrpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume", type = {DataType.STRING}), @Parameter(name = GrpcConstants.MAX_INBOUND_MESSAGE_SIZE, description = "Sets the maximum message size in bytes allowed to be received on the server.", type = {DataType.INT}, optional = true, defaultValue = "4194304"), @Parameter(name = GrpcConstants.MAX_INBOUND_METADATA_SIZE, description = "Sets the maximum size of metadata in bytes allowed to be received.", type = {DataType.INT}, optional = true, defaultValue = "8192"), @Parameter(name = GrpcConstants.SERVICE_TIMEOUT, description = "The period of time in milliseconds to wait for siddhi to respond to a request received. After this time period of receiving a request it will be closed with an error message.", type = {DataType.INT}, optional = true, defaultValue = GrpcConstants.SERVICE_TIMEOUT_DEFAULT), @Parameter(name = GrpcConstants.SERVER_SHUTDOWN_WAITING_TIME, description = "The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.", type = {DataType.LONG}, optional = true, defaultValue = "5"), @Parameter(name = GrpcConstants.TRUSTSTORE_FILE, description = "the file path of truststore. If this is provided then server authentication is enabled", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TRUSTSTORE_PASSWORD, description = "the password of truststore. 
If this is provided then the integrity of the keystore is checked", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TRUSTSTORE_ALGORITHM, description = "the encryption algorithm to be used for server authentication", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TLS_STORE_TYPE, description = "TLS store type", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_FILE, description = "the file path of keystore. If this is provided then client authentication is enabled", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_PASSWORD, description = "the password of keystore", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_ALGORITHM, description = "the encryption algorithm to be used for client authentication", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.ENABLE_SSL, description = "to enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by default", type = {DataType.BOOL}, optional = true, defaultValue = "FALSE"), @Parameter(name = GrpcConstants.ENABLE_MUTUAL_AUTHENTICATION, description = "to enable mutual authentication. 
If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default", type = {DataType.BOOL}, optional = true, defaultValue = "FALSE"), @Parameter(name = GrpcConstants.THREADPOOL_SIZE, description = "Sets the maximum size of threadpool dedicated to serve requests at the gRPC server", type = {DataType.INT}, optional = true, defaultValue = "100"), @Parameter(name = GrpcConstants.THREADPOOL_BUFFER_SIZE, description = "Sets the maximum size of threadpool buffer server", type = {DataType.INT}, optional = true, defaultValue = "100")}, examples = {@Example(syntax = "@source(type='grpc-service',\n       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/process',\n       source.id='1',\n       @map(type='json', @attributes(messageId='trp:messageId', message='message')))\ndefine stream FooStream (messageId String, message String);", description = "Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response."), @Example(syntax = "@sink(type='grpc-service-response',\n      source.id='1',\n      @map(type='json'))\ndefine stream BarStream (messageId String, message String);\n\n@source(type='grpc-service',\n       receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',\n       source.id='1',\n       @map(type='json', @attributes(messageId='trp:messageId', message='message')))\ndefine stream FooStream (messageId String, message String);\n\nfrom FooStream\nselect * \ninsert into BarStream;", description = "The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. 
This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream."), @Example(syntax = "@source(type='grpc-service', source.id='1' \n       receiver.url='grpc://locanhost:8888/org.wso2.grpc.EventService/consume', \n       @map(type='json', @attributes(name='trp:name', age='trp:age', message='message'))) define stream BarStream (message String, name String, age int);", description = "Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'"), @Example(syntax = "@sink(type='grpc-service-response',\n      source.id='1',\n      message.id='{{messageId}}',\n      @map(type='protobuf',\n@payload(stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f')))\ndefine stream BarStream (a string,messageId string, b int,c long,d bool,e float,f double);\n\n@source(type='grpc-service',\n       receiver.url='grpc://134.23.43.35:8888/org.wso2.grpc.test.MyService/process',\n       source.id='1',\n       @map(type='protobuf', @attributes(messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue')))\ndefine stream FooStream (a string,messageId string, b int,c long,d bool,e float,f double);\n\nfrom FooStream\nselect * \ninsert into BarStream;", description = "Here a grpc server will be started at port 8888. The process method of the MyService will be exposed to the clients. 'source.id' is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. 
Note that it is required to specify the transport property messageId since we need to correlate the request message with the response and also we should map stream attributes with correct protobuf message attributes even they define using the same name as protobuf message attributes.")}, systemParameter = {@SystemParameter(name = GrpcConstants.SYS_KEYSTORE_FILE, description = "This is the key store file with the path ", defaultValue = GrpcConstants.DEFAULT_KEYSTORE_FILE, possibleParameters = {"valid path for a key store file"}), @SystemParameter(name = GrpcConstants.SYS_KEYSTORE_PASSWORD, description = "This is the password used with key store file", defaultValue = "wso2carbon", possibleParameters = {"valid password for the key store file"}), @SystemParameter(name = GrpcConstants.SYS_KEYSTORE_ALGORITHM, description = "The encryption algorithm to be used for client authentication", defaultValue = "SunX509", possibleParameters = {"-"}), @SystemParameter(name = GrpcConstants.SYS_TRUSTSTORE_FILE_PATH, description = "This is the trust store file with the path", defaultValue = GrpcConstants.DEFAULT_TRUSTSTORE_FILE, possibleParameters = {"-"}), @SystemParameter(name = GrpcConstants.SYS_TRUSTSTORE_PASSWORD, description = "This is the password used with trust store file", defaultValue = "wso2carbon", possibleParameters = {"valid password for the trust store file"}), @SystemParameter(name = GrpcConstants.SYS_TRUSTSTORE_ALGORITHM, description = "the encryption algorithm to be used for server authentication", defaultValue = "SunX509", possibleParameters = {"-"})})
/* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/source/GrpcServiceSource.class */
public class GrpcServiceSource extends AbstractGrpcSource {
    private static final Logger logger = Logger.getLogger(GrpcServiceSource.class.getName());

    /** gRPC server handle; not referenced by this class itself, kept for subclasses. */
    protected Server server;

    /**
     * Calls received on the default service that are still awaiting a response, keyed by message id.
     * Wrapped in a synchronized map because entries are added, answered and timed out from different methods.
     */
    private final Map<String, StreamObserver<Event>> streamObserverMap =
            Collections.synchronizedMap(new HashMap<String, StreamObserver<Event>>());

    /** Same as {@link #streamObserverMap}, but for calls received on a user-defined (generic) service. */
    private final Map<String, StreamObserver<Any>> genericStreamObserverMap =
            Collections.synchronizedMap(new HashMap<String, StreamObserver<Any>>());

    /** Value of the {@code source.id} option; the key under which this source is registered. */
    private String sourceId;

    /** Value of the service timeout option; used as the delay (milliseconds) of the timeout timer. */
    private long serviceTimeout;

    /** Runs the {@link ServiceSourceTimeoutChecker} tasks; created in {@link #initSource}. */
    private Timer timer;

    /* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/source/GrpcServiceSource$ServiceSourceTimeoutChecker.class */
    class ServiceSourceTimeoutChecker extends TimerTask {
        private String messageId;
        private long requestReceivedTime;

        public ServiceSourceTimeoutChecker(String str, long j) {
            this.messageId = str;
            this.requestReceivedTime = j;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            while (this.requestReceivedTime > GrpcServiceSource.this.siddhiAppContext.getTimestampGenerator().currentTime() - GrpcServiceSource.this.serviceTimeout) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    throw new SiddhiAppRuntimeException(GrpcServiceSource.this.siddhiAppContext.getName() + ": " + GrpcServiceSource.this.streamID + ": " + e.getMessage(), e);
                }
            }
            StreamObserver streamObserver = (StreamObserver) GrpcServiceSource.this.streamObserverMap.remove(this.messageId);
            if (streamObserver != null) {
                streamObserver.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED));
            }
        }
    }

    /**
     * Arms a timeout for the request with the given message id. The task is scheduled to run after
     * {@code serviceTimeout} milliseconds and records the current app timestamp as the receive time.
     *
     * @param str message id of the request that has just been received
     */
    public void scheduleServiceTimeout(String str) {
        Timer timeoutTimer = this.timer;
        TimerTask timeoutTask =
                new ServiceSourceTimeoutChecker(str, this.siddhiAppContext.getTimestampGenerator().currentTime());
        timeoutTimer.schedule(timeoutTask, this.serviceTimeout);
    }

    /**
     * Reads the options specific to this source, registers the source under its {@code source.id} and
     * prepares the server side: a default-service source is registered with the shared
     * {@link GrpcServerManager}, any other source gets its own {@link GenericServiceServer}.
     *
     * @param optionHolder options given in the {@code @source} annotation
     * @param strArr       not used by this implementation
     */
    @Override // io.siddhi.extension.io.grpc.source.AbstractGrpcSource
    public void initSource(OptionHolder optionHolder, String[] strArr) {
        this.sourceId = optionHolder.validateAndGetOption(GrpcConstants.SOURCE_ID).getValue();
        this.serviceTimeout = Long.parseLong(optionHolder.getOrCreateOption(GrpcConstants.SERVICE_TIMEOUT, GrpcConstants.SERVICE_TIMEOUT_DEFAULT).getValue());
        // Daemon thread: a timer that only expires pending requests must never keep the JVM alive.
        // The thread is named after the source id so it can be identified in thread dumps.
        this.timer = new Timer("grpc-service-source-timeout-" + this.sourceId, true);
        GrpcSourceRegistry.getInstance().putGrpcServiceSource(this.sourceId, this);
        if (this.grpcServerConfigs.getServiceConfigs().isDefaultService()) {
            GrpcServerManager.getInstance().registerSource(this.grpcServerConfigs, this, GrpcConstants.DEFAULT_METHOD_NAME_WITH_RESPONSE, this.siddhiAppContext, this.streamID);
            return;
        }
        // User-defined service: configure the generic service with the names taken from the receiver URL.
        GenericService.setServiceName(this.grpcServerConfigs.getServiceConfigs().getServiceName());
        GenericService.setNonEmptyResponseMethodName(this.grpcServerConfigs.getServiceConfigs().getMethodName());
        this.serviceServer = new GenericServiceServer(this.grpcServerConfigs, this, this.requestClass, this.siddhiAppName, this.streamID);
    }

    /**
     * Starts the server backing this source.
     * <p>
     * A default-service source looks up the server registered for its port and connects it only while that
     * server reports state {@code 0}; any other source connects the server created in {@code initSource}.
     *
     * @param connectionCallback callback handed on to the server
     * @param state              not used by this implementation
     * @throws ConnectionUnavailableException declared for the source contract; may be raised while connecting
     */
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        if (this.grpcServerConfigs.getServiceConfigs().isDefaultService()) {
            if (GrpcServerManager.getInstance().getServer(this.grpcServerConfigs.getServiceConfigs().getPort()).getState() != 0) {
                // Nothing to do for a server that is not in state 0.
                return;
            }
            this.serviceServer = GrpcServerManager.getInstance().getServer(this.grpcServerConfigs.getServiceConfigs().getPort());
            this.serviceServer.connectServer(logger, connectionCallback, this.siddhiAppContext.getName(), this.streamID);
            return;
        }
        this.serviceServer.connectServer(logger, connectionCallback, this.siddhiAppName, this.streamID);
    }

    /**
     * Detaches this source from its server: a user-defined service disconnects its own server, a
     * default-service source is unregistered from the shared {@link GrpcServerManager}.
     */
    public void disconnect() {
        boolean usesDefaultService = this.grpcServerConfigs.getServiceConfigs().isDefaultService();
        if (!usesDefaultService) {
            this.serviceServer.disconnectServer(logger, this.siddhiAppName, this.streamID);
            return;
        }
        GrpcServerManager.getInstance().unregisterSource(this.grpcServerConfigs.getServiceConfigs().getPort(), this.streamID, GrpcConstants.DEFAULT_METHOD_NAME_WITH_RESPONSE, logger, this.siddhiAppContext);
    }

    /**
     * Sends the response for a pending request back to the client and completes the call.
     * <p>
     * The observer registered for the message id is removed first; when none is present (for example
     * because the request already timed out) the response is dropped silently.
     *
     * @param str message id of the request being answered
     * @param obj response payload: a {@code String} for the default service, otherwise an object on which
     *            the {@code AbstractMessageLite} method named by {@code GrpcConstants.TO_BYTE_STRING} is invoked
     * @throws SiddhiAppValidationException when the response of a user-defined service cannot be converted;
     *                                      the pending call is failed with {@code INTERNAL} before this is thrown
     */
    public void handleCallback(String str, Object obj) {
        if (this.grpcServerConfigs.getServiceConfigs().isDefaultService()) {
            StreamObserver<Event> eventObserver = this.streamObserverMap.remove(str);
            if (eventObserver != null) {
                eventObserver.onNext(Event.newBuilder().setPayload((String) obj).build());
                eventObserver.onCompleted();
            }
            return;
        }
        StreamObserver<Any> anyObserver = this.genericStreamObserverMap.remove(str);
        if (anyObserver == null) {
            return;
        }
        Any response;
        try {
            ByteString serialized = (ByteString) AbstractMessageLite.class.getDeclaredMethod(GrpcConstants.TO_BYTE_STRING).invoke(obj);
            response = Any.parseFrom(serialized);
        } catch (InvalidProtocolBufferException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            // The observer has already been removed from the map, so nothing else can close this call any
            // more; fail it here, otherwise the client would wait until its own deadline expires.
            anyObserver.onError(new StatusRuntimeException(Status.INTERNAL.withCause(e)));
            // NOTE(review): the message talks about the method name in the URL although the failure happens
            // while converting the response; text kept unchanged for compatibility - confirm and reword.
            throw new SiddhiAppValidationException(this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + this.streamID + ": Invalid method name provided in the url, provided method name: " + this.grpcServerConfigs.getServiceConfigs().getMethodName() + ", Expected one of these these methods: " + GrpcUtils.getRpcMethodList(this.grpcServerConfigs.getServiceConfigs(), this.siddhiAppName, this.streamID), e);
        }
        anyObserver.onNext(response);
        anyObserver.onCompleted();
    }

    /**
     * Remembers the response observer of a newly received request so that the response (or a timeout)
     * can later be delivered through it.
     *
     * @param str            message id of the request
     * @param streamObserver observer used to answer the request; stored in the map matching the service kind
     */
    public void putStreamObserver(String str, StreamObserver streamObserver) {
        boolean usesDefaultService = this.grpcServerConfigs.getServiceConfigs().isDefaultService();
        if (!usesDefaultService) {
            this.genericStreamObserverMap.put(str, streamObserver);
            return;
        }
        this.streamObserverMap.put(str, streamObserver);
    }

    /**
     * Releases what this source holds: removes it from the {@link GrpcSourceRegistry} and stops the
     * timeout timer so that its thread and any still-scheduled timeout tasks are discarded.
     */
    @Override // io.siddhi.extension.io.grpc.source.AbstractGrpcSource
    public void destroy() {
        GrpcSourceRegistry.getInstance().removeGrpcServiceSource(this.sourceId);
        // The timer is only created in initSource; it is still null when initialisation failed early.
        if (this.timer != null) {
            this.timer.cancel();
        }
    }

    /**
     * Logs the given message at error level, prefixed with the Siddhi app name and the stream id.
     *
     * @param str text of the error
     */
    @Override // io.siddhi.extension.io.grpc.source.AbstractGrpcSource
    public void logError(String str) {
        StringBuilder line = new StringBuilder();
        line.append(this.siddhiAppContext.getName());
        line.append(": ");
        line.append(this.streamID);
        line.append(": ");
        line.append(str);
        logger.error(line.toString());
    }

    /**
     * Pauses the server backing this source.
     * <p>
     * A default-service source only gets a server assigned in {@code connect}, and only when the server
     * registered for its port was in state {@code 0}; without an assigned server there is nothing this
     * source can pause, so the call is skipped instead of failing with a {@code NullPointerException}.
     */
    public void pause() {
        if (this.serviceServer == null) {
            logger.debug(this.siddhiAppName + ": " + this.streamID + ": no gRPC server is assigned to this source, pause skipped");
            return;
        }
        this.serviceServer.pause(logger, this.grpcServerConfigs.getServiceConfigs().getUrl());
    }

    /**
     * Resumes the server backing this source.
     * <p>
     * A default-service source only gets a server assigned in {@code connect}, and only when the server
     * registered for its port was in state {@code 0}; without an assigned server there is nothing this
     * source can resume, so the call is skipped instead of failing with a {@code NullPointerException}.
     */
    public void resume() {
        if (this.serviceServer == null) {
            logger.debug(this.siddhiAppName + ": " + this.streamID + ": no gRPC server is assigned to this source, resume skipped");
            return;
        }
        this.serviceServer.resume(logger, this.grpcServerConfigs.getServiceConfigs().getUrl());
    }
}
