package org.wso2.carbon.event.input.adaptor.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.ResourceBundle;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import kafka.consumer.ConsumerConfig;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.input.adaptor.core.AbstractInputEventAdaptor;
import org.wso2.carbon.event.input.adaptor.core.InputEventAdaptorListener;
import org.wso2.carbon.event.input.adaptor.core.Property;
import org.wso2.carbon.event.input.adaptor.core.config.InputEventAdaptorConfiguration;
import org.wso2.carbon.event.input.adaptor.core.message.config.InputEventAdaptorMessageConfiguration;
import org.wso2.carbon.event.input.adaptor.kafka.internal.LateStartAdaptorListener;
import org.wso2.carbon.event.input.adaptor.kafka.internal.ds.KafkaEventAdaptorServiceHolder;
import org.wso2.carbon.event.input.adaptor.kafka.internal.util.ConsumerKafkaConstants;

/* loaded from: input_file:org/wso2/carbon/event/input/adaptor/kafka/KafkaEventAdaptorType.class */
public final class KafkaEventAdaptorType extends AbstractInputEventAdaptor implements LateStartAdaptorListener {
    private ResourceBundle resourceBundle;
    private static final Log log = LogFactory.getLog(KafkaEventAdaptorType.class);
    private static KafkaEventAdaptorType kafkaAdaptorEventAdaptor = new KafkaEventAdaptorType();
    private boolean readyToPoll = false;
    ConcurrentHashMap<Integer, ConcurrentHashMap<String, ConsumerKafkaAdaptor>> consumerAdaptorMap = new ConcurrentHashMap<>();
    List<LateStartAdaptorConfig> lateStartAdaptorConfigList = new ArrayList();

    /* loaded from: input_file:org/wso2/carbon/event/input/adaptor/kafka/KafkaEventAdaptorType$LateStartAdaptorConfig.class */
    class LateStartAdaptorConfig {
        InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration;
        InputEventAdaptorListener inputEventAdaptorListener;
        InputEventAdaptorConfiguration inputEventAdaptorConfiguration;
        AxisConfiguration axisConfiguration;
        String subscriptionId;
        int tenantId;

        public LateStartAdaptorConfig(InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration, InputEventAdaptorListener inputEventAdaptorListener, InputEventAdaptorConfiguration inputEventAdaptorConfiguration, AxisConfiguration axisConfiguration, String str, int i) {
            this.inputEventAdaptorMessageConfiguration = inputEventAdaptorMessageConfiguration;
            this.inputEventAdaptorListener = inputEventAdaptorListener;
            this.inputEventAdaptorConfiguration = inputEventAdaptorConfiguration;
            this.axisConfiguration = axisConfiguration;
            this.subscriptionId = str;
            this.tenantId = i;
        }

        public InputEventAdaptorMessageConfiguration getInputEventAdaptorMessageConfiguration() {
            return this.inputEventAdaptorMessageConfiguration;
        }

        public void setInputEventAdaptorMessageConfiguration(InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration) {
            this.inputEventAdaptorMessageConfiguration = inputEventAdaptorMessageConfiguration;
        }

        public InputEventAdaptorListener getInputEventAdaptorListener() {
            return this.inputEventAdaptorListener;
        }

        public void setInputEventAdaptorListener(InputEventAdaptorListener inputEventAdaptorListener) {
            this.inputEventAdaptorListener = inputEventAdaptorListener;
        }

        public InputEventAdaptorConfiguration getInputEventAdaptorConfiguration() {
            return this.inputEventAdaptorConfiguration;
        }

        public void setInputEventAdaptorConfiguration(InputEventAdaptorConfiguration inputEventAdaptorConfiguration) {
            this.inputEventAdaptorConfiguration = inputEventAdaptorConfiguration;
        }

        public AxisConfiguration getAxisConfiguration() {
            return this.axisConfiguration;
        }

        public void setAxisConfiguration(AxisConfiguration axisConfiguration) {
            this.axisConfiguration = axisConfiguration;
        }

        public String getSubscriptionId() {
            return this.subscriptionId;
        }

        public void setSubscriptionId(String str) {
            this.subscriptionId = str;
        }

        public int getTenantId() {
            return this.tenantId;
        }

        public void setTenantId(int i) {
            this.tenantId = i;
        }
    }

    private KafkaEventAdaptorType() {
    }

    public static KafkaEventAdaptorType getInstance() {
        return kafkaAdaptorEventAdaptor;
    }

    protected String getName() {
        return ConsumerKafkaConstants.ADAPTOR_TYPE_KAFKA;
    }

    protected List<String> getSupportedInputMessageTypes() {
        ArrayList arrayList = new ArrayList();
        arrayList.add("json");
        arrayList.add("xml");
        arrayList.add("text");
        return arrayList;
    }

    protected void init() {
        this.resourceBundle = ResourceBundle.getBundle("org.wso2.carbon.event.input.adaptor.kafka.i18n.Resources", Locale.getDefault());
        KafkaEventAdaptorServiceHolder.addLateStartAdaptorListener(this);
    }

