package org.wso2.carbon.event.input.adapter.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
import org.wso2.carbon.event.input.adapter.core.exception.TestConnectionNotSupportedException;
import org.wso2.carbon.event.input.adapter.kafka.internal.util.KafkaEventAdapterConstants;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/kafka/KafkaEventAdapter.class */
public final class KafkaEventAdapter implements InputEventAdapter {
    private final InputEventAdapterConfiguration eventAdapterConfiguration;
    private final Map<String, String> globalProperties;
    private InputEventAdapterListener eventAdaptorListener;
    private final String id = UUID.randomUUID().toString();
    private boolean readyToPoll = true;
    ConcurrentHashMap<Integer, ConcurrentHashMap<String, ConsumerKafkaAdaptor>> consumerAdaptorMap = new ConcurrentHashMap<>();
    private static final Log log = LogFactory.getLog(KafkaEventAdapter.class);
    public static ExecutorService executorService = new ThreadPoolExecutor(8, 100, 20, TimeUnit.SECONDS, new LinkedBlockingQueue(10000));

    public KafkaEventAdapter(InputEventAdapterConfiguration inputEventAdapterConfiguration, Map<String, String> map) {
        this.eventAdapterConfiguration = inputEventAdapterConfiguration;
        this.globalProperties = map;
    }

    public void init(InputEventAdapterListener inputEventAdapterListener) throws InputEventAdapterException {
        this.eventAdaptorListener = inputEventAdapterListener;
    }

    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("not-supported");
    }

    public void connect() {
        String str = (String) this.eventAdapterConfiguration.getProperties().get("topic");
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true);
        String uuid = UUID.randomUUID().toString();
        if (this.readyToPoll) {
            createKafkaAdaptorListener(this.eventAdaptorListener, this.eventAdapterConfiguration, uuid, tenantId);
        } else {
            this.readyToPoll = true;
            log.debug("Adapter " + this.eventAdapterConfiguration.getName() + " readyToPoll " + str);
        }
    }

    public void disconnect() {
        String str = (String) this.eventAdapterConfiguration.getProperties().get("topic");
        if (this.consumerAdaptorMap != null) {
            this.consumerAdaptorMap = null;
            this.readyToPoll = false;
            log.debug("Adapter " + this.eventAdapterConfiguration.getName() + " disconnected " + str);
        }
    }

    public void destroy() {
    }

    public InputEventAdapterListener getEventAdaptorListener() {
        return this.eventAdaptorListener;
    }

    public int hashCode() {
        return this.id.hashCode();
    }

    private static ConsumerConfig createConsumerConfig(String str, String str2, String str3) {
        String[] split;
        Properties properties = new Properties();
        properties.put(KafkaEventAdapterConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT, str);
        properties.put(KafkaEventAdapterConstants.ADAPTOR_SUSCRIBER_GROUP_ID, str2);
        if (str3 != null && (split = str3.split(",")) != null && split.length > 0) {
            for (String str4 : split) {
                String[] split2 = str4.split(":");
                if (split2.length == 2) {
                    properties.put(split2[0], split2[1]);
                } else {
                    log.warn("Optional configuration property not defined in the correct format.\nRequired - property_name1:property_value1,property_name2:property_value2\nFound - " + str3);
                }
            }
        }
        return new ConsumerConfig(properties);
    }

    private void createKafkaAdaptorListener(InputEventAdapterListener inputEventAdapterListener, InputEventAdapterConfiguration inputEventAdapterConfiguration, String str, int i) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(inputEventAdapterConfiguration.getProperties());
        String str2 = (String) hashMap.get(KafkaEventAdapterConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT);
        String str3 = (String) hashMap.get(KafkaEventAdapterConstants.ADAPTOR_SUSCRIBER_GROUP_ID);
        String str4 = (String) hashMap.get(KafkaEventAdapterConstants.ADAPTOR_SUSCRIBER_THREADS);
        String str5 = (String) hashMap.get(KafkaEventAdapterConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES);
        int parseInt = Integer.parseInt(str4);
        ConsumerKafkaAdaptor consumerKafkaAdaptor = new ConsumerKafkaAdaptor((String) inputEventAdapterConfiguration.getProperties().get("topic"), i, createConsumerConfig(str2, str3, str5));
        ConcurrentHashMap<String, ConsumerKafkaAdaptor> concurrentHashMap = this.consumerAdaptorMap.get(Integer.valueOf(i));
        if (concurrentHashMap == null) {
            concurrentHashMap = new ConcurrentHashMap<>();
            this.consumerAdaptorMap.put(Integer.valueOf(i), concurrentHashMap);
        }
        concurrentHashMap.put(str, consumerKafkaAdaptor);
        consumerKafkaAdaptor.run(parseInt, inputEventAdapterListener);
    }
}
