/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import io.siddhi.extension.io.kafka.source.SequenceKey;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

public class KafkaConsumerThread
implements Runnable {
    private static final Logger LOG = Logger.getLogger(KafkaConsumerThread.class);
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Lock consumerLock = new ReentrantLock();
    private final String[] partitions;
    private SourceEventListener sourceEventListener;
    private String[] topics;
    private volatile boolean paused;
    private volatile boolean inactive;
    private List<TopicPartition> partitionsList = new ArrayList<TopicPartition>();
    private String consumerThreadId;
    private boolean isPartitionWiseThreading = false;
    private boolean isBinaryMessage = false;
    private boolean enableOffsetCommit = false;
    private boolean enableAutoCommit = false;
    private boolean enableAsyncCommit;
    private boolean consumerClosed;
    private ReentrantLock lock;
    private Condition condition;
    private KafkaSource.KafkaSourceState kafkaSourceState;
    private String[] requiredProperties;
    private int trpLength;

    KafkaConsumerThread(SourceEventListener sourceEventListener, String[] topics, String[] partitions, Properties props, boolean isPartitionWiseThreading, boolean isBinaryMessage, boolean enableOffsetCommit, boolean enableAsyncCommit, String[] requiredProperties) {
        this.consumer = new KafkaConsumer(props);
        this.sourceEventListener = sourceEventListener;
        this.topics = topics;
        this.partitions = partitions;
        this.isPartitionWiseThreading = isPartitionWiseThreading;
        this.isBinaryMessage = isBinaryMessage;
        this.enableOffsetCommit = enableOffsetCommit;
        this.enableAutoCommit = Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        this.consumerThreadId = this.buildId();
        this.enableAsyncCommit = enableAsyncCommit;
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.requiredProperties = requiredProperties;
        this.trpLength = requiredProperties != null && requiredProperties.length > 0 ? requiredProperties.length : 0;
        if (null != partitions) {
            for (String topic : topics) {
                for (String partition1 : partitions) {
                    TopicPartition partition = new TopicPartition(topic, Integer.parseInt(partition1));
                    LOG.info((Object)("Adding partition " + partition1 + " for topic: " + topic));
                    this.partitionsList.add(partition);
                }
                LOG.info((Object)("Adding partitions " + Arrays.toString(partitions) + " for topic: " + topic));
                this.consumer.assign(this.partitionsList);
            }
        } else {
            this.consumer.subscribe(Arrays.asList(topics));
        }
        this.consumerClosed = false;
        LOG.info((Object)("Subscribed for topics: " + Arrays.toString(topics)));
    }

    void pause() {
        this.paused = true;
    }

    void resume() {
        this.restore();
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void restore() {
        Lock consumerLock = this.consumerLock;
        if (this.kafkaSourceState != null && this.kafkaSourceState.getTopicOffsetMap() != null) {
            for (String topic : this.topics) {
                Map<Integer, Long> offsetMap = this.kafkaSourceState.getTopicOffsetMap().get(topic);
                if (null == offsetMap) continue;
                for (Map.Entry<Integer, Long> entry : offsetMap.entrySet()) {
                    TopicPartition partition = new TopicPartition(topic, entry.getKey().intValue());
                    if (!this.partitionsList.contains(partition)) continue;
                    LOG.info((Object)("Seeking partition: " + partition + " for topic: " + topic + " offset: " + (entry.getValue() + 1L)));
                    try {
                        consumerLock.lock();
                        this.consumer.seek(partition, entry.getValue() + 1L);
                    }
                    finally {
                        consumerLock.unlock();
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Lock consumerLock = this.consumerLock;
        while (!this.inactive) {
            if (this.paused) {
                this.lock.lock();
                try {
                    this.condition.await();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    this.lock.unlock();
                }
            }
            ConsumerRecords records = null;
            try {
                consumerLock.lock();
                records = this.consumer.poll(100L);
            }
            catch (CommitFailedException ex) {
                LOG.warn((Object)("Consumer poll() failed." + ex.getMessage()), (Throwable)ex);
            }
            finally {
                consumerLock.unlock();
            }
            if (null != records) {
                Map<SequenceKey, Integer> lastReceivedSeqNoMap = null;
                if (this.kafkaSourceState.getConsumerLastReceivedSeqNoMap() != null) {
                    lastReceivedSeqNoMap = this.kafkaSourceState.getConsumerLastReceivedSeqNoMap().get(this.consumerThreadId);
                }
                for (ConsumerRecord record : records) {
                    String[] trpProperties = new String[this.trpLength];
                    if (!this.consumerClosed) {
                        int partition = record.partition();
                        Object event = record.value();
                        Object eventBody = null;
                        String header = null;
                        long eventTimestamp = System.currentTimeMillis();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Event received in Kafka Event Adaptor with offSet: " + record.offset() + ", key: " + record.key() + ", topic: " + record.topic() + ", partition: " + partition + ", recordTimestamp: " + record.timestamp() + ", eventTimestamp: " + eventTimestamp + ", checksum: " + record.checksum()));
                        }
                        for (int i = 0; i < this.requiredProperties.length; ++i) {
                            if (this.requiredProperties[i].equalsIgnoreCase("partition")) {
                                trpProperties[i] = String.valueOf(record.partition());
                            }
                            if (this.requiredProperties[i].equalsIgnoreCase("topic")) {
                                trpProperties[i] = record.topic();
                            }
                            if (this.requiredProperties[i].equalsIgnoreCase("key")) {
                                trpProperties[i] = String.valueOf(record.key());
                            }
                            if (this.requiredProperties[i].equalsIgnoreCase("record.timestamp")) {
                                trpProperties[i] = String.valueOf(record.timestamp());
                            }
                            if (this.requiredProperties[i].equalsIgnoreCase("event.timestamp")) {
                                trpProperties[i] = String.valueOf(eventTimestamp);
                            }
                            if (this.requiredProperties[i].equalsIgnoreCase("check.sum")) {
                                trpProperties[i] = String.valueOf(record.checksum());
                            }
                            if (!this.requiredProperties[i].equalsIgnoreCase("offset")) continue;
                            trpProperties[i] = String.valueOf(record.offset());
                        }
                        String transportSyncProperties = "topic:" + record.topic() + ",partition:" + record.partition() + ",offSet:" + record.offset();
                        String[] transportSyncPropertiesArr = new String[]{transportSyncProperties};
                        if (lastReceivedSeqNoMap == null) {
                            this.sourceEventListener.onEvent(event, trpProperties, transportSyncPropertiesArr);
                        } else {
                            if (this.isBinaryMessage) {
                                byte[] byteEvents = (byte[])event;
                                int stringSize = ByteBuffer.wrap(byteEvents).getInt();
                                header = new String(byteEvents, 4, stringSize - 1, Charset.defaultCharset());
                                eventBody = Arrays.copyOfRange(byteEvents, stringSize + 4, byteEvents.length);
                            } else {
                                String stringEvent = event.toString();
                                int headerStartingIndex = stringEvent.indexOf("~");
                                eventBody = stringEvent.substring(headerStartingIndex + 1);
                                if (headerStartingIndex > 0) {
                                    header = stringEvent.substring(0, headerStartingIndex);
                                }
                            }
                            if (null != header && !header.isEmpty()) {
                                String[] headerElements = header.split(":");
                                String sequenceId = headerElements[0];
                                Integer seqNo = Integer.parseInt(headerElements[1]);
                                SequenceKey sequenceKey = new SequenceKey(sequenceId, partition);
                                Integer lastReceivedSeqNo = lastReceivedSeqNoMap.get(sequenceKey);
                                if (lastReceivedSeqNo == null) {
                                    lastReceivedSeqNo = -1;
                                }
                                if (lastReceivedSeqNo < seqNo) {
                                    lastReceivedSeqNoMap.put(sequenceKey, seqNo);
                                    this.sourceEventListener.onEvent(eventBody, trpProperties, transportSyncPropertiesArr);
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug((Object)("Last Received SeqNo Updated to:" + seqNo + " for SeqKey:[" + sequenceKey.toString() + "] in Kafka consumer thread:" + this.consumerThreadId));
                                    }
                                } else if (LOG.isDebugEnabled()) {
                                    LOG.debug((Object)("Duplicate Message arrived at Kafka Consumer Thread:" + this.consumerThreadId + ". SeqKey:[" + sequenceKey.toString() + "], Latest SeqNo:" + lastReceivedSeqNo + ", this message SeqNo:" + seqNo + ". Ignoring the message."));
                                }
                            } else {
                                LOG.warn((Object)("'Sequenced' option is set to true in Kafka source configuration. But this message does not contain the sequence number in consumer thread :" + this.consumerThreadId + ". Dropping the message"));
                            }
                        }
                        this.kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(), record.offset());
                        continue;
                    }
                    this.kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(), record.offset());
                    break;
                }
                if (this.enableOffsetCommit && !this.enableAutoCommit) {
                    try {
                        consumerLock.lock();
                        if (!records.isEmpty()) {
                            if (this.enableAsyncCommit) {
                                this.consumer.commitAsync((OffsetCommitCallback)new KafkaOffsetCommitCallback());
                            } else {
                                try {
                                    this.consumer.commitSync();
                                }
                                catch (KafkaException e) {
                                    LOG.error((Object)"Exception occurred when committing offsets Synchronously", (Throwable)e);
                                }
                            }
                        }
                    }
                    catch (CommitFailedException e) {
                        LOG.error((Object)"Kafka commit failed for topic kafka_result_topic", (Throwable)e);
                    }
                    finally {
                        consumerLock.unlock();
                    }
                }
            }
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    void shutdownConsumer() {
        try {
            this.consumerLock.lock();
            this.consumer.close();
            this.consumerClosed = true;
        }
        finally {
            this.consumerLock.unlock();
        }
        this.inactive = true;
    }

    public String buildId() {
        StringBuilder key = new StringBuilder();
        int count = this.topics.length - 1;
        for (String topic : this.topics) {
            key.append(topic);
            if (--count < 0) continue;
            key.append(":");
        }
        if (this.partitions != null && this.isPartitionWiseThreading) {
            count = this.partitions.length - 1;
            key.append("-");
            for (String partition : this.partitions) {
                key.append(partition);
                if (--count < 0) continue;
                key.append(":");
            }
        }
        return key.toString();
    }

    public void setKafkaSourceState(KafkaSource.KafkaSourceState kafkaSourceState) {
        this.kafkaSourceState = kafkaSourceState;
        if (kafkaSourceState != null) {
            if (kafkaSourceState.getConsumerLastReceivedSeqNoMap() != null) {
                kafkaSourceState.getConsumerLastReceivedSeqNoMap().putIfAbsent(this.consumerThreadId, new HashMap());
            }
            for (String topic : this.topics) {
                kafkaSourceState.getTopicOffsetMap().putIfAbsent(topic, new HashMap());
            }
        }
    }

    private static class KafkaOffsetCommitCallback
    implements OffsetCommitCallback {
        private KafkaOffsetCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception == null) {
                if (LOG.isDebugEnabled()) {
                    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                        LOG.debug((Object)("Asynchronously commit offset done for " + entry.getKey().topic() + " with offset of: " + entry.getValue().offset()));
                    }
                }
            } else {
                if (LOG.isDebugEnabled()) {
                    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                        LOG.debug((Object)("Commit offset exception for " + entry.getKey().topic() + " with offset of: " + entry.getValue().offset()));
                    }
                }
                LOG.error((Object)"Exception occurred when committing offsets asynchronously.", (Throwable)exception);
            }
        }
    }
}

