/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.grpc.source;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.source.AbstractGrpcSource;
import io.siddhi.extension.io.grpc.util.GenericServiceClass;
import io.siddhi.extension.io.grpc.util.GrpcSourceRegistry;
import io.siddhi.extension.io.grpc.util.GrpcUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;
import org.wso2.grpc.EventServiceGrpc;

@Extension(name="grpc-service", namespace="source", description="This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. Please find the proto definition [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/EventService.proto) In the default mode this will use the EventService process method. This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.", parameters={@Parameter(name="receiver.url", description="The url which can be used by a client to access the grpc server in this extension. This url should consist the host address, port, service name, method name in the following format. `grpc://0.0.0.0:9763/<serviceName>/<methodName>`", type={DataType.STRING}), @Parameter(name="max.inbound.message.size", description="Sets the maximum message size in bytes allowed to be received on the server.", type={DataType.INT}, optional=true, defaultValue="4194304"), @Parameter(name="max.inbound.metadata.size", description="Sets the maximum size of metadata in bytes allowed to be received.", type={DataType.INT}, optional=true, defaultValue="8192"), @Parameter(name="service.timeout", description="The period of time in milliseconds to wait for siddhi to respond to a request received. 
After this time period of receiving a request it will be closed with an error message.", type={DataType.INT}, optional=true, defaultValue="10000"), @Parameter(name="server.shutdown.waiting.time", description="The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.", type={DataType.LONG}, optional=true, defaultValue="5"), @Parameter(name="truststore.file", description="the file path of truststore. If this is provided then server authentication is enabled", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="truststore.password", description="the password of truststore. If this is provided then the integrity of the keystore is checked", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="truststore.algorithm", description="the encryption algorithm to be used for server authentication", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="tls.store.type", description="TLS store type", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.file", description="the file path of keystore. If this is provided then client authentication is enabled", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.password", description="the password of keystore", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.algorithm", description="the encryption algorithm to be used for client authentication", type={DataType.STRING}, optional=true, defaultValue="-")}, examples={@Example(syntax="@source(type='grpc-service',\n       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/process',\n       source.id='1',\n       @map(type='json', @attributes(messageId='trp:messageId', message='message')))\ndefine stream FooStream (messageId String, message String);", description="Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. 
source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response."), @Example(syntax="@sink(type='grpc-service-response',\n      source.id='1',\n      @map(type='json'))\ndefine stream BarStream (messageId String, message String);\n\n@source(type='grpc-service',\n       receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',\n       source.id='1',\n       @map(type='json', @attributes(messageId='trp:messageId', message='message')))\ndefine stream FooStream (messageId String, message String);\n\nfrom FooStream\nselect * \ninsert into BarStream;", description="The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream."), @Example(syntax="@source(type='grpc-service', source.id='1'        receiver.url='grpc://locanhost:8888/org.wso2.grpc.EventService/consume',        @map(type='json', @attributes(name='trp:name', age='trp:age', message='message'))) define stream BarStream (message String, name String, age int);", description="Here we are getting headers sent with the request as transport properties and injecting them into the stream. 
With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'"), @Example(syntax="@sink(type='grpc-service-response',\n      source.id='1',\n      message.id='{{messageId}}',\n      @map(type='protobuf',\n@payload(stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f')))\ndefine stream BarStream (a string,messageId string, b int,c long,d bool,e float,f double);\n\n@source(type='grpc-service',\n       receiver.url='grpc://134.23.43.35:8888/org.wso2.grpc.test.MyService/process',\n       source.id='1',\n       @map(type='protobuf', @attributes(messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue')))\ndefine stream FooStream (a string,messageId string, b int,c long,d bool,e float,f double);\n\nfrom FooStream\nselect * \ninsert into BarStream;", description="Here a grpc server will be started at port 8888. The process method of the MyService will be exposed to the clients.source.id is set as 1.So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.")})
/*
 * Siddhi source that exposes a gRPC endpoint and answers each request asynchronously.
 *
 * Every incoming request is given a random message id. Its response observer is parked in a
 * pending map under that id, a timeout task is scheduled for it, and the payload is handed to the
 * Siddhi source event listener. The response arrives later through handleCallback(messageId, payload),
 * which looks the observer up again and completes the call.
 *
 * Invariant maintained by this class: an observer is terminated (onCompleted/onError) only by the
 * code path that successfully removed it from its pending map, so a call is never terminated twice.
 */
public class GrpcServiceSource
extends AbstractGrpcSource {
    private static final Logger logger = Logger.getLogger(GrpcServiceSource.class.getName());
    protected String[] requestedTransportPropertyNames;
    protected Server server;
    /** Pending default-mode (EventService) calls, keyed by generated message id. */
    private final Map<String, StreamObserver<Event>> streamObserverMap =
            Collections.synchronizedMap(new HashMap<String, StreamObserver<Event>>());
    /** Pending generic-mode (user defined service) calls, keyed by generated message id. */
    private final Map<String, StreamObserver<Any>> genericStreamObserverMap =
            Collections.synchronizedMap(new HashMap<String, StreamObserver<Any>>());
    private String sourceId;
    private long serviceTimeout;
    private Timer timer;

    /**
     * Builds the gRPC server, registering either the default EventService implementation or the
     * generic user-defined service, wrapped with this source's server interceptor.
     *
     * @param port not used by this implementation; the server builder is taken from the parent class
     */
    @Override
    public void initializeGrpcServer(int port) {
        BindableService service;
        if (this.isDefaultMode) {
            service = new EventServiceGrpc.EventServiceImplBase(){

                @Override
                public void process(Event request, StreamObserver<Event> responseObserver) {
                    GrpcServiceSource.this.receiveEvent(request, responseObserver);
                }
            };
        } else {
            // The generic service reads these static names, so they are set before it is created.
            GenericServiceClass.setServiceName(this.serviceName);
            GenericServiceClass.setNonEmptyResponseMethodName(this.methodName);
            GenericServiceClass.AnyServiceImplBase genericService = new GenericServiceClass.AnyServiceImplBase(){

                @Override
                public void handleNonEmptyResponse(Any request, StreamObserver<Any> responseObserver) {
                    GrpcServiceSource.this.receiveGenericRequest(request, responseObserver);
                }
            };
            service = (BindableService)genericService;
        }
        this.server = ((NettyServerBuilder)this.serverBuilder.addService(
                ServerInterceptors.intercept(service, new ServerInterceptor[]{this.serverInterceptor}))).build();
    }

    /**
     * Handles one default-mode request: rejects it when the payload is missing, otherwise forwards
     * the payload together with the request headers to Siddhi.
     */
    private void receiveEvent(Event request, StreamObserver<Event> responseObserver) {
        // NOTE(review): protobuf-generated getters normally return "" rather than null, so this
        // guard may never trigger - confirm against the generated Event class before changing it.
        if (request.getPayload() == null) {
            logger.error(this.siddhiAppContext.getName() + ":" + this.streamID + ": Dropping request due to missing payload ");
            responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
            return;
        }
        String messageId = UUID.randomUUID().toString();
        HashMap<String, String> transportPropertyMap = new HashMap<String, String>();
        transportPropertyMap.put("message.id", messageId);
        transportPropertyMap.putAll(request.getHeadersMap());
        this.dispatchRequest(messageId, request.getPayload(), transportPropertyMap, this.streamObserverMap, responseObserver);
    }

    /**
     * Handles one generic-mode request: converts the {@code Any} wrapper into the configured
     * request class via its static {@code parseFrom(ByteString)} and forwards the result to Siddhi.
     *
     * @throws SiddhiAppCreationException if {@code parseFrom} cannot be resolved or invoked
     */
    private void receiveGenericRequest(Any request, StreamObserver<Any> responseObserver) {
        Object requestObject;
        try {
            Method parseFrom = this.requestClass.getDeclaredMethod("parseFrom", ByteString.class);
            // parseFrom is looked up as a static factory; the first invoke argument is ignored for static methods.
            requestObject = parseFrom.invoke((Object)this.requestClass, request.toByteString());
        }
        catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new SiddhiAppCreationException(this.siddhiAppContext.getName() + ":" + this.streamID + ": Invalid method name provided in the url, provided method name : '" + this.methodName + "' expected one of these methods : " + GrpcUtils.getRPCmethodList(this.serviceReference, this.siddhiAppContext.getName()), (Throwable)e);
        }
        String messageId = UUID.randomUUID().toString();
        HashMap<String, String> transportPropertyMap = new HashMap<String, String>();
        transportPropertyMap.put("message.id", messageId);
        this.dispatchRequest(messageId, requestObject, transportPropertyMap, this.genericStreamObserverMap, responseObserver);
    }

    /**
     * Parks the observer, arms its timeout and passes the payload to the Siddhi event listener.
     * Shared by both modes so that failure handling and metadata cleanup are identical.
     *
     * @param messageId            generated correlation id for this request
     * @param payload              object handed to the source event listener
     * @param transportPropertyMap transport properties (always contains "message.id")
     * @param pendingObservers     the pending map matching the current mode
     * @param responseObserver     observer of the call being handled
     */
    private <T> void dispatchRequest(String messageId, Object payload, HashMap<String, String> transportPropertyMap,
                                     Map<String, StreamObserver<T>> pendingObservers, StreamObserver<T> responseObserver) {
        try {
            pendingObservers.put(messageId, responseObserver);
            this.timer.schedule((TimerTask)new ServiceSourceTimeoutChecker(messageId, this.siddhiAppContext.getTimestampGenerator().currentTime()), this.serviceTimeout);
            this.sourceEventListener.onEvent(payload, GrpcUtils.extractHeaders(transportPropertyMap, AbstractGrpcSource.metaDataMap.get(), this.requestedTransportPropertyNames));
        }
        catch (SiddhiAppRuntimeException e) {
            logger.error(this.siddhiAppContext.getName() + ":" + this.streamID + ": Dropping request. " + e.getMessage(), (Throwable)e);
            // Un-park the observer first: if it stayed in the map, the timeout task would later call
            // onError on this already-terminated call. Skip onError when someone else already took it.
            if (pendingObservers.remove(messageId) != null) {
                responseObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
            }
        }
        finally {
            // Clear the per-call metadata once it has been consumed (previously done in default mode only).
            AbstractGrpcSource.metaDataMap.remove();
        }
    }

    /**
     * Reads this source's options, creates the timeout timer and registers the source under its
     * {@code source.id} so that the response sink can find it.
     *
     * @param optionHolder                    options of the {@code @source} annotation
     * @param requestedTransportPropertyNames transport properties requested by the mapper (copied)
     * @throws SiddhiAppCreationException if {@code service.timeout} is negative
     */
    @Override
    public void initSource(OptionHolder optionHolder, String[] requestedTransportPropertyNames) {
        this.sourceId = optionHolder.validateAndGetOption("source.id").getValue();
        this.requestedTransportPropertyNames = (String[])requestedTransportPropertyNames.clone();
        long configuredTimeout = Long.parseLong(optionHolder.getOrCreateOption("service.timeout", "10000").getValue());
        if (configuredTimeout < 0L) {
            // Timer.schedule rejects negative delays, which would otherwise fail every single request at runtime.
            throw new SiddhiAppCreationException("grpc-service source with source.id '" + this.sourceId
                    + "': 'service.timeout' must not be negative but was " + configuredTimeout);
        }
        this.serviceTimeout = configuredTimeout;
        this.timer = new Timer();
        GrpcSourceRegistry.getInstance().putGrpcServiceSource(this.sourceId, this);
    }

    /**
     * Starts the gRPC server by delegating to the parent class.
     *
     * @param connectionCallback callback passed through to the parent's connect routine
     * @param state              not used by this implementation
     * @throws ConnectionUnavailableException declared for the parent's connect routine
     */
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        this.connectGrpcServer(this.server, logger, connectionCallback);
    }

    /** Stops the gRPC server by delegating to the parent class. */
    public void disconnect() {
        this.disconnectGrpcServer(this.server, logger);
    }

    /**
     * Completes the pending call identified by {@code messageId} with the given response. Does
     * nothing to the call if it is no longer pending (already timed out, failed or answered).
     *
     * @param messageId       correlation id that was generated when the request was received
     * @param responsePayload in default mode a {@code String}; in generic mode a protobuf message
     * @throws SiddhiAppCreationException in generic mode, if the payload cannot be converted to
     *                                    {@code Any}; the call is failed with INTERNAL first
     */
    public void handleCallback(Object messageId, Object responsePayload) {
        if (this.isDefaultMode) {
            StreamObserver<Event> streamObserver = this.streamObserverMap.remove(messageId);
            String responsePayloadString = (String)responsePayload;
            if (streamObserver != null) {
                Event response = Event.newBuilder().setPayload(responsePayloadString).build();
                streamObserver.onNext(response);
                streamObserver.onCompleted();
            }
            return;
        }
        StreamObserver<Any> genericStreamObserver = this.genericStreamObserverMap.remove(messageId);
        if (genericStreamObserver == null) {
            return;
        }
        Any response;
        try {
            Method toByteString = AbstractMessageLite.class.getDeclaredMethod("toByteString", new Class[0]);
            ByteString responseByteString = (ByteString)toByteString.invoke(responsePayload, new Object[0]);
            response = Any.parseFrom(responseByteString);
        }
        catch (InvalidProtocolBufferException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            // The observer has already left the pending map, so the timeout task can no longer reach
            // it. Fail the call here, otherwise the client would wait without ever getting an answer.
            genericStreamObserver.onError(new StatusRuntimeException(Status.INTERNAL));
            throw new SiddhiAppCreationException(this.siddhiAppContext.getName() + ":" + this.streamID + ": Invalid method name provided in the url, provided method name : '" + this.methodName + "' expected one of these methods : " + GrpcUtils.getRPCmethodList(this.serviceReference, this.siddhiAppContext.getName()) + ". " + e.getMessage(), e);
        }
        genericStreamObserver.onNext(response);
        genericStreamObserver.onCompleted();
    }

    /**
     * Unregisters this source and stops the timeout timer so that its thread does not outlive the source.
     */
    @Override
    public void destroy() {
        GrpcSourceRegistry.getInstance().removeGrpcServiceSource(this.sourceId);
        // timer is only created in initSource, so it can still be null if initialisation never ran.
        if (this.timer != null) {
            this.timer.cancel();
        }
    }

    /**
     * Timer task that fails a still-pending request with DEADLINE_EXCEEDED once the Siddhi
     * timestamp generator reports that the service timeout has elapsed since the request arrived.
     */
    class ServiceSourceTimeoutChecker
    extends TimerTask {
        private final String messageId;
        private final long requestReceivedTime;

        /**
         * @param messageId           correlation id of the request to watch
         * @param requestReceivedTime time the request was received, as reported by the Siddhi timestamp generator
         */
        public ServiceSourceTimeoutChecker(String messageId, long requestReceivedTime) {
            this.messageId = messageId;
            this.requestReceivedTime = requestReceivedTime;
        }

        @Override
        public void run() {
            try {
                // The timer fires on wall-clock time; keep polling until the Siddhi clock agrees.
                while (this.requestReceivedTime > GrpcServiceSource.this.siddhiAppContext.getTimestampGenerator().currentTime() - GrpcServiceSource.this.serviceTimeout) {
                    Thread.sleep(100L);
                }
            }
            catch (InterruptedException e) {
                // Never throw from a TimerTask: an uncaught exception terminates the shared Timer
                // thread and makes every later schedule() call fail. Restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                logger.warn(GrpcServiceSource.this.siddhiAppContext.getName() + ": " + GrpcServiceSource.this.streamID + ": Interrupted while waiting for the service timeout of request " + this.messageId, (Throwable)e);
            }
            // The message id lives in exactly one of the two maps depending on the mode; check both.
            this.expire(GrpcServiceSource.this.streamObserverMap.remove(this.messageId));
            this.expire(GrpcServiceSource.this.genericStreamObserverMap.remove(this.messageId));
        }

        /** Fails the given call with DEADLINE_EXCEEDED; a null observer means the request was already handled. */
        private void expire(StreamObserver<?> observer) {
            if (observer == null) {
                return;
            }
            try {
                observer.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED));
            }
            catch (RuntimeException e) {
                // Contain the failure for the same reason as above: the Timer thread must survive.
                logger.error(GrpcServiceSource.this.siddhiAppContext.getName() + ": " + GrpcServiceSource.this.streamID + ": Failed to notify the client about the timeout of request " + this.messageId, (Throwable)e);
            }
        }
    }
}

