/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.grpc.sink;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Channel;
import io.grpc.stub.AbstractStub;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.sink.AbstractGrpcSink;
import io.siddhi.extension.io.grpc.util.GrpcSourceRegistry;
import io.siddhi.extension.io.grpc.util.GrpcUtils;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;
import org.wso2.grpc.EventServiceGrpc;

@Extension(name="grpc-call", namespace="sink", description="This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes jar added. The default service is called \"EventService\". Please find the protobuf definition [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources/EventService.proto). This grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode this will use EventService process method. grpc-call-response source is used to receive the responses. A unique sink.id is used to correlate between the sink and its corresponding source.", parameters={@Parameter(name="publisher.url", description="The url to which the outgoing events should be published via this extension. This url should consist the host address, port, service name, method name in the following format. `grpc://0.0.0.0:9763/<serviceName>/<methodName>`", type={DataType.STRING}), @Parameter(name="sink.id", description="a unique ID that should be set for each grpc-call-sink. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the source also.", type={DataType.INT}), @Parameter(name="headers", description="GRPC Request headers in format `\"'<key>:<value>','<key>:<value>'\"`. If header parameter is not provided just the payload is sent", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="idle.timeout", description="Set the duration in seconds without ongoing RPCs before going to idle mode.", type={DataType.LONG}, optional=true, defaultValue="1800"), @Parameter(name="keep.alive.time", description="Sets the time in seconds without read activity before sending a keepalive ping. 
Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.", type={DataType.LONG}, optional=true, defaultValue="Long.MAX_VALUE"), @Parameter(name="keep.alive.timeout", description="Sets the time in seconds waiting for read activity after sending a keepalive ping.", type={DataType.LONG}, optional=true, defaultValue="20"), @Parameter(name="keep.alive.without.calls", description="Sets whether keepalive will be performed when there are no outstanding RPC on a connection.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="enable.retry", description="Enables the retry and hedging mechanism provided by the gRPC library.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="max.retry.attempts", description="Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.", type={DataType.INT}, optional=true, defaultValue="5"), @Parameter(name="retry.buffer.size", description="Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.", type={DataType.LONG}, optional=true, defaultValue="16777216"), @Parameter(name="per.rpc.buffer.size", description="Sets the per RPC buffer limit in bytes used for retry. 
The RPC is not retriable if its buffer limit is exceeded.", type={DataType.LONG}, optional=true, defaultValue="1048576"), @Parameter(name="channel.termination.waiting.time", description="The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.", type={DataType.LONG}, optional=true, defaultValue="5"), @Parameter(name="max.inbound.message.size", description="Sets the maximum message size allowed to be received on the channel in bytes", type={DataType.LONG}, optional=true, defaultValue="4194304"), @Parameter(name="max.inbound.metadata.size", description="Sets the maximum size of metadata allowed to be received in bytes", type={DataType.LONG}, optional=true, defaultValue="8192"), @Parameter(name="truststore.file", description="the file path of truststore. If this is provided then server authentication is enabled", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="truststore.password", description="the password of truststore. If this is provided then the integrity of the keystore is checked", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="truststore.algorithm", description="the encryption algorithm to be used for server authentication", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="tls.store.type", description="TLS store type", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.file", description="the file path of keystore. 
If this is provided then client authentication is enabled", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.password", description="the password of keystore", type={DataType.STRING}, optional=true, defaultValue="-"), @Parameter(name="keystore.algorithm", description="the encryption algorithm to be used for client authentication", type={DataType.STRING}, optional=true, defaultValue="-")}, examples={@Example(syntax="@sink(type='grpc-call',\n      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',\n      sink.id= '1', @map(type='json'))\ndefine stream FooStream (message String);\n@source(type='grpc-call-response', sink.id= '1')\ndefine stream BarStream (message String);", description="Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/process the sink will be operating in default mode"), @Example(syntax="@sink(type='grpc-call',\n      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',\n      sink.id= '1', @map(type='json'))\ndefine stream FooStream (message String);\n\n@source(type='grpc-call-response', sink.id= '1')\ndefine stream BarStream (message String);", description="Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. 
So the responses for calls sent from the FooStream will be added to BarStream."), @Example(syntax="@sink(type='grpc-call',\n      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',\n      sink.id= '1', @map(type='protobuf'))\ndefine stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);\n\n@source(type='grpc-call-response', sink.id= '1')\ndefine stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);", description="Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream. since there is no mapping available stream definition attributes names should be as same as the attributes of the protobuf message definition")})
public class GrpcCallSink
extends AbstractGrpcSink {
    /** Class-wide log4j logger, named after this class. */
    private static final Logger logger = Logger.getLogger((String)GrpcCallSink.class.getName());
    /**
     * Value of the {@code sink.id} option, read in {@code initSink}. It is the key passed to
     * {@code GrpcSourceRegistry} to find the grpc-call-response source that receives the responses
     * for requests published by this sink.
     */
    protected String sinkID;
    /**
     * Future stub assigned in {@code connect()}: the {@code EventServiceGrpc} future stub in default
     * mode, otherwise the object returned by reflectively calling {@code newFutureStub} on the class
     * named {@code serviceReference + "Grpc"}.
     */
    protected AbstractStub futureStub;

    /**
     * Performs the grpc-call specific part of sink initialisation.
     *
     * <p>In default mode the method name falls back to {@code process} when none was given and any
     * other name (compared case-insensitively) is rejected. The optional inbound message and
     * metadata size limits are then applied to the channel builder, and finally the mandatory
     * {@code sink.id} option is stored.
     *
     * @param optionHolder holder of the options configured on the sink annotation
     * @throws SiddhiAppValidationException if default mode is used with a method name other than
     *         {@code process}
     */
    @Override
    public void initSink(OptionHolder optionHolder) {
        final String defaultMethodName = "process";
        if (this.isDefaultMode && this.methodName == null) {
            this.methodName = defaultMethodName;
        } else if (this.isDefaultMode && !this.methodName.equalsIgnoreCase(defaultMethodName)) {
            String problem = this.siddhiAppName + ": " + this.streamID
                    + ": In default mode grpc-call-sink when using EventService the method name should be '"
                    + defaultMethodName + "' but given " + this.methodName;
            throw new SiddhiAppValidationException(problem);
        }

        // Both limits are optional; the builder keeps its own defaults when they are absent.
        if (optionHolder.isOptionExists("max.inbound.message.size")) {
            String rawMessageSize = optionHolder.validateAndGetOption("max.inbound.message.size").getValue();
            this.managedChannelBuilder.maxInboundMessageSize(Integer.parseInt(rawMessageSize));
        }
        if (optionHolder.isOptionExists("max.inbound.metadata.size")) {
            String rawMetadataSize = optionHolder.validateAndGetOption("max.inbound.metadata.size").getValue();
            this.managedChannelBuilder.maxInboundMetadataSize(Integer.parseInt(rawMetadataSize));
        }

        this.sinkID = optionHolder.validateAndGetOption("sink.id").getValue();
    }

    /**
     * Sends one event as a gRPC request through the future stub and registers a callback that hands
     * the response to the grpc-call-response source registered under this sink's {@code sinkID}.
     *
     * <p>In default mode the payload's {@code toString()} value is wrapped in an
     * {@code org.wso2.grpc.Event} and sent via {@code EventService.process}. Otherwise the stub
     * method named {@code methodName} is looked up reflectively and invoked with the payload as its
     * single argument. Callbacks run on the direct executor. Failures reported by the future are
     * logged (with the throwable) rather than propagated.
     *
     * @param payload        the mapped event payload to send
     * @param dynamicOptions options of the current event; used for the request event data, headers
     *                       and metadata
     * @param state          not used by this method
     * @throws ConnectionUnavailableException declared by the signature; not thrown directly by the
     *         code in this method
     * @throws SiddhiAppCreationException if the stub method cannot be found or accessed, or if the
     *         reflective invocation itself throws
     */
    public void publish(Object payload, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        final Map<String, String> siddhiRequestEventData = this.getRequestEventDataMap(dynamicOptions);
        if (this.isDefaultMode) {
            // Fully qualified: the simple name Event is also used by io.siddhi.core.event.Event.
            org.wso2.grpc.Event.Builder eventBuilder = org.wso2.grpc.Event.newBuilder().setPayload(payload.toString());
            EventServiceGrpc.EventServiceFutureStub currentFutureStub = (EventServiceGrpc.EventServiceFutureStub)this.futureStub;
            if (this.headersOption != null || this.sequenceName != null) {
                eventBuilder = this.addHeadersToEventBuilder(dynamicOptions, eventBuilder);
            }
            if (this.metadataOption != null) {
                currentFutureStub = (EventServiceGrpc.EventServiceFutureStub)this.attachMetaDataToStub(dynamicOptions, currentFutureStub);
            }
            ListenableFuture<org.wso2.grpc.Event> futureResponse = currentFutureStub.process(eventBuilder.build());
            Futures.addCallback(futureResponse, new FutureCallback<org.wso2.grpc.Event>(){

                public void onSuccess(org.wso2.grpc.Event result) {
                    GrpcSourceRegistry.getInstance().getGrpcCallResponseSource(GrpcCallSink.this.sinkID).onResponse(result, siddhiRequestEventData);
                }

                public void onFailure(Throwable t) {
                    // Pass the throwable as well so the stack trace is not lost.
                    logger.error((Object)(GrpcCallSink.this.siddhiAppName + ":" + GrpcCallSink.this.streamID + ": " + t.getMessage()), t);
                }
            }, MoreExecutors.directExecutor());
        } else {
            try {
                AbstractStub currentFutureStub = this.futureStub;
                if (this.metadataOption != null) {
                    currentFutureStub = this.attachMetaDataToStub(dynamicOptions, currentFutureStub);
                }
                Method serviceMethod = currentFutureStub.getClass().getDeclaredMethod(this.methodName, this.requestClass);
                ListenableFuture<?> genericRes = (ListenableFuture<?>)serviceMethod.invoke(currentFutureStub, payload);
                Futures.addCallback(genericRes, new FutureCallback<Object>(){

                    public void onSuccess(Object o) {
                        GrpcSourceRegistry.getInstance().getGrpcCallResponseSource(GrpcCallSink.this.sinkID).onResponse(o, siddhiRequestEventData);
                    }

                    public void onFailure(Throwable t) {
                        // Pass the throwable as well so the stack trace is not lost.
                        logger.error((Object)(GrpcCallSink.this.siddhiAppName + ":" + GrpcCallSink.this.streamID + ": " + t.getMessage()), t);
                    }
                }, MoreExecutors.directExecutor());
            }
            catch (IllegalAccessException | NoSuchMethodException e) {
                throw new SiddhiAppCreationException(this.siddhiAppName + ": Invalid method name provided in the url, provided method name : '" + this.methodName + "' expected one of these methods : " + GrpcUtils.getRPCmethodList(this.serviceReference, this.siddhiAppName) + ". " + e.getMessage(), (Throwable)e);
            }
            catch (InvocationTargetException e) {
                // The method was found but threw while being invoked, so the method name is not at
                // fault. InvocationTargetException carries no useful message of its own; report the
                // underlying cause's message instead.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                throw new SiddhiAppCreationException(this.siddhiAppName + ": " + this.streamID + ": Error while invoking method '" + this.methodName + "' of service '" + this.serviceReference + "'. " + cause.getMessage(), (Throwable)e);
            }
        }
    }

    /**
     * Builds a map from stream attribute name to the string form of the corresponding value in the
     * event carried by {@code dynamicOptions}.
     *
     * <p>Values are matched to attributes by position in the stream definition's attribute list.
     * A {@code null} attribute value is stored as {@code null} instead of causing a
     * {@link NullPointerException}.
     *
     * @param dynamicOptions options of the event currently being published
     * @return a new mutable map with one entry per stream attribute
     */
    private Map<String, String> getRequestEventDataMap(DynamicOptions dynamicOptions) {
        Object[] data = dynamicOptions.getEvent().getData();
        List<Attribute> attributes = this.streamDefinition.getAttributeList();
        Map<String, String> requestEventDataMap = new HashMap<>();
        for (int i = 0; i < attributes.size(); ++i) {
            Object value = data[i];
            // Event attributes may legitimately be null; do not call toString() on them.
            requestEventDataMap.put(attributes.get(i).getName(), value == null ? null : value.toString());
        }
        return requestEventDataMap;
    }

    /**
     * Builds the channel and creates the future stub used by {@code publish}.
     *
     * <p>In default mode the stub comes from {@code EventServiceGrpc}. Otherwise the class named
     * {@code serviceReference + "Grpc"} is loaded and its {@code newFutureStub(Channel)} method is
     * invoked reflectively. Afterwards the method verifies that a grpc-call-response source is
     * registered under this sink's {@code sinkID}.
     *
     * @throws ConnectionUnavailableException declared by the signature; not thrown directly by the
     *         code in this method
     * @throws SiddhiAppCreationException if the service class cannot be loaded or its stub factory
     *         method cannot be found, accessed or invoked
     * @throws SiddhiAppRuntimeException if no grpc-call-response source is registered for the
     *         {@code sinkID}
     */
    public void connect() throws ConnectionUnavailableException {
        this.channel = this.managedChannelBuilder.build();

        if (!this.isDefaultMode) {
            final String grpcClassName = this.serviceReference + "Grpc";
            try {
                Class<?> grpcClass = Class.forName(grpcClassName);
                Method stubFactory = grpcClass.getDeclaredMethod("newFutureStub", Channel.class);
                Object createdStub = stubFactory.invoke(grpcClass, this.channel);
                this.futureStub = (AbstractStub)createdStub;
            }
            catch (ClassNotFoundException e) {
                String problem = this.siddhiAppName + ": Invalid service name provided in the url, provided service name : '" + this.serviceReference + "'";
                throw new SiddhiAppCreationException(problem, (Throwable)e);
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                String problem = this.siddhiAppName + ": Invalid method name provided in the url, provided method name : '" + this.methodName + "' expected one of these methods : " + GrpcUtils.getRPCmethodList(this.serviceReference, this.siddhiAppName) + ". " + e.getMessage();
                throw new SiddhiAppCreationException(problem, (Throwable)e);
            }
        } else {
            this.futureStub = EventServiceGrpc.newFutureStub((Channel)this.channel);
            logger.info((Object)(this.siddhiAppName + ": gRPC service on " + this.streamID + " has successfully connected to " + this.url));
        }

        // Responses are delivered through the paired source, so it must already be registered.
        boolean responseSourceMissing = GrpcSourceRegistry.getInstance().getGrpcCallResponseSource(this.sinkID) == null;
        if (responseSourceMissing) {
            throw new SiddhiAppRuntimeException(this.siddhiAppName + ": " + this.streamID + ": For grpc-call sink to work a grpc-call-response source should be available with the same sink.id. In this case sink.id is " + this.sinkID + ". Please provide a grpc-call-response source with the sink.id " + this.sinkID);
        }
    }

    /**
     * Shuts the channel down and releases the reference to it.
     *
     * <p>Does nothing when there is no channel (never connected, or already disconnected). When
     * {@code channelTerminationWaitingTimeInMillis} is not {@code -1}, waits up to that many
     * milliseconds for the channel to terminate. The channel reference is cleared even if the wait
     * is interrupted.
     *
     * @throws SiddhiAppRuntimeException if the thread is interrupted while waiting for termination;
     *         the thread's interrupt flag is restored before throwing
     */
    public void disconnect() {
        if (this.channel == null) {
            // Guard: without this, a configured termination wait dereferenced a null channel.
            return;
        }
        try {
            if (this.channelTerminationWaitingTimeInMillis != -1L) {
                this.channel.shutdown().awaitTermination(this.channelTerminationWaitingTimeInMillis, TimeUnit.MILLISECONDS);
            } else {
                this.channel.shutdown();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new SiddhiAppRuntimeException(this.siddhiAppName + ":" + this.streamID + ": Error in shutting down the channel. " + e.getMessage(), (Throwable)e);
        }
        finally {
            // shutdown() has already been requested; never keep a reference to a shut-down channel.
            this.channel = null;
        }
    }
}

