/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.input.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.transport.InMemoryBroker;
import io.siddhi.core.util.transport.OptionHolder;
import java.util.Map;
import org.apache.log4j.Logger;

@Extension(name="inMemory", namespace="source", description="In-memory source that can communicate with other in-memory sinks within the same JVM, it is assumed that the publisher and subscriber of a topic uses same event schema (stream definition).", parameters={@Parameter(name="topic", type={DataType.STRING}, description="Subscribes to sent on the given topic.")}, examples={@Example(syntax="@source(type='inMemory', @map(type='passThrough'))\ndefine stream BarStream (symbol string, price float, volume long)", description="In this example BarStream uses inMemory transport which passes the received event internally without using external transport.")})
public class InMemorySource
extends Source {
    private static final Logger LOG = Logger.getLogger(InMemorySource.class);
    private static final String TOPIC_KEY = "topic";
    private SourceEventListener sourceEventListener;
    private InMemoryBroker.Subscriber subscriber;

    @Override
    public void init(final SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        final String topic = optionHolder.validateAndGetStaticValue(TOPIC_KEY, "input inMemory source");
        this.subscriber = new InMemoryBroker.Subscriber(){

            @Override
            public void onMessage(Object event) {
                sourceEventListener.onEvent(event, null);
            }

            @Override
            public String getTopic() {
                return topic;
            }
        };
    }

    @Override
    public Class[] getOutputEventClasses() {
        return new Class[0];
    }

    @Override
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        InMemoryBroker.subscribe(this.subscriber);
    }

    @Override
    public void disconnect() {
        InMemoryBroker.unsubscribe(this.subscriber);
    }

    @Override
    public void destroy() {
    }

    @Override
    public void pause() {
        InMemoryBroker.unsubscribe(this.subscriber);
    }

    @Override
    public void resume() {
        InMemoryBroker.subscribe(this.subscriber);
    }

    @Override
    public Map<String, Object> currentState() {
        return null;
    }

    @Override
    public void restoreState(Map<String, Object> state) {
    }
}

