/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.output.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.InMemoryBroker;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.core.util.transport.SubscriberUnAvailableException;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.log4j.Logger;

@Extension(name="inMemory", namespace="sink", description="In-memory sink publishes events to In-memory sources that are subscribe to the same topic to which the sink publishes. This provides a way to connect multiple Siddhi Apps deployed under the same Siddhi Manager (JVM). Here both the publisher and subscriber should have the same event schema (stream definition) for successful data transfer.", parameters={@Parameter(name="topic", type={DataType.STRING}, description="Event are delivered to allthe subscribers subscribed on this topic.")}, parameterOverloads={@ParameterOverload(parameterNames={"topic"})}, examples={@Example(syntax="@sink(type='inMemory', topic='Stocks', @map(type='passThrough'))\ndefine stream StocksStream (symbol string, price float, volume long);", description="Here the `StocksStream` uses inMemory sink to emit the Siddhi events to all the inMemory sources deployed in the same JVM and subscribed to the topic `Stocks`.")})
public class InMemorySink
extends Sink<State> {
    private static final Logger log = Logger.getLogger(InMemorySink.class);
    protected static final String TOPIC_KEY = "topic";
    protected Option topicOption;

    @Override
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Object.class};
    }

    @Override
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    @Override
    public String[] getSupportedDynamicOptions() {
        return new String[]{TOPIC_KEY};
    }

    @Override
    protected StateFactory<State> init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        this.topicOption = optionHolder.validateAndGetOption(TOPIC_KEY);
        return null;
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
    }

    @Override
    public void disconnect() {
    }

    @Override
    public void destroy() {
    }

    @Override
    public void publish(Object payload, DynamicOptions dynamicOptions, State s) throws ConnectionUnavailableException {
        try {
            InMemoryBroker.publish(this.topicOption.getValue(dynamicOptions), payload);
        }
        catch (SubscriberUnAvailableException e) {
            this.onError(payload, dynamicOptions, new ConnectionUnavailableException(e.getMessage(), e));
        }
    }
}

