package org.wso2.carbon.event.output.adapter.kafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import org.wso2.carbon.event.output.adapter.core.exception.TestConnectionNotSupportedException;
import org.wso2.carbon.event.output.adapter.kafka.internal.util.KafkaEventAdapterConstants;

/* loaded from: input_file:org/wso2/carbon/event/output/adapter/kafka/KafkaEventAdapter.class */
public class KafkaEventAdapter implements OutputEventAdapter {
    private static final Log log = LogFactory.getLog(KafkaEventAdapter.class);
    private static ThreadPoolExecutor threadPoolExecutor;
    private OutputEventAdapterConfiguration eventAdapterConfiguration;
    private Map<String, String> globalProperties;
    private ProducerConfig config;
    private Producer<String, Object> producer;
    private int tenantId;

    /* loaded from: input_file:org/wso2/carbon/event/output/adapter/kafka/KafkaEventAdapter$KafkaSender.class */
    class KafkaSender implements Runnable {
        String topic;
        Object message;

        KafkaSender(String str, Object obj) {
            this.topic = str;
            this.message = obj;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                KafkaEventAdapter.this.producer.send(new KeyedMessage(this.topic, this.message.toString()));
            } catch (Throwable th) {
                KafkaEventAdapter.log.error("Unexpected error when sending event via Kafka Output Adatper:" + th.getMessage(), th);
            }
        }
    }

    public KafkaEventAdapter(OutputEventAdapterConfiguration outputEventAdapterConfiguration, Map<String, String> map) {
        this.eventAdapterConfiguration = outputEventAdapterConfiguration;
        this.globalProperties = map;
    }

    public void init() throws OutputEventAdapterException {
        this.tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        if (threadPoolExecutor == null) {
            threadPoolExecutor = new ThreadPoolExecutor(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME) != null ? Integer.parseInt(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME)) : 8, this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME) != null ? Integer.parseInt(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME)) : 100, this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null ? Integer.parseInt(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME)) : 20000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null ? Integer.parseInt(this.globalProperties.get(KafkaEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME)) : 2000), new ThreadFactoryBuilder().setNameFormat("KafkaOutput-" + this.eventAdapterConfiguration.getName() + "-executor-thread-%d").build());
        }
    }

    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("Test connection is not available");
    }

    public void connect() {
        String[] split;
        Map staticProperties = this.eventAdapterConfiguration.getStaticProperties();
        String str = (String) staticProperties.get(KafkaEventAdapterConstants.ADAPTOR_META_BROKER_LIST);
        String str2 = (String) staticProperties.get(KafkaEventAdapterConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES);
        Properties properties = new Properties();
        properties.put("metadata.broker.list", str);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        if (str2 != null && (split = str2.split(KafkaEventAdapterConstants.HEADER_SEPARATOR)) != null && split.length > 0) {
            for (String str3 : split) {
                try {
                    String[] split2 = str3.split(KafkaEventAdapterConstants.ENTRY_SEPARATOR, 2);
                    properties.put(split2[0], split2[1]);
                } catch (Exception e) {
                    log.warn("Optional property '" + str3 + "' is not defined in the correct format.", e);
                }
            }
        }
        this.config = new ProducerConfig(properties);
        this.producer = new Producer<>(this.config);
    }

    public void publish(Object obj, Map<String, String> map) {
        try {
            threadPoolExecutor.submit(new KafkaSender(map.get(KafkaEventAdapterConstants.ADAPTOR_PUBLISH_TOPIC), obj));
        } catch (RejectedExecutionException e) {
            EventAdapterUtil.logAndDrop(this.eventAdapterConfiguration.getName(), obj, "Job queue is full", e, log, this.tenantId);
        }
    }

    public void disconnect() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    public void destroy() {
    }

    public boolean isPolled() {
        return false;
    }
}
