package org.wso2.carbon.event.input.adapter.kafka090;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/kafka090/KafkaConsumerThread.class */
public class KafkaConsumerThread implements Runnable {
    private InputEventAdapterListener brokerListener;
    private int tenantId;
    private Log log = LogFactory.getLog(KafkaConsumerThread.class);
    private final Lock consumerLock = new ReentrantLock();
    private final KafkaConsumer<byte[], byte[]> consumer;

    public KafkaConsumerThread(Properties properties, String str, InputEventAdapterListener inputEventAdapterListener, int i) {
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Arrays.asList(str));
        this.log.info("Subscribed for topic: " + str);
        this.brokerListener = inputEventAdapterListener;
        this.tenantId = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        Lock lock = this.consumerLock;
        while (true) {
            try {
                try {
                    PrivilegedCarbonContext.startTenantFlow();
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(this.tenantId);
                    ConsumerRecords consumerRecords = null;
                    try {
                        try {
                            lock.lock();
                            consumerRecords = this.consumer.poll(100L);
                            lock.unlock();
                        } finally {
                        }
                    } catch (CommitFailedException e) {
                        this.log.warn("Consumer poll() failed." + e.getMessage(), e);
                        lock.unlock();
                    }
                    if (null != consumerRecords) {
                        Iterator it = consumerRecords.iterator();
                        while (it.hasNext()) {
                            this.brokerListener.onEvent(((ConsumerRecord) it.next()).value());
                        }
                        try {
                            try {
                                lock.lock();
                                if (!consumerRecords.isEmpty()) {
                                    this.consumer.commitAsync();
                                }
                                lock.unlock();
                            } finally {
                            }
                        } catch (CommitFailedException e2) {
                            this.log.error("Kafka commit failed for topic kafka_result_topic", e2);
                            lock.unlock();
                        }
                    }
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e3) {
                        Thread.currentThread().interrupt();
                    }
                    PrivilegedCarbonContext.endTenantFlow();
                } catch (Throwable th) {
                    PrivilegedCarbonContext.endTenantFlow();
                    throw th;
                }
            } catch (Throwable th2) {
                this.log.error("Error while consuming event ", th2);
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }
}
