/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafka;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;

public abstract class ConsumerLease
implements Closeable,
ConsumerRebalanceListener {
    /** Upper bound, in milliseconds, on how long a lease keeps polling; compared in continuePolling(). */
    private final long maxWaitMillis;
    /** Consumer that this lease polls, pauses/resumes and commits offsets on. */
    private final Consumer<byte[], byte[]> kafkaConsumer;
    private final ComponentLog logger;
    /** Bytes written between records bundled into one FlowFile; {@code null} means one FlowFile per record. */
    private final byte[] demarcatorBytes;
    /** Encoding name handed to encodeKafkaKey() when a record key is turned into an attribute value. */
    private final String keyEncoding;
    /** Passed, together with bootstrapServers, to KafkaProcessorUtils.buildTransitURI for provenance events. */
    private final String securityProtocol;
    private final String bootstrapServers;
    /** Set by poison() when a poll or commit fails; nothing in this class clears it again. */
    private boolean poisoned = false;
    /** One open bundle per topic partition; only populated when a demarcator is configured. */
    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
    /** Next offset to commit for each partition that has delivered records since the last commit. */
    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
    /** System.nanoTime() captured by continuePolling(); negative while unset. */
    private long leaseStartNanos = -1L;
    private boolean lastPollEmpty = false;
    /** Number of single-record FlowFiles created since the last reset (not incremented in demarcated mode). */
    private int totalFlowFiles = 0;
    /** Held around every consumer poll in poll() and retainConnection(). Never reassigned, hence final. */
    private final ReentrantLock pollingLock = new ReentrantLock();

    /**
     * Stores the collaborators and settings this lease works with. No Kafka or
     * session call is made here.
     *
     * @param maxWaitMillis    polling time budget in milliseconds, checked by continuePolling()
     * @param kafkaConsumer    consumer to poll and commit offsets on
     * @param demarcatorBytes  separator written between bundled records, or {@code null} for one FlowFile per record
     * @param keyEncoding      encoding name used when rendering record keys
     * @param securityProtocol used when building the provenance transit URI
     * @param bootstrapServers used when building the provenance transit URI
     * @param logger           component logger for debug/warn output
     */
    ConsumerLease(long maxWaitMillis, Consumer<byte[], byte[]> kafkaConsumer, byte[] demarcatorBytes, String keyEncoding, String securityProtocol, String bootstrapServers, ComponentLog logger) {
        // Collaborators first ...
        this.kafkaConsumer = kafkaConsumer;
        this.logger = logger;
        // ... then plain configuration values.
        this.maxWaitMillis = maxWaitMillis;
        this.demarcatorBytes = demarcatorBytes;
        this.keyEncoding = keyEncoding;
        this.bootstrapServers = bootstrapServers;
        this.securityProtocol = securityProtocol;
    }

    /**
     * Returns the per-lease bookkeeping to its initial values: no open bundles,
     * no pending offsets, no start time, zero FlowFiles counted. The poisoned
     * flag is left untouched.
     */
    private void resetInternalState() {
        // Counters and flags.
        totalFlowFiles = 0;
        lastPollEmpty = false;
        leaseStartNanos = -1L;
        // Accumulated work.
        uncommittedOffsetsMap.clear();
        bundleMap.clear();
    }

    /**
     * Rebalance callback: logs the revoked partitions at debug level and then
     * commits whatever this lease has accumulated so far. The boolean result
     * of commit() is ignored; exceptions from it propagate.
     *
     * @param partitions partitions being taken away from the consumer
     */
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        final Object[] logArguments = {partitions, this, kafkaConsumer};
        logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", logArguments);
        commit();
    }

    /**
     * Rebalance callback: only logs the newly assigned partitions at debug
     * level; no lease state is changed.
     *
     * @param partitions partitions now assigned to the consumer
     */
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        final Object[] logArguments = {partitions, this, kafkaConsumer};
        logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", logArguments);
    }

    /**
     * Performs one poll of the consumer (10 ms timeout) while holding the
     * polling lock, records whether it came back empty, and hands the records
     * to processRecords(). Any failure poisons the lease before being rethrown.
     */
    void poll() {
        pollingLock.lock();
        try {
            final ConsumerRecords<byte[], byte[]> fetched = kafkaConsumer.poll(10L);
            // Remembered so that continuePolling() can stop after an empty poll.
            lastPollEmpty = fetched.count() == 0;
            processRecords(fetched);
        } catch (final Throwable failure) {
            poison();
            throw failure;
        } finally {
            pollingLock.unlock();
        }
    }

    /**
     * Pauses every partition currently assigned to the consumer, calls
     * {@code poll(0)}, and resumes the same partitions again. Does nothing
     * (beyond taking and releasing the lock) when no partitions are assigned.
     *
     * <p>NOTE(review): presumably this exists so the consumer keeps interacting
     * with the broker without fetching records while paused — confirm against
     * the Kafka consumer documentation for the client version in use.
     *
     * <p>The resume and the lock release both run in {@code finally} blocks, so
     * an exception thrown by {@code pause} or {@code poll} can neither leave
     * the partitions paused nor leave {@code pollingLock} held.
     */
    void retainConnection() {
        pollingLock.lock();
        // Stays null until partitions were actually paused; drives the resume in finally.
        TopicPartition[] assignments = null;
        try {
            final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
            if (assignmentSet.isEmpty()) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Pausing " + assignmentSet);
            }
            assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
            kafkaConsumer.pause(assignments);
            kafkaConsumer.poll(0L);
            if (logger.isDebugEnabled()) {
                // Log the set, not the array: concatenating an array prints only its identity hash.
                logger.debug("Resuming " + assignmentSet);
            }
        } finally {
            try {
                if (assignments != null) {
                    kafkaConsumer.resume(assignments);
                }
            } finally {
                // Released even if resume() itself throws.
                pollingLock.unlock();
            }
        }
    }

    /**
     * Flushes the work accumulated by this lease: transfers any open bundles
     * to success, commits the process session, then commits the tracked
     * offsets to Kafka, and finally resets the lease state.
     *
     * <p>The session is committed before the offsets. If the offset commit
     * then fails, the data has already been handed on, which is why the
     * warning below speaks of likely duplicates.
     *
     * @return {@code true} if offsets were committed, {@code false} if there
     *         was nothing to commit (state is reset in both cases)
     * @throws KafkaException if Kafka rejects the offset commit; the lease is poisoned first
     */
    boolean commit() {
        final boolean nothingPending = uncommittedOffsetsMap.isEmpty();
        if (nothingPending) {
            resetInternalState();
            return false;
        }
        try {
            final Collection<FlowFile> pendingBundles = getBundles();
            final boolean hasBundles = !pendingBundles.isEmpty();
            if (hasBundles) {
                getProcessSession().transfer(pendingBundles, ConsumeKafka.REL_SUCCESS);
            }
            // Order matters: session first, Kafka offsets second.
            getProcessSession().commit();
            kafkaConsumer.commitSync(uncommittedOffsetsMap);
            resetInternalState();
            return true;
        } catch (final KafkaException kafkaFailure) {
            poison();
            logger.warn("Duplicates are likely as we were able to commit the process session but received an exception from Kafka while committing offsets.");
            throw kafkaFailure;
        } catch (final Throwable failure) {
            poison();
            throw failure;
        }
    }

    /**
     * Decides whether the caller should poll again. Polling stops when the
     * last poll returned no records, when more than maxWaitMillis have elapsed
     * since the lease started, when more than 200 bundles are open, or when
     * 15000 or more FlowFiles have been created.
     *
     * <p>Side effect: starts the lease clock on the first call that gets past
     * the empty-poll check.
     *
     * @return {@code true} if another poll is allowed
     */
    boolean continuePolling() {
        if (lastPollEmpty) {
            return false;
        }
        if (leaseStartNanos < 0L) {
            leaseStartNanos = System.nanoTime();
        }
        final long elapsedNanos = System.nanoTime() - leaseStartNanos;
        final boolean withinTimeBudget = TimeUnit.NANOSECONDS.toMillis(elapsedNanos) <= maxWaitMillis;
        final boolean withinBundleLimit = bundleMap.size() <= 200;
        final boolean withinFlowFileLimit = totalFlowFiles < 15000;
        return withinTimeBudget && withinBundleLimit && withinFlowFileLimit;
    }

    /** Marks this lease as poisoned; called from the failure paths of poll() and commit(). */
    private void poison() {
        poisoned = true;
    }

    /**
     * @return {@code true} once a poll or commit on this lease has failed
     */
    boolean isPoisoned() {
        return poisoned;
    }

    /** Forwards to the underlying consumer's {@code wakeup()}; the polling lock is not taken. */
    public void wakeup() {
        kafkaConsumer.wakeup();
    }

    /**
     * Clears this lease's bookkeeping. Nothing is committed and the Kafka
     * consumer itself is not closed here.
     */
    @Override
    public void close() {
        resetInternalState();
    }

    /**
     * Supplies the session this lease works against. Within this class it is
     * used to create, write, append to and transfer FlowFiles, to set their
     * attributes, to report provenance, and to commit.
     *
     * @return the process session to use; called repeatedly, so implementations
     *         presumably return the same session for the life of the lease — TODO confirm
     */
    public abstract ProcessSession getProcessSession();

    /**
     * Handles one batch of polled records, partition by partition. For every
     * partition that delivered at least one record it
     * <ol>
     *   <li>records {@code highest offset + 1} as the next offset to commit, and</li>
     *   <li>writes the records out: one FlowFile per record when no demarcator
     *       is configured, otherwise appended to that partition's bundle.</li>
     * </ol>
     * Only the one-FlowFile-per-record path increases {@code totalFlowFiles}.
     *
     * @param records result of a consumer poll
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> records) {
        for (final TopicPartition partition : records.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
            if (messages.isEmpty()) {
                continue;
            }
            // The list is non-empty, so max() always has a value.
            final long maxOffset = messages.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();
            uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
            if (demarcatorBytes == null) {
                totalFlowFiles += messages.size();
                for (final ConsumerRecord<byte[], byte[]> message : messages) {
                    writeData(getProcessSession(), message, partition);
                }
            } else {
                writeData(getProcessSession(), messages, partition);
            }
        }
    }

    /**
     * Renders a record key as text according to the configured encoding.
     *
     * @param key      raw key bytes, may be {@code null}
     * @param encoding encoding name; compared against the HEX and UTF-8 values in KafkaProcessorUtils
     * @return hex string or UTF-8 decoded string; {@code null} when the key is
     *         {@code null} or the encoding matches neither known value
     */
    private static String encodeKafkaKey(byte[] key, String encoding) {
        String encoded = null;
        if (key != null) {
            if (KafkaProcessorUtils.HEX_ENCODING.getValue().equals(encoding)) {
                encoded = DatatypeConverter.printHexBinary(key);
            } else if (KafkaProcessorUtils.UTF8_ENCODING.getValue().equals(encoding)) {
                encoded = new String(key, StandardCharsets.UTF_8);
            }
        }
        return encoded;
    }

    /**
     * Finalises every open bundle by stamping attributes and provenance on it
     * via populateAttributes(), and collects the resulting FlowFiles.
     *
     * @return the FlowFiles of all tracked bundles; empty if none are open
     */
    private Collection<FlowFile> getBundles() {
        final List<FlowFile> finished = new ArrayList<>();
        bundleMap.values().forEach(bundle -> {
            // populateAttributes() swaps in the updated FlowFile, so read it afterwards.
            populateAttributes(bundle);
            finished.add(bundle.flowFile);
        });
        return finished;
    }

    /**
     * Writes a single record to its own FlowFile, attaches the Kafka
     * attributes and provenance event, and transfers it to success. A record
     * with a {@code null} value yields a FlowFile with no content written.
     *
     * @param session        session to create and transfer the FlowFile with
     * @param record         the record to write
     * @param topicPartition partition the record came from
     */
    private void writeData(ProcessSession session, ConsumerRecord<byte[], byte[]> record, TopicPartition topicPartition) {
        final FlowFile created = session.create();
        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
        tracker.incrementRecordCount(1L);
        final byte[] payload = record.value();
        final FlowFile populated = payload == null
                ? created
                : session.write(created, out -> out.write(payload));
        tracker.updateFlowFile(populated);
        populateAttributes(tracker);
        session.transfer(tracker.flowFile, ConsumeKafka.REL_SUCCESS);
    }

    /**
     * Appends a batch of records to the bundle FlowFile of their partition,
     * creating the bundle (seeded from the first record) if none is open yet.
     * The demarcator is written before every record except the very first
     * record of a newly created bundle. Records with a {@code null} value
     * contribute only their demarcator.
     *
     * @param session        session to create/append the FlowFile with
     * @param records        records of one partition; must not be empty
     * @param topicPartition partition the records came from
     */
    private void writeData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        final ConsumerRecord<byte[], byte[]> head = records.get(0);
        final BundleTracker known = bundleMap.get(topicPartition);
        final boolean continuingBundle = known != null;

        final BundleTracker tracker;
        if (continuingBundle) {
            tracker = known;
        } else {
            tracker = new BundleTracker(head, topicPartition, keyEncoding);
            tracker.updateFlowFile(session.create());
        }
        tracker.incrementRecordCount(records.size());

        final FlowFile appended = session.append(tracker.flowFile, out -> {
            for (int index = 0; index < records.size(); index++) {
                // Separate from whatever precedes: earlier bundle content or an earlier record.
                if (continuingBundle || index > 0) {
                    out.write(demarcatorBytes);
                }
                final byte[] payload = records.get(index).value();
                if (payload != null) {
                    out.write(payload);
                }
            }
        });
        tracker.updateFlowFile(appended);
        bundleMap.put(topicPartition, tracker);
    }

    /**
     * Stamps the Kafka attributes on the tracker's FlowFile, emits a
     * provenance RECEIVE event for it, and stores the updated FlowFile back
     * into the tracker.
     *
     * <p>Attributes: {@code kafka.offset}, {@code kafka.partition} and
     * {@code kafka.topic} always; {@code kafka.key} only for a single-record
     * FlowFile with a non-null key; {@code kafka.count} only when more than
     * one record was written.
     *
     * @param tracker bundle whose FlowFile is to be annotated
     */
    private void populateAttributes(BundleTracker tracker) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("kafka.topic", tracker.topic);
        attributes.put("kafka.partition", Integer.toString(tracker.partition));
        attributes.put("kafka.offset", Long.toString(tracker.initialOffset));
        final boolean singleRecord = tracker.totalRecords == 1L;
        if (singleRecord && tracker.key != null) {
            attributes.put("kafka.key", tracker.key);
        }
        if (tracker.totalRecords > 1L) {
            attributes.put("kafka.count", Long.toString(tracker.totalRecords));
        }

        final FlowFile annotated = getProcessSession().putAllAttributes(tracker.flowFile, attributes);
        // Duration is measured from the start of the lease, not from the individual record.
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
        getProcessSession().getProvenanceReporter().receive(annotated, transitUri, elapsedMillis);
        tracker.updateFlowFile(annotated);
    }

    /**
     * Bookkeeping for one FlowFile being assembled from Kafka records: where
     * it started (topic, partition, offset and key of the first record), the
     * current FlowFile reference, and how many records it holds.
     */
    private static class BundleTracker {
        /** Topic and partition the records come from. */
        final String topic;
        final int partition;
        /** Offset of the record this tracker was created from. */
        final long initialOffset;
        /** Encoded key of that first record; {@code null} if absent or not encodable. */
        final String key;
        /** Latest version of the FlowFile; replaced after each session operation. */
        FlowFile flowFile;
        long totalRecords = 0L;

        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding) {
            topic = topicPartition.topic();
            partition = topicPartition.partition();
            initialOffset = initialRecord.offset();
            key = ConsumerLease.encodeKafkaKey(initialRecord.key(), keyEncoding);
        }

        /** Adds {@code count} to the number of records held by this bundle. */
        private void incrementRecordCount(long count) {
            totalRecords = totalRecords + count;
        }

        /** Replaces the tracked FlowFile reference with {@code flowFile}. */
        private void updateFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }
}

