/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;

public abstract class ConsumerLease
implements Closeable,
ConsumerRebalanceListener {
    private final long maxWaitMillis;
    private final Consumer<byte[], byte[]> kafkaConsumer;
    private final ComponentLog logger;
    private final byte[] demarcatorBytes;
    private final String keyEncoding;
    private final String securityProtocol;
    private final String bootstrapServers;
    private final RecordSetWriterFactory writerFactory;
    private final RecordReaderFactory readerFactory;
    private final Charset headerCharacterSet;
    private final Pattern headerNamePattern;
    private boolean poisoned = false;
    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<BundleInformation, BundleTracker>();
    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<TopicPartition, OffsetAndMetadata>();
    private long leaseStartNanos = -1L;
    private boolean lastPollEmpty = false;
    private int totalMessages = 0;

    ConsumerLease(long maxWaitMillis, Consumer<byte[], byte[]> kafkaConsumer, byte[] demarcatorBytes, String keyEncoding, String securityProtocol, String bootstrapServers, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ComponentLog logger, Charset headerCharacterSet, Pattern headerNamePattern) {
        this.maxWaitMillis = maxWaitMillis;
        this.kafkaConsumer = kafkaConsumer;
        this.demarcatorBytes = demarcatorBytes;
        this.keyEncoding = keyEncoding;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.logger = logger;
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;
    }

    private void resetInternalState() {
        this.bundleMap.clear();
        this.uncommittedOffsetsMap.clear();
        this.leaseStartNanos = -1L;
        this.lastPollEmpty = false;
        this.totalMessages = 0;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, this.kafkaConsumer});
        this.commit();
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, this.kafkaConsumer});
    }

    void poll() {
        try {
            ConsumerRecords records = this.kafkaConsumer.poll(10L);
            this.lastPollEmpty = records.count() == 0;
            this.processRecords((ConsumerRecords<byte[], byte[]>)records);
        }
        catch (ProcessException pe) {
            throw pe;
        }
        catch (Throwable t) {
            this.poison();
            throw t;
        }
    }

    boolean commit() {
        if (this.uncommittedOffsetsMap.isEmpty()) {
            this.resetInternalState();
            return false;
        }
        try {
            Collection<FlowFile> bundledFlowFiles = this.getBundles();
            if (!bundledFlowFiles.isEmpty()) {
                this.getProcessSession().transfer(bundledFlowFiles, ConsumeKafkaRecord_1_0.REL_SUCCESS);
            }
            this.getProcessSession().commit();
            Map<TopicPartition, OffsetAndMetadata> offsetsMap = this.uncommittedOffsetsMap;
            this.kafkaConsumer.commitSync(offsetsMap);
            this.resetInternalState();
            return true;
        }
        catch (IOException ioe) {
            this.poison();
            this.logger.error("Failed to finish writing out FlowFile bundle", (Throwable)ioe);
            throw new ProcessException((Throwable)ioe);
        }
        catch (KafkaException kex) {
            this.poison();
            this.logger.warn("Duplicates are likely as we were able to commit the process session but received an exception from Kafka while committing offsets.");
            throw kex;
        }
        catch (Throwable t) {
            this.poison();
            throw t;
        }
    }

    boolean continuePolling() {
        long durationMillis;
        if (this.lastPollEmpty) {
            return false;
        }
        if (this.leaseStartNanos < 0L) {
            this.leaseStartNanos = System.nanoTime();
        }
        if ((durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos)) > this.maxWaitMillis) {
            return false;
        }
        if (this.bundleMap.size() > 200) {
            return false;
        }
        return this.totalMessages < 1000;
    }

    private void poison() {
        this.poisoned = true;
    }

    boolean isPoisoned() {
        return this.poisoned;
    }

    public void wakeup() {
        this.kafkaConsumer.wakeup();
    }

    @Override
    public void close() {
        this.resetInternalState();
    }

    public abstract ProcessSession getProcessSession();

    public abstract void yield();

    private void processRecords(ConsumerRecords<byte[], byte[]> records) {
        records.partitions().stream().forEach(partition -> {
            List messages = records.records(partition);
            if (!messages.isEmpty()) {
                long maxOffset = messages.stream().mapToLong(record -> record.offset()).max().getAsLong();
                if (this.demarcatorBytes != null) {
                    this.writeDemarcatedData(this.getProcessSession(), messages, (TopicPartition)partition);
                } else if (this.readerFactory != null && this.writerFactory != null) {
                    this.writeRecordData(this.getProcessSession(), messages, (TopicPartition)partition);
                } else {
                    messages.stream().forEach(message -> this.writeData(this.getProcessSession(), (ConsumerRecord<byte[], byte[]>)message, (TopicPartition)partition));
                }
                this.totalMessages += messages.size();
                this.uncommittedOffsetsMap.put((TopicPartition)partition, new OffsetAndMetadata(maxOffset + 1L));
            }
        });
    }

    private static String encodeKafkaKey(byte[] key, String encoding) {
        if (key == null) {
            return null;
        }
        if (KafkaProcessorUtils.HEX_ENCODING.getValue().equals(encoding)) {
            return DatatypeConverter.printHexBinary((byte[])key);
        }
        if (KafkaProcessorUtils.UTF8_ENCODING.getValue().equals(encoding)) {
            return new String(key, StandardCharsets.UTF_8);
        }
        return null;
    }

    private Collection<FlowFile> getBundles() throws IOException {
        ArrayList<FlowFile> flowFiles = new ArrayList<FlowFile>();
        for (BundleTracker tracker : this.bundleMap.values()) {
            boolean includeBundle = this.processBundle(tracker);
            if (!includeBundle) continue;
            flowFiles.add(tracker.flowFile);
        }
        return flowFiles;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processBundle(BundleTracker bundle) throws IOException {
        RecordSetWriter writer = bundle.recordWriter;
        if (writer != null) {
            WriteResult writeResult;
            try {
                writeResult = writer.finishRecordSet();
            }
            finally {
                writer.close();
            }
            if (writeResult.getRecordCount() == 0) {
                this.getProcessSession().remove(bundle.flowFile);
                return false;
            }
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.putAll(writeResult.getAttributes());
            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
            bundle.flowFile = this.getProcessSession().putAllAttributes(bundle.flowFile, attributes);
        }
        this.populateAttributes(bundle);
        return true;
    }

    private void writeData(ProcessSession session, ConsumerRecord<byte[], byte[]> record, TopicPartition topicPartition) {
        FlowFile flowFile = session.create();
        BundleTracker tracker = new BundleTracker((ConsumerRecord)record, topicPartition, this.keyEncoding);
        tracker.incrementRecordCount(1L);
        byte[] value = (byte[])record.value();
        if (value != null) {
            flowFile = session.write(flowFile, out -> out.write(value));
        }
        flowFile = session.putAllAttributes(flowFile, this.getAttributes(record));
        tracker.updateFlowFile(flowFile);
        this.populateAttributes(tracker);
        session.transfer(tracker.flowFile, ConsumeKafkaRecord_1_0.REL_SUCCESS);
    }

    private void writeDemarcatedData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        Map<BundleInformation, List<ConsumerRecord>> map = records.stream().collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, this.getAttributes((ConsumerRecord<?, ?>)rec))));
        for (Map.Entry<BundleInformation, List<ConsumerRecord>> entry : map.entrySet()) {
            boolean demarcateFirstRecord;
            FlowFile flowFile;
            BundleInformation bundleInfo = entry.getKey();
            List<ConsumerRecord> recordList = entry.getValue();
            BundleTracker tracker = this.bundleMap.get(bundleInfo);
            if (tracker == null) {
                tracker = new BundleTracker(recordList.get(0), topicPartition, this.keyEncoding);
                flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
                tracker.updateFlowFile(flowFile);
                demarcateFirstRecord = false;
            } else {
                demarcateFirstRecord = true;
            }
            flowFile = tracker.flowFile;
            tracker.incrementRecordCount(recordList.size());
            flowFile = session.append(flowFile, out -> {
                boolean useDemarcator = demarcateFirstRecord;
                for (ConsumerRecord record : recordList) {
                    byte[] value;
                    if (useDemarcator) {
                        out.write(this.demarcatorBytes);
                    }
                    if ((value = (byte[])record.value()) != null) {
                        out.write((byte[])record.value());
                    }
                    useDemarcator = true;
                }
            });
            tracker.updateFlowFile(flowFile);
            this.bundleMap.put(bundleInfo, tracker);
        }
    }

    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession session, Exception cause) {
        this.handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship");
    }

    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession session, Exception cause, String message) {
        Map<String, String> attributes = this.getAttributes(consumerRecord);
        attributes.put("kafka.offset", String.valueOf(consumerRecord.offset()));
        attributes.put("kafka.partition", String.valueOf(consumerRecord.partition()));
        attributes.put("kafka.topic", consumerRecord.topic());
        FlowFile failureFlowFile = session.create();
        byte[] value = (byte[])consumerRecord.value();
        if (value != null) {
            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
        }
        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
        String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, consumerRecord.topic());
        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
        session.transfer(failureFlowFile, ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE);
        if (cause == null) {
            this.logger.error(message);
        } else {
            this.logger.error(message, (Throwable)cause);
        }
        session.adjustCounter("Parse Failures", 1L, false);
    }

    private Map<String, String> getAttributes(ConsumerRecord<?, ?> consumerRecord) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        if (this.headerNamePattern == null) {
            return attributes;
        }
        for (Header header : consumerRecord.headers()) {
            String attributeName = header.key();
            if (!this.headerNamePattern.matcher(attributeName).matches()) continue;
            attributes.put(attributeName, new String(header.value(), this.headerCharacterSet));
        }
        return attributes;
    }

    /*
     * Exception decompiling
     */
    private void writeRecordData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void closeWriter(RecordSetWriter writer) {
        try {
            if (writer != null) {
                writer.close();
            }
        }
        catch (Exception ioe) {
            this.logger.warn("Failed to close Record Writer", (Throwable)ioe);
        }
    }

    private void rollback(TopicPartition topicPartition) {
        try {
            OffsetAndMetadata offsetAndMetadata = this.uncommittedOffsetsMap.get(topicPartition);
            if (offsetAndMetadata == null) {
                offsetAndMetadata = this.kafkaConsumer.committed(topicPartition);
            }
            long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
            this.kafkaConsumer.seek(topicPartition, offset);
        }
        catch (Exception rollbackException) {
            this.logger.warn("Attempted to rollback Kafka message offset but was unable to do so", (Throwable)rollbackException);
        }
    }

    private void populateAttributes(BundleTracker tracker) {
        HashMap<String, String> kafkaAttrs = new HashMap<String, String>();
        kafkaAttrs.put("kafka.offset", String.valueOf(tracker.initialOffset));
        if (tracker.key != null && tracker.totalRecords == 1L) {
            kafkaAttrs.put("kafka.key", tracker.key);
        }
        kafkaAttrs.put("kafka.partition", String.valueOf(tracker.partition));
        kafkaAttrs.put("kafka.topic", tracker.topic);
        if (tracker.totalRecords > 1L) {
            if (tracker.recordWriter == null) {
                kafkaAttrs.put("kafka.count", String.valueOf(tracker.totalRecords));
            } else {
                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
            }
        }
        FlowFile newFlowFile = this.getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
        long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos);
        String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, tracker.topic);
        this.getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
        tracker.updateFlowFile(newFlowFile);
    }

    private static class BundleInformation {
        private final TopicPartition topicPartition;
        private final RecordSchema schema;
        private final Map<String, String> attributes;

        public BundleInformation(TopicPartition topicPartition, RecordSchema schema, Map<String, String> attributes) {
            this.topicPartition = topicPartition;
            this.schema = schema;
            this.attributes = attributes;
        }

        public int hashCode() {
            return 41 + 13 * this.topicPartition.hashCode() + (this.schema == null ? 0 : 13 * this.schema.hashCode()) + (this.attributes == null ? 0 : 13 * this.attributes.hashCode());
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof BundleInformation)) {
                return false;
            }
            BundleInformation other = (BundleInformation)obj;
            return Objects.equals(this.topicPartition, other.topicPartition) && Objects.equals(this.schema, other.schema) && Objects.equals(this.attributes, other.attributes);
        }
    }

    private static class BundleTracker {
        final long initialOffset;
        final int partition;
        final String topic;
        final String key;
        final RecordSetWriter recordWriter;
        FlowFile flowFile;
        long totalRecords = 0L;

        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding) {
            this(initialRecord, topicPartition, keyEncoding, (RecordSetWriter)null);
        }

        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding, RecordSetWriter recordWriter) {
            this.initialOffset = initialRecord.offset();
            this.partition = topicPartition.partition();
            this.topic = topicPartition.topic();
            this.recordWriter = recordWriter;
            this.key = ConsumerLease.encodeKafkaKey((byte[])initialRecord.key(), keyEncoding);
        }

        private void incrementRecordCount(long count) {
            this.totalRecords += count;
        }

        private void updateFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }
}

