package org.apache.karaf.decanter.collector.kafka;

import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.karaf.decanter.api.marshaller.Unmarshaller;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Declarative Services component that polls a Kafka topic on a background thread,
 * unmarshals the polled record values and posts the result to the OSGi
 * {@link EventAdmin}.
 *
 * <p>Threading: {@link #activate(ComponentContext)} and
 * {@link #deactivate(ComponentContext)} are invoked by the component runtime, while
 * {@link #run()} executes on a dedicated single-thread executor. The Kafka consumer
 * is only polled and closed on that executor thread; the deactivating thread only
 * calls {@code wakeup()} on it.
 */
@Component(name = "org.apache.karaf.decanter.collector.kafka", immediate = true)
public class KafkaCollector implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCollector.class);

    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 10000L;

    /** How long {@link #deactivate(ComponentContext)} waits for the polling thread to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Consumer settings that are forwarded to Kafka only when present in the configuration. */
    private static final String[] OPTIONAL_CONSUMER_KEYS = {
        "security.protocol",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.keystore.location",
        "ssl.keystore.password",
        "ssl.key.password",
        "ssl.provider",
        "ssl.cipher.suites",
        "ssl.enabled.protocols",
        "ssl.truststore.type",
        "ssl.keystore.type"
    };

    private Dictionary<String, Object> properties;
    private KafkaConsumer<String, String> consumer;
    private String topic;
    private String eventAdminTopic;
    // Written by the activating/deactivating thread and read by the polling thread.
    private volatile boolean consuming = false;
    private ExecutorService executor;
    private EventAdmin dispatcher;
    private Unmarshaller unmarshaller;

    /**
     * Builds the Kafka consumer from the component configuration, subscribes it to the
     * configured topic and starts the background polling thread.
     *
     * @param componentContext the component context supplying the configuration properties
     */
    @Activate
    public void activate(ComponentContext componentContext) {
        this.properties = componentContext.getProperties();
        this.topic = getValue(this.properties, "topic", "decanter");
        this.eventAdminTopic = getValue(this.properties, "event.topics", "decanter/collect/kafka/decanter");

        Properties config = new Properties();
        config.put("bootstrap.servers", getValue(this.properties, "bootstrap.servers", "localhost:9092"));
        config.put("group.id", getValue(this.properties, "group.id", "decanter"));
        config.put("enable.auto.commit", getValue(this.properties, "enable.auto.commit", "true"));
        config.put("auto.commit.interval.ms", getValue(this.properties, "auto.commit.interval.ms", "1000"));
        config.put("session.timeout.ms", getValue(this.properties, "session.timeout.ms", "30000"));
        config.put("key.deserializer", getValue(this.properties, "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"));
        config.put("value.deserializer", getValue(this.properties, "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"));
        for (String key : OPTIONAL_CONSUMER_KEYS) {
            String value = getValue(this.properties, key, null);
            if (value != null) {
                config.put(key, value);
            }
        }

        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        KafkaConsumer<String, String> newConsumer;
        try {
            // NOTE(review): the context class loader is cleared while the consumer is created,
            // presumably so Kafka resolves its classes through its own class loader - confirm.
            Thread.currentThread().setContextClassLoader(null);
            newConsumer = new KafkaConsumer<>(config);
            try {
                newConsumer.subscribe(Arrays.asList(this.topic));
            } catch (RuntimeException e) {
                // Do not leak the freshly created consumer when the subscription fails.
                try {
                    newConsumer.close();
                } catch (RuntimeException closeError) {
                    e.addSuppressed(closeError);
                }
                throw e;
            }
        } finally {
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }

        this.consumer = newConsumer;
        this.consuming = true;
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(this);
    }

    /**
     * Stops the polling loop and waits briefly for the polling thread to close the consumer.
     *
     * <p>The consumer is not closed here: only {@code wakeup()} is called on it from this
     * thread, and the polling thread closes it when its loop ends.
     *
     * @param componentContext the component context (unused)
     */
    @Deactivate
    public void deactivate(ComponentContext componentContext) {
        this.consuming = false;
        if (this.consumer != null) {
            // Aborts a poll that is currently blocking so the loop can observe the flag.
            this.consumer.wakeup();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    this.executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Polling loop: repeatedly consumes until the component is deactivated, then closes
     * the consumer on this same thread.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (this.consuming) {
                try {
                    consume();
                } catch (Exception e) {
                    if (!this.consuming) {
                        // Expected: deactivate() woke up the blocking poll.
                        break;
                    }
                    LOGGER.warn(e.getMessage(), e);
                }
            }
        } finally {
            closeConsumer();
        }
    }

    /** Closes the consumer, logging instead of propagating any failure. */
    private void closeConsumer() {
        try {
            this.consumer.close();
        } catch (Exception e) {
            LOGGER.warn("Error while closing the Kafka consumer", e);
        }
    }

    /**
     * Polls once and, if any records were returned, posts a single event containing the
     * local host details, the component properties and the unmarshalled record values.
     */
    private void consume() {
        ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT_MS);
        if (records.isEmpty()) {
            return;
        }
        Map<String, Object> data = new HashMap<>();
        try {
            InetAddress localHost = InetAddress.getLocalHost();
            data.put("hostAddress", localHost.getHostAddress());
            data.put("hostName", localHost.getHostName());
        } catch (Exception e) {
            LOGGER.warn("Can't populate local host name and address", e);
        }
        Enumeration<String> keys = this.properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            data.put(key, this.properties.get(key));
        }
        // NOTE(review): every record of the batch is merged into the same map, so later
        // records overwrite equal keys of earlier ones and one event is posted per poll.
        // Kept as-is to preserve the events downstream consumers currently receive.
        for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            if (value == null) {
                // A record without a value has nothing to unmarshal; skip it rather than
                // failing the whole batch.
                continue;
            }
            data.putAll(this.unmarshaller.unmarshal(new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))));
        }
        data.put("type", "kafka");
        String karafName = System.getProperty("karaf.name");
        if (karafName != null) {
            data.put("karafName", karafName);
        }
        this.dispatcher.postEvent(new Event(this.eventAdminTopic, data));
    }

    /**
     * Injects the event dispatcher.
     *
     * @param eventAdmin the EventAdmin service used to post events
     */
    @Reference
    public void setDispatcher(EventAdmin eventAdmin) {
        this.dispatcher = eventAdmin;
    }

    /**
     * Injects the unmarshaller used to decode record values.
     *
     * @param unmarshaller the unmarshaller service
     */
    @Reference
    public void setUnmarshaller(Unmarshaller unmarshaller) {
        this.unmarshaller = unmarshaller;
    }

    /**
     * Looks up a configuration value as a string.
     *
     * @param dictionary the configuration to read from
     * @param str the key to look up
     * @param str2 the value returned when the key is absent (may be {@code null})
     * @return the string form of the configured value, or {@code str2} when absent
     */
    private String getValue(Dictionary<String, Object> dictionary, String str, String str2) {
        Object value = dictionary.get(str);
        // Use toString() instead of a cast so non-String values (e.g. Integer, Boolean)
        // do not raise a ClassCastException.
        return value != null ? value.toString() : str2;
    }
}
