/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.output.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.InMemoryBroker;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.core.util.transport.SubscriberUnAvailableException;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.log4j.Logger;

@Extension(name="inMemory", namespace="sink", description="In-memory transport that can communicate with other in-memory transports within the same JVM, itis assumed that the publisher and subscriber of a topic uses same event schema (stream definition).", parameters={@Parameter(name="topic", type={DataType.STRING}, description="Event will be delivered to allthe subscribers of the same topic")}, examples={@Example(syntax="@sink(type='inMemory', @map(type='passThrough'))\ndefine stream BarStream (symbol string, price float, volume long)", description="In this example BarStream uses inMemory transport which emit the Siddhi events internally without using external transport and transformation.")})
public class InMemorySink
extends Sink {
    private static final Logger log = Logger.getLogger(InMemorySink.class);
    private static final String TOPIC_KEY = "topic";
    private Option topicOption;

    @Override
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Object.class};
    }

    @Override
    public String[] getSupportedDynamicOptions() {
        return new String[]{TOPIC_KEY};
    }

    @Override
    protected void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        this.topicOption = optionHolder.validateAndGetOption(TOPIC_KEY);
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
    }

    @Override
    public void disconnect() {
    }

    @Override
    public void destroy() {
    }

    @Override
    public void publish(Object payload, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        try {
            InMemoryBroker.publish(this.topicOption.getValue(dynamicOptions), payload);
        }
        catch (SubscriberUnAvailableException e) {
            this.onError(payload, e);
        }
    }
}

