package io.siddhi.core.stream.input.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.InMemoryBroker;
import io.siddhi.core.util.transport.OptionHolder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.1.29.jar:io/siddhi/core/stream/input/source/InMemorySource.class
 */
@Extension(name = "inMemory", namespace = "source", description = "In-memory source subscribes to a topic to consume events which are published on the same topic by In-memory sinks. This provides a way to connect multiple Siddhi Apps deployed under the same Siddhi Manager (JVM). Here both the publisher and subscriber should have the same event schema (stream definition) for successful data transfer.", parameters = {@Parameter(name = InMemorySource.TOPIC_KEY, type = {DataType.STRING}, description = "Subscribes to the events sent on the given topic.")}, parameterOverloads = {@ParameterOverload(parameterNames = {InMemorySource.TOPIC_KEY})}, examples = {@Example(syntax = "@source(type='inMemory', topic='Stocks', @map(type='passThrough'))\ndefine stream StocksStream (symbol string, price float, volume long);", description = "Here the `StocksStream` uses inMemory source to consume events published on the topic `Stocks` by the inMemory sinks deployed in the same JVM.")})
/* loaded from: input_file:io/siddhi/core/stream/input/source/InMemorySource.class */
public class InMemorySource extends Source {
    private static final Logger LOG = LogManager.getLogger((Class<?>) InMemorySource.class);
    private static final String TOPIC_KEY = "topic";
    private SourceEventListener sourceEventListener;
    private InMemoryBroker.Subscriber subscriber;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = this.pauseLock.newCondition();
    private volatile boolean paused = false;

    @Override // io.siddhi.core.stream.input.source.Source
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public StateFactory<State> init(final SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        final String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(TOPIC_KEY, "input inMemory source");
        this.subscriber = new InMemoryBroker.Subscriber() { // from class: io.siddhi.core.stream.input.source.InMemorySource.1
            @Override // io.siddhi.core.util.transport.InMemoryBroker.Subscriber
            public void onMessage(Object obj) {
                if (InMemorySource.this.paused) {
                    InMemorySource.this.pauseLock.lock();
                    while (InMemorySource.this.paused) {
                        try {
                            InMemorySource.this.unpaused.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            InMemorySource.this.pauseLock.unlock();
                        }
                    }
                }
                sourceEventListener.onEvent(obj, (String[]) null);
            }

            @Override // io.siddhi.core.util.transport.InMemoryBroker.Subscriber
            public String getTopic() {
                return validateAndGetStaticValue;
            }
        };
        return null;
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        InMemoryBroker.subscribe(this.subscriber);
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public void disconnect() {
        InMemoryBroker.unsubscribe(this.subscriber);
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public void destroy() {
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public void pause() {
        this.paused = true;
    }

    @Override // io.siddhi.core.stream.input.source.Source
    public void resume() {
        this.paused = false;
        try {
            this.pauseLock.lock();
            this.unpaused.signalAll();
        } finally {
            this.pauseLock.unlock();
        }
    }
}
