package org.wso2.siddhi.extension.output.transport.kafka;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.OutputTransport;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name = "kafka", namespace = "outputtransport", description = "")
/* loaded from: input_file:org/wso2/siddhi/extension/output/transport/kafka/KafkaOutputTransport.class */
public class KafkaOutputTransport extends OutputTransport {
    private ScheduledExecutorService executorService;
    private Producer<String, String> producer;
    private Option topicOption = null;
    private String kafkaConnect;
    private String optionalConfigs;
    private Option partitionOption;
    private static final String KAFKA_PUBLISH_TOPIC = "topic";
    private static final String KAFKA_BROKER_LIST = "bootstrap.servers";
    private static final String KAFKA_OPTIONAL_CONFIGURATION_PROPERTIES = "optional.configuration";
    private static final String HEADER_SEPARATOR = ",";
    private static final String ENTRY_SEPARATOR = ":";
    private static final String KAFKA_PARTITION_NO = "partition.no";
    private static final Logger log = Logger.getLogger(KafkaOutputTransport.class);

    /* loaded from: input_file:org/wso2/siddhi/extension/output/transport/kafka/KafkaOutputTransport$KafkaSender.class */
    private class KafkaSender implements Runnable {
        String topic;
        Object message;
        String partitionNo;

        KafkaSender(String str, String str2, Object obj) {
            this.topic = str;
            this.message = obj;
            this.partitionNo = str2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (null == this.partitionNo) {
                    KafkaOutputTransport.this.producer.send(new ProducerRecord(this.topic, this.message.toString()));
                } else {
                    KafkaOutputTransport.this.producer.send(new ProducerRecord(this.topic, this.partitionNo, this.message.toString()));
                }
            } catch (Throwable th) {
                KafkaOutputTransport.log.error("Unexpected error when sending event via Kafka Output Adapter:" + th.getMessage(), th);
            }
        }
    }

    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ExecutionPlanContext executionPlanContext) {
        this.kafkaConnect = optionHolder.validateAndGetStaticValue(KAFKA_BROKER_LIST);
        this.optionalConfigs = optionHolder.validateAndGetStaticValue(KAFKA_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        this.topicOption = optionHolder.validateAndGetOption(KAFKA_PUBLISH_TOPIC);
        this.partitionOption = optionHolder.getOrCreateOption(KAFKA_PARTITION_NO, (String) null);
        this.executorService = executionPlanContext.getScheduledExecutorService();
    }

    public void connect() throws ConnectionUnavailableException {
        String[] split;
        Properties properties = new Properties();
        properties.put(KAFKA_BROKER_LIST, this.kafkaConnect);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        if (this.optionalConfigs != null && (split = this.optionalConfigs.split(HEADER_SEPARATOR)) != null && split.length > 0) {
            for (String str : split) {
                try {
                    String[] split2 = str.split(ENTRY_SEPARATOR, 2);
                    properties.put(split2[0], split2[1]);
                } catch (Exception e) {
                    log.warn("Optional property '" + str + "' is not defined in the correct format.", e);
                }
            }
        }
        this.producer = new KafkaProducer(properties);
        log.info("Kafka producer created.");
    }

    public void publish(Object obj, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        try {
            this.executorService.submit(new KafkaSender(this.topicOption.getValue(dynamicOptions), this.partitionOption.getValue(dynamicOptions), obj));
        } catch (RejectedExecutionException e) {
            log.error("Job queue is full : " + e.getMessage(), e);
        }
    }

    public void disconnect() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    public void destroy() {
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{KAFKA_PUBLISH_TOPIC, KAFKA_PARTITION_NO};
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}
