package io.siddhi.extension.io.grpc.source;

import com.google.protobuf.GeneratedMessageV3;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.grpc.util.GrpcConstants;
import io.siddhi.extension.io.grpc.util.GrpcSourceRegistry;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.wso2.grpc.Event;

@Extension(name = "grpc-call-response", namespace = "source", description = "This grpc source receives responses received from gRPC server for requests sent from a grpc-call sink. The source will receive responses for sink with the same sink.id. For example if you have a gRPC sink with sink.id 15 then we need to set the sink.id as 15 in the source to receives responses. Sinks and sources have 1:1 mapping", parameters = {@Parameter(name = GrpcConstants.SINK_ID, description = "a unique ID that should be set for each grpc-call source. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the sink also.", type = {DataType.INT})}, examples = {@Example(syntax = "@source(type='grpc-call-response', sink.id= '1')\ndefine stream BarStream (message String);@sink(type='grpc-call',\n      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',\n      sink.id= '1', @map(type='json'))\ndefine stream FooStream (message String);\n", description = "Here we are listening to responses  for requests sent from the sink with sink.id 1 will be received here. The results will be injected into BarStream")})
/* loaded from: input_file:plugins/siddhi-io-grpc-1.0.8.jar:io/siddhi/extension/io/grpc/source/GrpcCallResponseSource.class */
public class GrpcCallResponseSource extends Source {
    private String sinkID;
    private SourceEventListener sourceEventListener;
    private String[] requestedTransportPropertyNames;
    private boolean paused;
    private ReentrantLock lock;
    private Condition condition;
    private static final Logger logger = Logger.getLogger(GrpcCallResponseSource.class.getName());

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.requestedTransportPropertyNames = (String[]) strArr.clone();
        this.sinkID = optionHolder.validateAndGetOption(GrpcConstants.SINK_ID).getValue();
        GrpcSourceRegistry.getInstance().putGrpcCallResponseSource(this.sinkID, this);
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        return null;
    }

    public void onResponse(Event event, Map<String, String> map) {
        handlePause();
        this.sourceEventListener.onEvent(event.getPayload(), getTransportProperties(event.getHeadersMap(), map));
    }

    public void onResponse(Object obj, Map<String, String> map) {
        handlePause();
        this.sourceEventListener.onEvent(obj, getTransportProperties(map));
    }

    private String[] getTransportProperties(Map<String, String> map, Map<String, String> map2) {
        map2.putAll(map);
        String[] strArr = new String[this.requestedTransportPropertyNames.length];
        for (int i = 0; i < this.requestedTransportPropertyNames.length; i++) {
            if (map2.containsKey(this.requestedTransportPropertyNames[i])) {
                strArr[i] = map2.get(this.requestedTransportPropertyNames[i]);
            }
        }
        return strArr;
    }

    private String[] getTransportProperties(Map<String, String> map) {
        String[] strArr = new String[this.requestedTransportPropertyNames.length];
        for (int i = 0; i < this.requestedTransportPropertyNames.length; i++) {
            if (map.containsKey(this.requestedTransportPropertyNames[i])) {
                strArr[i] = map.get(this.requestedTransportPropertyNames[i]);
            }
        }
        return strArr;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, GeneratedMessageV3.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
    }

    public void disconnect() {
    }

    public void destroy() {
        GrpcSourceRegistry.getInstance().removeGrpcCallResponseSource(this.sinkID);
    }

    public void pause() {
        this.lock.lock();
        try {
            this.paused = true;
            logger.info("Response has pause for grpc-call-response source with sink.id: " + this.sinkID);
        } finally {
            this.lock.unlock();
        }
    }

    public void resume() {
        this.lock.lock();
        try {
            this.paused = false;
            logger.info("Response has resume for grpc-call-response source with sink.id: " + this.sinkID);
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    private void handlePause() {
        if (this.paused) {
            this.lock.lock();
            while (this.paused) {
                try {
                    this.condition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("Thread interrupted while pausing ", e);
                    return;
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }
}
