package org.wso2.carbon.event.output.adapter.kafka;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException;
import org.wso2.carbon.event.output.adapter.kafka.internal.util.KafkaEventAdapterConstants;

/* loaded from: input_file:org/wso2/carbon/event/output/adapter/kafka/KafkaEventAdapter.class */
/**
 * Output event adapter that publishes events to Kafka through the legacy
 * {@code kafka.javaapi.producer.Producer} API.
 *
 * <p>Lifecycle as implemented here: {@link #init()} lazily creates a shared thread pool,
 * {@link #connect()} builds the producer from the adapter's static properties,
 * {@link #publish(Object, Map)} sends one message per call, and {@link #disconnect()}
 * closes the producer.
 */
public class KafkaEventAdapter implements OutputEventAdapter {
    private static final Log log = LogFactory.getLog(KafkaEventAdapter.class);

    /** Topic created and written to by {@link #connect()} as a connectivity probe. */
    private static final String TEST_TOPIC = "org.wso2.carbon.event.output.adapter.kafka.test";

    // NOTE(review): the ZooKeeper address used for the probe is hard-coded and is not derived
    // from the adapter configuration - confirm whether it should be configurable.
    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private static final int ZOOKEEPER_TIMEOUT_MILLIS = 10000;

    private static final int DEFAULT_MIN_THREADS = 8;
    private static final int DEFAULT_MAX_THREADS = 100;
    private static final int DEFAULT_KEEP_ALIVE_SECONDS = 20;
    private static final int JOB_QUEUE_CAPACITY = 1000;

    /** Shared by all adapter instances; created by the first {@link #init()} call. */
    private static ThreadPoolExecutor threadPoolExecutor;

    private OutputEventAdapterConfiguration eventAdapterConfiguration;
    private Map<String, String> globalProperties;
    private ProducerConfig config;
    private Producer<String, Object> producer;

    /**
     * @param outputEventAdapterConfiguration per-adapter configuration (name and static properties)
     * @param map                             global adapter properties, read by {@link #init()} for
     *                                        thread pool sizing
     */
    public KafkaEventAdapter(OutputEventAdapterConfiguration outputEventAdapterConfiguration, Map<String, String> map) {
        this.eventAdapterConfiguration = outputEventAdapterConfiguration;
        this.globalProperties = map;
    }

    /**
     * Creates the shared thread pool if it does not exist yet. Pool sizes and keep-alive time
     * are taken from the global properties, with built-in defaults when a property is absent.
     *
     * @throws OutputEventAdapterException declared by the signature; not thrown by this body
     * @throws NumberFormatException       if a configured value is not a valid integer
     */
    public void init() throws OutputEventAdapterException {
        if (threadPoolExecutor == null) {
            int minThreads = intGlobalProperty(KafkaEventAdapterConstants.MIN_THREAD_NAME, DEFAULT_MIN_THREADS);
            int maxThreads = intGlobalProperty(KafkaEventAdapterConstants.MAX_THREAD_NAME, DEFAULT_MAX_THREADS);
            long keepAliveSeconds = intGlobalProperty(
                    KafkaEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_NAME, DEFAULT_KEEP_ALIVE_SECONDS);
            threadPoolExecutor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveSeconds,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(JOB_QUEUE_CAPACITY));
        }
    }

    /** Delegates to {@link #connect()}. */
    public void testConnect() {
        connect();
    }

    /**
     * Builds the Kafka producer from the adapter's static properties and probes the connection
     * by creating a test topic and sending one message to it.
     *
     * <p>Optional producer properties are supplied as a comma-separated list of
     * {@code key:value} pairs; entries that do not split into exactly two parts on {@code ':'}
     * are skipped with a warning.
     *
     * @throws OutputEventAdapterRuntimeException if topic creation or the test send fails for any
     *                                            reason other than the test topic already existing
     */
    public void connect() {
        Map<String, String> staticProperties = this.eventAdapterConfiguration.getStaticProperties();
        String brokerList = staticProperties.get(KafkaEventAdapterConstants.ADAPTOR_META_BROKER_LIST);
        log.info(brokerList);
        String optionalConfigs = staticProperties.get(KafkaEventAdapterConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES);

        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        if (optionalConfigs != null) {
            for (String entry : optionalConfigs.split(",")) {
                String[] keyValue = entry.split(":");
                if (keyValue.length == 2) {
                    properties.put(keyValue[0], keyValue[1]);
                } else {
                    log.warn("Optional configuration property not defined in the correct format");
                }
            }
        }

        this.config = new ProducerConfig(properties);
        // Release a producer left over from an earlier connect()/testConnect() so that
        // reconnecting does not leak its resources.
        if (this.producer != null) {
            this.producer.close();
        }
        this.producer = new Producer<>(this.config);

        ZkClient zkClient = null;
        try {
            zkClient = new ZkClient(ZOOKEEPER_ADDRESS, ZOOKEEPER_TIMEOUT_MILLIS, ZOOKEEPER_TIMEOUT_MILLIS,
                    ZKStringSerializer$.MODULE$);
            AdminUtils.createTopic(zkClient, TEST_TOPIC, 10, 1, new Properties());
            this.producer.send(new KeyedMessage<String, Object>(TEST_TOPIC, "Successfully connected to kafka server"));
        } catch (TopicExistsException e) {
            // The topic exists from an earlier probe; the test message is not sent in this case.
            log.info("test topic already created.");
        } catch (Exception e2) {
            throw new OutputEventAdapterRuntimeException("The adaptor " + this.eventAdapterConfiguration.getName() + " failed to connect to the kafka server ", e2);
        } finally {
            closeQuietly(zkClient);
        }
    }

    /**
     * Sends the string form of {@code obj} to the topic named by the dynamic property
     * {@code ADAPTOR_PUBLISH_TOPIC}.
     *
     * @param obj event payload; published as {@code obj.toString()}
     * @param map dynamic properties for this event; must contain the publish topic
     * @throws IllegalStateException if no producer is available because {@link #connect()} has not
     *                               completed or {@link #disconnect()} has been called
     */
    public void publish(Object obj, Map<String, String> map) {
        Producer<String, Object> activeProducer = this.producer;
        if (activeProducer == null) {
            throw new IllegalStateException("The adaptor " + this.eventAdapterConfiguration.getName()
                    + " is not connected to the kafka server");
        }
        String topic = map.get(KafkaEventAdapterConstants.ADAPTOR_PUBLISH_TOPIC);
        activeProducer.send(new KeyedMessage<String, Object>(topic, obj.toString()));
    }

    /** Closes the producer, if any, and forgets it so that it cannot be used or closed again. */
    public void disconnect() {
        if (this.producer != null) {
            try {
                this.producer.close();
            } finally {
                this.producer = null;
            }
        }
    }

    /** No-op. */
    public void destroy() {
    }

    /** Returns the named global property parsed as an int, or {@code defaultValue} when it is absent. */
    private int intGlobalProperty(String name, int defaultValue) {
        String value = this.globalProperties.get(name);
        return value != null ? Integer.parseInt(value) : defaultValue;
    }

    /** Closes the ZooKeeper client, logging instead of propagating so a close failure cannot mask the real error. */
    private static void closeQuietly(ZkClient zkClient) {
        if (zkClient == null) {
            return;
        }
        try {
            zkClient.close();
        } catch (RuntimeException closeFailure) {
            log.warn("Failed to close the ZooKeeper client used for the connection test", closeFailure);
        }
    }
}
