/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.multidc.source.Interceptor;
import io.siddhi.extension.io.kafka.multidc.source.SourceSynchronizer;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;

@Extension(name="kafkaMultiDC", namespace="source", description="The Kafka Multi-Datacenter(DC) source receives records from the same topic in brokers deployed in two different kafka clusters. It filters out all the duplicate messages and ensuresthat the events are received in the correct order using sequential numbering. It receives events in formats such as `TEXT`, `XML` JSON` and `Binary`.The Kafka Source creates the default partition '0' for a given topic, if the topic has not yet been created in the Kafka cluster.", parameters={@Parameter(name="bootstrap.servers", description="This contains the kafka server list which the kafka source listens to. This is given using comma-separated values. eg: 'localhost:9092,localhost:9093' ", type={DataType.STRING}), @Parameter(name="topic", description="This is the topic that the source listens to. eg: 'topic_one' ", type={DataType.STRING}), @Parameter(name="partition.no", description="This is the partition number of the given topic.", type={DataType.INT}, optional=true, defaultValue="0"), @Parameter(name="is.binary.message", description="In order to receive the binary events via the Kafka Multi-DC source, the value of this parameter needs to be set to 'True'.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="optional.configuration", description="This contains all the other possible configurations with which the consumer can be created.eg: producer.type:async,batch.size:200", type={DataType.STRING}, optional=true, defaultValue="null")}, examples={@Example(syntax="@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(type='kafkaMultiDC', topic='kafka_topic', bootstrap.servers='host1:9092,host1:9093', partition.no='1', @map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description="The following query listens to 
'kafka_topic' topic, deployed in the broker host1:9092 and host1:9093, with partition 1. A thread is created for each broker. The receiving xml events are mapped to a siddhi event and sent to the FooStream.")})
public class KafkaMultiDCSource
extends Source<KafkaMultiDCSourceState> {
    /** Option key under which the topic name is read from the user-supplied options. */
    private static final String KAFKA_TOPIC = "topic";
    /** Option key under which the partition number is read from the user-supplied options. */
    private static final String KAFKA_PARTITION_NO = "partition.no";
    private static final Logger LOG = Logger.getLogger(KafkaMultiDCSource.class);
    /** Snapshot key under which the synchronizer's last consumed sequence number is stored. */
    private static final String LAST_RECEIVED_SEQ_NO_KEY = "lastConsumedSeqNo";
    /** Listener handed to {@code init}; its stream definition is reused for the delegate option holders. */
    private SourceEventListener eventListener;
    /** Delegate Kafka sources, keyed by the bootstrap server entry they were created for. */
    private final Map<String, KafkaSource> sources = new HashMap<>();
    /** State factories returned by the delegates' {@code init}, keyed like {@link #sources}. */
    private final Map<String, StateFactory<KafkaSource.KafkaSourceState>> stateFactories = new HashMap<>();
    /** The entries obtained by splitting the 'bootstrap.servers' option on commas. */
    private String[] bootstrapServers;
    /** Shared synchronizer passed to every interceptor created in {@code init}. */
    private SourceSynchronizer synchronizer;

    /**
     * Initializes one delegate {@link KafkaSource} per entry of the 'bootstrap.servers' option.
     *
     * <p>The option is split on commas and must yield exactly two entries. For each entry an
     * {@link Interceptor} bound to the shared {@link SourceSynchronizer} is created, a dedicated
     * option holder is derived from the user-supplied one, and the delegate is initialized. The
     * delegate and the state factory it returns are both registered under the server entry.
     *
     * @param sourceEventListener    listener kept for later use and passed to the synchronizer
     * @param optionHolder           user-supplied options of this source
     * @param transportPropertyNames forwarded unchanged to every delegate
     * @param configReader           forwarded unchanged to every delegate
     * @param siddhiAppContext       forwarded unchanged to every delegate
     * @return a factory producing the combined state of all delegates
     * @throws SiddhiAppValidationException if 'bootstrap.servers' does not contain exactly two entries
     */
    public StateFactory<KafkaMultiDCSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] transportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.eventListener = sourceEventListener;
        String serverList = optionHolder.validateAndGetStaticValue("bootstrap.servers");
        String binaryFlag = optionHolder.validateAndGetStaticValue("is.binary.message", "false");
        boolean isBinaryMessage = Boolean.parseBoolean(binaryFlag);
        this.bootstrapServers = serverList.split(",");
        if (this.bootstrapServers.length != 2) {
            throw new SiddhiAppValidationException("There should be two servers listed in 'bootstrap.servers' configuration to ensure fault tolerant kafka messaging.");
        }
        this.synchronizer = new SourceSynchronizer(sourceEventListener, this.bootstrapServers, 1000, 10);
        // Both servers get the same setup; they are handled in the order they were listed.
        for (String server : this.bootstrapServers) {
            LOG.info("Initializing kafka source for bootstrap server :" + server);
            Interceptor interceptor = new Interceptor(server, this.synchronizer, isBinaryMessage);
            OptionHolder serverOptions = this.createOptionHolders(server, optionHolder);
            KafkaSource kafkaSource = new KafkaSource();
            StateFactory<KafkaSource.KafkaSourceState> serverStateFactory = kafkaSource.init(interceptor, serverOptions, transportPropertyNames, configReader, siddhiAppContext);
            this.stateFactories.put(server, serverStateFactory);
            this.sources.put(server, kafkaSource);
        }
        return new KafkaMultiDCSourceStateFactory(this.stateFactories);
    }

    /**
     * Lists the payload types this source emits.
     *
     * @return a two-element array holding {@code String.class} followed by {@code byte[].class}
     */
    public Class[] getOutputEventClasses() {
        Class[] outputClasses = new Class[2];
        outputClasses[0] = String.class;
        outputClasses[1] = byte[].class;
        return outputClasses;
    }

    /**
     * Connects every delegate {@link KafkaSource}, handing each one its own entry from the
     * combined state.
     *
     * <p>A failure of one delegate does not stop the attempt on the others; all failures are
     * collected and reported together once every delegate has been tried.
     *
     * <p>NOTE(review): a delegate that connected successfully is left connected when another
     * one fails. Confirm that a later retry of this method is safe for such a delegate.
     *
     * @param connectionCallback      forwarded unchanged to every delegate
     * @param kafkaMultiDCSourceState combined state whose map is looked up by server entry
     * @throws ConnectionUnavailableException if at least one delegate could not connect. The
     *         message lists every failure and the original exceptions are attached as
     *         suppressed exceptions so their stack traces are not lost.
     */
    public void connect(Source.ConnectionCallback connectionCallback, KafkaMultiDCSourceState kafkaMultiDCSourceState) throws ConnectionUnavailableException {
        StringBuilder errorMessage = new StringBuilder();
        List<ConnectionUnavailableException> failures = new ArrayList<>();
        for (Map.Entry<String, KafkaSource> entry : this.sources.entrySet()) {
            String server = entry.getKey();
            try {
                KafkaSource.KafkaSourceState kafkaSourceState = kafkaMultiDCSourceState.kafkaSourceStateMap.get(server);
                entry.getValue().connect(connectionCallback, kafkaSourceState);
                LOG.info("Connect to bootstrap server " + server);
            } catch (ConnectionUnavailableException e) {
                errorMessage.append("Error occurred while connecting to ").append(server).append(":").append(e.getMessage()).append("\n");
                // Keep the exception itself, not just its message, so the root cause stays diagnosable.
                failures.add(e);
            }
        }
        if (!failures.isEmpty()) {
            String combinedMessage = errorMessage.toString();
            LOG.error("Error while trying to connect boot strap server(s): " + combinedMessage);
            ConnectionUnavailableException aggregate = new ConnectionUnavailableException(combinedMessage);
            for (ConnectionUnavailableException failure : failures) {
                aggregate.addSuppressed(failure);
            }
            throw aggregate;
        }
    }

    /** Disconnects every delegate Kafka source. */
    public void disconnect() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.disconnect();
        }
    }

    /** Destroys every delegate Kafka source. */
    public void destroy() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.destroy();
        }
    }

    /** Pauses every delegate Kafka source. */
    public void pause() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.pause();
        }
    }

    /** Resumes every delegate Kafka source. */
    public void resume() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.resume();
        }
    }

    /**
     * Builds the option holder handed to the delegate {@link KafkaSource} of one server.
     *
     * <p>Fixed values are set for the consumer group (a fresh random UUID), the threading
     * option and the sequence flag. Partition, topic, optional configuration and the binary
     * flag are copied from the user-supplied options, falling back to the defaults shown below
     * where one is given.
     *
     * @param server               the single bootstrap server entry the delegate should use
     * @param originalOptionHolder the options supplied to this multi-DC source
     * @return an option holder built against the {@code @Extension} metadata of {@link KafkaSource}
     */
    private OptionHolder createOptionHolders(String server, OptionHolder originalOptionHolder) {
        Map<String, String> delegateOptions = new HashMap<>();

        // Values fixed by this source.
        delegateOptions.put("bootstrap.servers", server);
        delegateOptions.put("group.id", UUID.randomUUID().toString());
        delegateOptions.put("threading.option", "single.thread");
        delegateOptions.put("seq.enabled", "false");

        // Values taken over from the user-supplied options.
        delegateOptions.put("partition.no.list", originalOptionHolder.validateAndGetStaticValue(KAFKA_PARTITION_NO, "0"));
        delegateOptions.put("topic.list", originalOptionHolder.validateAndGetStaticValue(KAFKA_TOPIC));
        // The value may be null when the user did not supply 'optional.configuration'.
        delegateOptions.put("optional.configuration", originalOptionHolder.validateAndGetStaticValue("optional.configuration", null));
        delegateOptions.put("is.binary.message", originalOptionHolder.validateAndGetStaticValue("is.binary.message", "false"));

        Extension kafkaSourceExtension = KafkaSource.class.getAnnotation(Extension.class);
        return new OptionHolder(this.eventListener.getStreamDefinition(), delegateOptions, new HashMap<>(), kafkaSourceExtension);
    }

    /**
     * Reports the service deployment information of this source.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Factory for the combined state: it asks every delegate state factory for a new state
     * and wraps the results, keyed by server entry, in a {@link KafkaMultiDCSourceState}.
     */
    class KafkaMultiDCSourceStateFactory implements StateFactory<KafkaMultiDCSourceState> {
        /** Delegate state factories keyed by the server entry they belong to. */
        private final Map<String, StateFactory<KafkaSource.KafkaSourceState>> stateFactories;

        /**
         * @param kafkaStateFactories delegate factories keyed by server entry; the map is kept by reference
         */
        public KafkaMultiDCSourceStateFactory(Map<String, StateFactory<KafkaSource.KafkaSourceState>> kafkaStateFactories) {
            this.stateFactories = kafkaStateFactories;
        }

        /**
         * Creates one new state per delegate factory.
         *
         * @return a combined state holding the new delegate states under their server entries
         */
        public KafkaMultiDCSourceState createNewState() {
            Map<String, KafkaSource.KafkaSourceState> freshStates = new HashMap<>();
            this.stateFactories.forEach((server, delegateFactory) ->
                    freshStates.put(server, (KafkaSource.KafkaSourceState) delegateFactory.createNewState()));
            return new KafkaMultiDCSourceState(freshStates);
        }
    }

    /**
     * Combined state of this source: the state of every delegate Kafka source, keyed by server
     * entry, plus the synchronizer's last consumed sequence number.
     */
    class KafkaMultiDCSourceState extends State {
        /**
         * Delegate states keyed by server entry. The map is never replaced; restoring updates
         * the delegate states it already holds.
         */
        final Map<String, KafkaSource.KafkaSourceState> kafkaSourceStateMap;

        /**
         * @param kafkaSourceStateMap delegate states keyed by server entry; kept by reference
         */
        KafkaMultiDCSourceState(Map<String, KafkaSource.KafkaSourceState> kafkaSourceStateMap) {
            this.kafkaSourceStateMap = kafkaSourceStateMap;
        }

        /**
         * Captures the snapshot of every delegate state under its server entry and adds the
         * synchronizer's last consumed sequence number under {@code "lastConsumedSeqNo"}.
         *
         * @return the combined snapshot
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> state = new HashMap<>();
            for (Map.Entry<String, KafkaSource.KafkaSourceState> entry : this.kafkaSourceStateMap.entrySet()) {
                state.put(entry.getKey(), entry.getValue().snapshot());
            }
            state.put(KafkaMultiDCSource.LAST_RECEIVED_SEQ_NO_KEY, KafkaMultiDCSource.this.synchronizer.getLastConsumedSeqNo());
            return state;
        }

        /**
         * Restores the synchronizer's sequence number and every delegate state from a map
         * produced by {@link #snapshot()}.
         *
         * <p>Entries missing from {@code state} are skipped, leaving the corresponding current
         * value untouched. The delegate state map itself is kept: {@link #snapshot()} stores
         * the delegate snapshots under their server entries only, so there is no separate map
         * to read back.
         *
         * @param state the combined snapshot to restore from
         */
        @SuppressWarnings("unchecked")
        public void restore(Map<String, Object> state) {
            Object lastConsumedSeqNo = state.get(KafkaMultiDCSource.LAST_RECEIVED_SEQ_NO_KEY);
            if (lastConsumedSeqNo != null) {
                KafkaMultiDCSource.this.synchronizer.setLastConsumedSeqNo((Long) lastConsumedSeqNo);
            }
            for (Map.Entry<String, KafkaSource.KafkaSourceState> entry : this.kafkaSourceStateMap.entrySet()) {
                Object delegateSnapshot = state.get(entry.getKey());
                if (delegateSnapshot != null) {
                    // Safe cast: snapshot() stores each delegate's own Map<String, Object> here.
                    entry.getValue().restore((Map<String, Object>) delegateSnapshot);
                }
            }
        }

        /**
         * @return always {@code false}
         */
        public boolean canDestroy() {
            return false;
        }
    }
}

