package io.siddhi.extension.io.grpc.sink;

import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.util.GrpcConstants;
import io.siddhi.extension.io.grpc.util.GrpcUtils;
import io.siddhi.extension.io.grpc.util.ServiceConfigs;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;
import org.wso2.grpc.EventServiceGrpc;

@Extension(name = GrpcConstants.GRPC_PROTOCOL_NAME, namespace = "sink", description = "This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes added. The default service is called \"EventService\". Please find the protobuf definition [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/EventService.proto). If we want to use our custom gRPC services, we have to  pack auto-generated gRPC service classes and  protobuf classes into a jar file and add it into the project classpath (or to the `jars` folder in the `siddhi-tooling` folder if we use it with `siddhi-tooling`). Please find the custom protobuf definition that uses in examples [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/sample.proto). This grpc sink is used for scenarios where we send a request and don't expect a response back. I.e getting a google.protobuf.Empty response back.", parameters = {@Parameter(name = GrpcConstants.PUBLISHER_URL, description = "The url to which the outgoing events should be published via this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. `grpc://0.0.0.0:9763/<serviceName>/<methodName>`\nFor example:\ngrpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume", type = {DataType.STRING}), @Parameter(name = GrpcConstants.HEADERS, description = "GRPC Request headers in format `\"'<key>:<value>','<key>:<value>'\"`. If header parameter is not provided just the payload is sent", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.IDLE_TIMEOUT_MILLIS, description = "Set the duration in seconds without ongoing RPCs before going to idle mode.", type = {DataType.LONG}, optional = true, defaultValue = "1800"), @Parameter(name = GrpcConstants.KEEP_ALIVE_TIME_MILLIS, description = "Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.", type = {DataType.LONG}, optional = true, defaultValue = "Long.MAX_VALUE"), @Parameter(name = GrpcConstants.KEEP_ALIVE_TIMEOUT_MILLIS, description = "Sets the time in seconds waiting for read activity after sending a keepalive ping.", type = {DataType.LONG}, optional = true, defaultValue = "20"), @Parameter(name = GrpcConstants.KEEP_ALIVE_WITHOUT_CALLS, description = "Sets whether keepalive will be performed when there are no outstanding RPC on a connection.", type = {DataType.BOOL}, optional = true, defaultValue = GrpcConstants.ENABLE_RETRY_DEFAULT), @Parameter(name = GrpcConstants.ENABLE_RETRY, description = "Enables the retry mechanism provided by the gRPC library.", type = {DataType.BOOL}, optional = true, defaultValue = GrpcConstants.ENABLE_RETRY_DEFAULT), @Parameter(name = GrpcConstants.MAX_RETRY_ATTEMPTS, description = "Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.", type = {DataType.INT}, optional = true, defaultValue = "5"), @Parameter(name = GrpcConstants.RETRY_BUFFER_SIZE, description = "Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.", type = {DataType.LONG}, optional = true, defaultValue = "16777216"), @Parameter(name = GrpcConstants.PER_RPC_BUFFER_SIZE, description = "Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.", type = {DataType.LONG}, optional = true, defaultValue = "1048576"), @Parameter(name = "channel.termination.waiting.time", description = "The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.", type = {DataType.LONG}, optional = true, defaultValue = "5"), @Parameter(name = GrpcConstants.TRUSTSTORE_FILE, description = "the file path of truststore. If this is provided then server authentication is enabled", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TRUSTSTORE_PASSWORD, description = "the password of truststore. If this is provided then the integrity of the keystore is checked", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TRUSTSTORE_ALGORITHM, description = "the encryption algorithm to be used for server authentication", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.TLS_STORE_TYPE, description = "TLS store type", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_FILE, description = "the file path of keystore. If this is provided then client authentication is enabled", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_PASSWORD, description = "the password of keystore", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.KEYSTORE_ALGORITHM, description = "the encryption algorithm to be used for client authentication", type = {DataType.STRING}, optional = true, defaultValue = "-"), @Parameter(name = GrpcConstants.ENABLE_SSL, description = "to enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by default", type = {DataType.BOOL}, optional = true, defaultValue = "FALSE"), @Parameter(name = GrpcConstants.ENABLE_MUTUAL_AUTHENTICATION, description = "to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default", type = {DataType.BOOL}, optional = true, defaultValue = "FALSE")}, examples = {@Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',\n      @map(type='json'))\ndefine stream FooStream (message String);", description = "Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/consume the sink will be operating in default mode"), @Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',\n      headers='{{headers}}',\n      @map(type='json'),\n           @payload('{{message}}'))\ndefine stream FooStream (message String, headers String);", description = "A similar example to above but with headers. Headers are also send into the stream as a data. In the sink headers dynamic property reads the value and sends it as MetaData with the request"), @Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/send',\n      @map(type='protobuf'),\ndefine stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);", description = "Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080 since there is no mapper provided, attributes of stream definition should be as same as the attributes of protobuf message definition."), @Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',\n      @map(type='protobuf'),\ndefine stream FooStream (stringValue string, intValue int,map object);", description = "Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. The 'map object' in the stream definition defines that this stream is going to use Map object with grpc service. We can use any map object that extends 'java.util.AbstractMap' class."), @Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',\n      @map(type='protobuf', \n@payload(stringValue='a',longValue='b',intValue='c',booleanValue='d',floatValue = 'e', doubleValue = 'f'))) \ndefine stream FooStream (a string, b long, c int,d bool,e float,f double);", description = "Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. @payload is provided in this stream, therefore we can use any name for the attributes in the stream definition, but we should correctly map those names with protobuf message attributes. If we are planning to send metadata within a stream we should use @payload to map attributes to identify the metadata attribute and the protobuf attributes separately. "), @Example(syntax = "@sink(type='grpc',\n      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.StreamService/clientStream',\n      @map(type='protobuf')) \ndefine stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);", description = "Here in the grpc sink, we are sending a stream of requests to the server that runs on 194.23.98.100 and port 8888. When we need to send a stream of requests from the grpc sink we have to define a client stream RPC method.Then the siddhi will identify whether it's a unary method or a stream method and send requests according to the method type.")})
/* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/sink/GrpcSink.class */
public class GrpcSink extends AbstractGrpcSink {
    private static final Logger logger = Logger.getLogger(GrpcSink.class.getName());
    private StreamObserver responseObserver;
    private AbstractStub asyncStub;
    private StreamObserver requestObserver;
    private Method rpcMethod;

    @Override // io.siddhi.extension.io.grpc.sink.AbstractGrpcSink
    public void initSink(OptionHolder optionHolder) {
        if (!this.serviceConfigs.isDefaultService()) {
            this.responseObserver = new StreamObserver<Object>() { // from class: io.siddhi.extension.io.grpc.sink.GrpcSink.2
                @Override // io.grpc.stub.StreamObserver
                public void onNext(Object obj) {
                }

                @Override // io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    GrpcSink.logger.error(GrpcSink.this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + GrpcSink.this.streamID + ": " + th.getMessage() + " caused by " + th.getMessage(), th);
                }

                @Override // io.grpc.stub.StreamObserver
                public void onCompleted() {
                }
            };
            return;
        }
        this.responseObserver = new StreamObserver<Empty>() { // from class: io.siddhi.extension.io.grpc.sink.GrpcSink.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(Empty empty) {
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                GrpcSink.logger.error(GrpcSink.this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + GrpcSink.this.streamID + ": " + th.getMessage() + " caused by " + th.getMessage(), th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
            }
        };
        if (this.serviceConfigs.getMethodName() == null) {
            this.serviceConfigs.setMethodName(GrpcConstants.DEFAULT_METHOD_NAME_WITHOUT_RESPONSE);
        } else if (!this.serviceConfigs.getMethodName().equalsIgnoreCase(GrpcConstants.DEFAULT_METHOD_NAME_WITHOUT_RESPONSE)) {
            throw new SiddhiAppValidationException(this.siddhiAppName + ": " + this.streamID + ": In default mode grpc-sink when using EventService the method name should be '" + GrpcConstants.DEFAULT_METHOD_NAME_WITHOUT_RESPONSE + "' but given " + this.serviceConfigs.getMethodName());
        }
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        if (!this.serviceConfigs.isDefaultService()) {
            if (this.requestObserver != null) {
                this.requestObserver.onNext(obj);
                return;
            }
            try {
                this.rpcMethod.invoke(this.asyncStub, obj, this.responseObserver);
                return;
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new SiddhiAppValidationException(this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + this.streamID + ": Invalid method name provided in the url, provided method name: '" + this.serviceConfigs.getMethodName() + "', expected one of these methods: " + GrpcUtils.getRpcMethodList(this.serviceConfigs, this.siddhiAppName, this.streamID), e);
            }
        }
        Event.Builder payload = Event.newBuilder().setPayload(obj.toString());
        if (this.headersOption != null || this.serviceConfigs.getSequenceName() != null) {
            if (this.headersOption == null || !this.headersOption.isStatic()) {
                payload = addHeadersToEventBuilder(dynamicOptions, payload);
            } else {
                payload.putAllHeaders(this.headersMap);
            }
        }
        this.requestObserver.onNext(payload.build());
    }

    public void connect() throws ConnectionUnavailableException {
        if (this.channel == null || this.channel.isShutdown()) {
            this.channel = this.managedChannelBuilder.build();
            if (this.serviceConfigs.isDefaultService()) {
                this.asyncStub = EventServiceGrpc.newStub(this.channel);
                if (this.metadataOption != null && this.metadataOption.isStatic()) {
                    this.asyncStub = attachMetaDataToStub(null, this.asyncStub);
                }
                this.requestObserver = ((EventServiceGrpc.EventServiceStub) this.asyncStub).consume(this.responseObserver);
            } else {
                this.rpcMethod = getRpcMethod(this.serviceConfigs, this.siddhiAppName, this.streamID);
                this.asyncStub = createStub(this.serviceConfigs);
                if (this.metadataOption != null && this.metadataOption.isStatic()) {
                    this.asyncStub = attachMetaDataToStub(null, this.asyncStub);
                }
                try {
                    if (this.rpcMethod.getParameterCount() == 1) {
                        this.requestObserver = (StreamObserver) this.rpcMethod.invoke(this.asyncStub, this.responseObserver);
                    }
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new SiddhiAppRuntimeException(this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + this.streamID + ": Invalid method name provided in the url, provided method name: '" + this.serviceConfigs.getMethodName() + "', expected one of these methods: " + GrpcUtils.getRpcMethodList(this.serviceConfigs, this.siddhiAppName, this.streamID), e);
                }
            }
            if (this.channel.isShutdown()) {
                throw new ConnectionUnavailableException(this.siddhiAppName + ": gRPC service on" + this.streamID + " could not connect to " + this.serviceConfigs.getUrl());
            }
            logger.info(this.siddhiAppName + ": gRPC service on " + this.streamID + " has successfully connected to " + this.serviceConfigs.getUrl());
        }
    }

    public void disconnect() {
        try {
            if (this.requestObserver != null) {
                this.requestObserver.onCompleted();
            }
            if (this.channel != null) {
                if (this.channelTerminationWaitingTimeInMillis > 0) {
                    this.channel.shutdown().awaitTermination(this.channelTerminationWaitingTimeInMillis, TimeUnit.MILLISECONDS);
                } else {
                    this.channel.shutdown();
                }
            }
            this.channel = null;
        } catch (InterruptedException e) {
            logger.error(this.siddhiAppName + ": " + this.streamID + ": Error in shutting down the channel. " + e.getMessage(), e);
        }
    }

    private AbstractStub createStub(ServiceConfigs serviceConfigs) {
        try {
            Class<?> cls = Class.forName(serviceConfigs.getFullyQualifiedServiceName() + GrpcConstants.GRPC_PROTOCOL_NAME_UPPERCAMELCASE);
            return (AbstractStub) cls.getDeclaredMethod(GrpcConstants.NEW_STUB_NAME, Channel.class).invoke(cls, this.channel);
        } catch (ClassNotFoundException e) {
            throw new SiddhiAppRuntimeException(this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + this.streamID + ": Invalid service name provided in the url, provided service name: '" + serviceConfigs.getFullyQualifiedServiceName() + GrpcConstants.INVERTED_COMMA_STRING, e);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e2) {
            throw new SiddhiAppRuntimeException(this.siddhiAppName + GrpcConstants.SEMI_COLON_STRING + this.streamID + ": Invalid method name provided in the url, provided method name: '" + serviceConfigs.getMethodName() + "', expected one of these methods: " + GrpcUtils.getRpcMethodList(serviceConfigs, this.siddhiAppName, this.streamID), e2);
        }
    }

    private static Method getRpcMethod(ServiceConfigs serviceConfigs, String str, String str2) {
        Method method = null;
        try {
            Method[] methods = Class.forName(serviceConfigs.getFullyQualifiedServiceName() + GrpcConstants.GRPC_PROTOCOL_NAME_UPPERCAMELCASE + GrpcConstants.DOLLAR_SIGN + serviceConfigs.getServiceName() + GrpcConstants.STUB).getMethods();
            int length = methods.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Method method2 = methods[i];
                if (method2.getName().equalsIgnoreCase(serviceConfigs.getMethodName())) {
                    method = method2;
                    break;
                }
                i++;
            }
            if (method == null) {
                throw new SiddhiAppValidationException(str + GrpcConstants.SEMI_COLON_STRING + str2 + ": Invalid method name provided in the url, provided method name: " + serviceConfigs.getMethodName() + "expected one of these methods: " + GrpcUtils.getRpcMethodList(serviceConfigs, str, str2));
            }
            return method;
        } catch (ClassNotFoundException e) {
            throw new SiddhiAppValidationException(str + GrpcConstants.SEMI_COLON_STRING + str2 + ": Invalid service name provided in the url, provided service name: '" + serviceConfigs.getFullyQualifiedServiceName() + GrpcConstants.INVERTED_COMMA_STRING, e);
        }
    }
}
