package org.wso2.carbon.event.input.adapter.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/kafka/KafkaConsumer.class */
public class KafkaConsumer implements Runnable {
    private KafkaStream stream;
    private InputEventAdapterListener brokerListener;
    private String event;
    private int tenantId;
    private Log log = LogFactory.getLog(KafkaConsumer.class);

    public KafkaConsumer(KafkaStream kafkaStream, InputEventAdapterListener inputEventAdapterListener, int i) {
        this.stream = kafkaStream;
        this.brokerListener = inputEventAdapterListener;
        this.tenantId = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            PrivilegedCarbonContext.startTenantFlow();
            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(this.tenantId);
            ConsumerIterator it = this.stream.iterator();
            while (it.hasNext()) {
                try {
                    this.event = new String((byte[]) it.next().message());
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Event received in Kafka Event Adaptor - " + this.event);
                    }
                    this.brokerListener.onEvent(this.event);
                } catch (Throwable th) {
                    this.log.error("Error while transforming the event : " + this.event, th);
                }
            }
        } catch (Throwable th2) {
            this.log.error("Error while consuming event ", th2);
        } finally {
            PrivilegedCarbonContext.endTenantFlow();
        }
    }
}
