package io.siddhi.extension.io.grpc.sink;

import com.google.protobuf.GeneratedMessageV3;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.util.GrpcConstants;
import io.siddhi.extension.io.grpc.util.GrpcSourceRegistry;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import org.apache.log4j.Logger;

@Extension(name = GrpcConstants.GRPC_SERVICE_RESPONSE_SINK_NAME, namespace = "sink", description = "This extension is used to send responses back to a gRPC client after receiving requests through grpc-service source. This correlates with the particular source using a unique source.id", parameters = {@Parameter(name = GrpcConstants.SOURCE_ID, description = "A unique id to identify the correct source to which this sink is mapped. There is a 1:1 mapping between source and sink", type = {DataType.INT})}, examples = {@Example(syntax = "@sink(type='grpc-service-response',\n      source.id='1',\n      @map(type='json'))\ndefine stream BarStream (messageId String, message String);\n\n@source(type='grpc-service',\n        url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',\n        source.id='1',\n        @map(type='json',\n             @attributes(messageId='trp:messageId', message='message')))\ndefine stream FooStream (messageId String, message String);\nfrom FooStream\nselect * \ninsert into BarStream;\n", description = "The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream.")})
/* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/sink/GrpcServiceResponseSink.class */
public class GrpcServiceResponseSink extends Sink {
    private static final Logger logger = Logger.getLogger(GrpcServiceResponseSink.class.getName());
    protected GrpcSourceRegistry grpcSourceRegistry = GrpcSourceRegistry.getInstance();
    private String sourceId;
    private Option messageIdOption;

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String id = streamDefinition.getId();
        if (optionHolder.isOptionExists(GrpcConstants.SOURCE_ID)) {
            this.sourceId = optionHolder.validateAndGetOption(GrpcConstants.SOURCE_ID).getValue();
        } else if (optionHolder.validateAndGetOption(GrpcConstants.SINK_TYPE_OPTION).getValue().equalsIgnoreCase(GrpcConstants.GRPC_SERVICE_RESPONSE_SINK_NAME)) {
            throw new SiddhiAppValidationException(siddhiAppContext.getName() + GrpcConstants.SEMI_COLON_STRING + id + ": For grpc-service-response sink the parameter source.id is mandatory for receiving responses. Please provide a source.id");
        }
        this.messageIdOption = optionHolder.validateAndGetOption(GrpcConstants.MESSAGE_ID);
        return null;
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) {
        this.grpcSourceRegistry.getGrpcServiceSource(this.sourceId).handleCallback(this.messageIdOption.getValue(dynamicOptions), obj);
    }

    public void connect() throws ConnectionUnavailableException {
    }

    public void disconnect() {
    }

    public void destroy() {
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, GeneratedMessageV3.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{GrpcConstants.MESSAGE_ID};
    }
}