    protected List<Property> getInputAdaptorProperties() {
        ArrayList arrayList = new ArrayList();
        Property property = new Property(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT);
        property.setDisplayName(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT));
        property.setHint(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT_HINT));
        property.setRequired(true);
        arrayList.add(property);
        Property property2 = new Property(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_GROUP_ID);
        property2.setDisplayName(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_GROUP_ID));
        property2.setHint(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_GROUP_ID_hint));
        property2.setRequired(true);
        arrayList.add(property2);
        Property property3 = new Property(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_THREADS);
        property3.setDisplayName(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_THREADS));
        property3.setHint(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_THREADS_HINT));
        property3.setRequired(true);
        arrayList.add(property3);
        Property property4 = new Property(ConsumerKafkaConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES);
        property4.setDisplayName(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES));
        property4.setHint(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES_HINT));
        arrayList.add(property4);
        return arrayList;
    }

    protected List<Property> getInputMessageProperties() {
        ArrayList arrayList = new ArrayList();
        Property property = new Property(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_TOPIC);
        property.setDisplayName(this.resourceBundle.getString(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_TOPIC));
        property.setRequired(true);
        arrayList.add(property);
        return arrayList;
    }

    public String subscribe(InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration, InputEventAdaptorListener inputEventAdaptorListener, InputEventAdaptorConfiguration inputEventAdaptorConfiguration, AxisConfiguration axisConfiguration) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true);
        String uuid = UUID.randomUUID().toString();
        if (this.readyToPoll) {
            createKafkaAdaptorListener(inputEventAdaptorMessageConfiguration, inputEventAdaptorListener, inputEventAdaptorConfiguration, axisConfiguration, uuid, tenantId);
        } else {
            this.lateStartAdaptorConfigList.add(new LateStartAdaptorConfig(inputEventAdaptorMessageConfiguration, inputEventAdaptorListener, inputEventAdaptorConfiguration, axisConfiguration, uuid, tenantId));
        }
        return uuid;
    }

    public void unsubscribe(InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration, InputEventAdaptorConfiguration inputEventAdaptorConfiguration, AxisConfiguration axisConfiguration, String str) {
        ConsumerKafkaAdaptor consumerKafkaAdaptor;
        if (this.consumerAdaptorMap != null) {
            ConcurrentHashMap<String, ConsumerKafkaAdaptor> concurrentHashMap = this.consumerAdaptorMap.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true)));
            if (concurrentHashMap == null || (consumerKafkaAdaptor = concurrentHashMap.get(str)) == null) {
                return;
            }
            consumerKafkaAdaptor.shutdown();
            concurrentHashMap.remove(str);
        }
    }

    private static ConsumerConfig createConsumerConfig(String str, String str2, String str3) {
        String[] split;
        Properties properties = new Properties();
        properties.put("zookeeper.connect", str);
        properties.put(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_GROUP_ID, str2);
        if (str3 != null && (split = str3.split(",")) != null && split.length > 0) {
            for (String str4 : split) {
                String[] split2 = str4.split(":");
                if (split2.length == 2) {
                    properties.put(split2[0], split2[1]);
                } else {
                    log.warn("Optional configuration property not defined in the correct format");
                }
            }
        }
        return new ConsumerConfig(properties);
    }

    @Override // org.wso2.carbon.event.input.adaptor.kafka.internal.LateStartAdaptorListener
    public void tryStartAdaptor() {
        log.info("Kafka input event adaptor loading listeners ");
        this.readyToPoll = true;
        for (LateStartAdaptorConfig lateStartAdaptorConfig : this.lateStartAdaptorConfigList) {
            createKafkaAdaptorListener(lateStartAdaptorConfig.getInputEventAdaptorMessageConfiguration(), lateStartAdaptorConfig.getInputEventAdaptorListener(), lateStartAdaptorConfig.getInputEventAdaptorConfiguration(), lateStartAdaptorConfig.getAxisConfiguration(), lateStartAdaptorConfig.getSubscriptionId(), lateStartAdaptorConfig.getTenantId());
        }
    }

    private void createKafkaAdaptorListener(InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration, InputEventAdaptorListener inputEventAdaptorListener, InputEventAdaptorConfiguration inputEventAdaptorConfiguration, AxisConfiguration axisConfiguration, String str, int i) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(inputEventAdaptorConfiguration.getInputProperties());
        String str2 = (String) hashMap.get(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_ZOOKEEPER_CONNECT);
        String str3 = (String) hashMap.get(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_GROUP_ID);
        String str4 = (String) hashMap.get(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_THREADS);
        String str5 = (String) hashMap.get(ConsumerKafkaConstants.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES);
        int parseInt = Integer.parseInt(str4);
        ConsumerKafkaAdaptor consumerKafkaAdaptor = new ConsumerKafkaAdaptor((String) inputEventAdaptorMessageConfiguration.getInputMessageProperties().get(ConsumerKafkaConstants.ADAPTOR_SUSCRIBER_TOPIC), createConsumerConfig(str2, str3, str5));
        ConcurrentHashMap<String, ConsumerKafkaAdaptor> concurrentHashMap = this.consumerAdaptorMap.get(Integer.valueOf(i));
        if (concurrentHashMap == null) {
            concurrentHashMap = new ConcurrentHashMap<>();
            this.consumerAdaptorMap.put(Integer.valueOf(i), concurrentHashMap);
        }
        concurrentHashMap.put(str, consumerKafkaAdaptor);
        consumerKafkaAdaptor.run(parseInt, inputEventAdaptorListener);
    }
}
