/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public abstract class ConsumerLease
implements Closeable,
ConsumerRebalanceListener {
    // Upper bound, in milliseconds, that continuePolling() allows a lease to keep polling.
    private final long maxWaitMillis;
    // Kafka consumer this lease polls, commits offsets on, seeks and wakes up.
    private final Consumer<byte[], byte[]> kafkaConsumer;
    private final ComponentLog logger;
    // When non-null, polled messages are concatenated per partition with these bytes between them.
    private final byte[] demarcatorBytes;
    // Encoding name passed to encodeKafkaKey() when a tracker captures its first record's key.
    private final String keyEncoding;
    // Both values are passed to KafkaProcessorUtils.buildTransitURI() for provenance events.
    private final String securityProtocol;
    private final String bootstrapServers;
    // Record-oriented processing is used only when both factories are non-null and no demarcator is set.
    private final RecordSetWriterFactory writerFactory;
    private final RecordReaderFactory readerFactory;
    // Set by poison() after an unexpected failure in poll() or commit(); exposed via isPoisoned().
    private boolean poisoned = false;
    // Bundles (FlowFiles still being appended to) keyed by topic-partition and record schema.
    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<BundleInformation, BundleTracker>();
    // Per partition: highest processed offset + 1, not yet committed to Kafka.
    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<TopicPartition, OffsetAndMetadata>();
    // System.nanoTime() captured by continuePolling(); a negative value means "not started yet".
    private long leaseStartNanos = -1L;
    // True when the most recent poll() returned zero records.
    private boolean lastPollEmpty = false;
    // Number of Kafka messages processed since the last resetInternalState().
    private int totalMessages = 0;

    /**
     * Creates a lease around an existing Kafka consumer.
     *
     * @param maxWaitMillis    maximum time, in milliseconds, the lease may keep polling
     * @param kafkaConsumer    consumer to poll and commit offsets on
     * @param demarcatorBytes  separator written between messages in one FlowFile; {@code null} disables demarcation
     * @param keyEncoding      encoding name used when turning the Kafka key into an attribute
     * @param securityProtocol security protocol used to build the provenance transit URI
     * @param bootstrapServers bootstrap servers used to build the provenance transit URI
     * @param readerFactory    record reader factory; may be {@code null}
     * @param writerFactory    record writer factory; may be {@code null}
     * @param logger           component logger
     */
    ConsumerLease(long maxWaitMillis, Consumer<byte[], byte[]> kafkaConsumer, byte[] demarcatorBytes, String keyEncoding, String securityProtocol, String bootstrapServers, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ComponentLog logger) {
        // Kafka connection
        this.kafkaConsumer = kafkaConsumer;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;

        // Output shaping
        this.demarcatorBytes = demarcatorBytes;
        this.keyEncoding = keyEncoding;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;

        // Lease limits and diagnostics
        this.maxWaitMillis = maxWaitMillis;
        this.logger = logger;
    }

    /**
     * Returns the lease to its freshly-constructed bookkeeping state: no tracked bundles,
     * no pending offsets, no start time, zero messages counted.
     */
    private void resetInternalState() {
        // Counters and timing
        totalMessages = 0;
        lastPollEmpty = false;
        leaseStartNanos = -1L;

        // Tracked work
        uncommittedOffsetsMap.clear();
        bundleMap.clear();
    }

    /**
     * Rebalance callback: logs the revoked partitions and commits whatever this lease
     * has processed so far.
     *
     * @param partitions the partitions being taken away from this consumer
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, this.kafkaConsumer});
        // The boolean result is deliberately ignored: there may simply be nothing to commit.
        this.commit();
    }

    /**
     * Rebalance callback: only logs the newly assigned partitions; no state is changed.
     *
     * @param partitions the partitions now assigned to this consumer
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, this.kafkaConsumer});
    }

    /**
     * Polls Kafka once (10 ms timeout) and hands any returned records to
     * {@link #processRecords(ConsumerRecords)}.
     *
     * A {@link ProcessException} is rethrown untouched; any other failure marks the
     * lease as poisoned before being rethrown.
     */
    void poll() {
        try {
            final ConsumerRecords<byte[], byte[]> batch = kafkaConsumer.poll(10L);
            lastPollEmpty = (batch.count() == 0);
            processRecords(batch);
        } catch (ProcessException processFailure) {
            // Processing failures do not poison the lease.
            throw processFailure;
        } catch (Throwable unexpected) {
            poison();
            throw unexpected;
        }
    }

    /**
     * Completes all tracked bundles, commits the process session and then commits the
     * pending offsets to Kafka, in that order.
     *
     * Any failure poisons the lease before the exception is propagated.
     *
     * @return {@code true} if offsets were committed; {@code false} if there was nothing
     *         pending (internal state is reset in both cases)
     * @throws ProcessException if finishing a bundle fails with an {@link IOException}
     */
    boolean commit() {
        final boolean nothingPending = uncommittedOffsetsMap.isEmpty();
        if (nothingPending) {
            resetInternalState();
            return false;
        }

        try {
            // 1. Finish the bundles and route them.
            final Collection<FlowFile> finished = getBundles();
            if (!finished.isEmpty()) {
                getProcessSession().transfer(finished, ConsumeKafkaRecord_0_10.REL_SUCCESS);
            }

            // 2. Session first, Kafka offsets second.
            getProcessSession().commit();
            kafkaConsumer.commitSync(uncommittedOffsetsMap);

            resetInternalState();
            return true;
        } catch (IOException bundleFailure) {
            poison();
            logger.error("Failed to finish writing out FlowFile bundle", (Throwable) bundleFailure);
            throw new ProcessException((Throwable) bundleFailure);
        } catch (KafkaException offsetFailure) {
            poison();
            logger.warn("Duplicates are likely as we were able to commit the process session but received an exception from Kafka while committing offsets.");
            throw offsetFailure;
        } catch (Throwable unexpected) {
            poison();
            throw unexpected;
        }
    }

    /**
     * Decides whether the caller should poll again on this lease.
     *
     * Polling stops when the last poll was empty, when more than {@code maxWaitMillis}
     * has elapsed since the lease start, when more than 200 bundles are tracked, or once
     * 1000 messages have been processed. The lease start time is captured on the first
     * call after a reset.
     *
     * @return {@code true} if another poll is allowed
     */
    boolean continuePolling() {
        if (lastPollEmpty) {
            return false;
        }

        // Start the clock lazily; a negative value marks "not started".
        if (leaseStartNanos < 0L) {
            leaseStartNanos = System.nanoTime();
        }
        final long elapsedNanos = System.nanoTime() - leaseStartNanos;
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);

        final boolean withinTimeBudget = elapsedMillis <= maxWaitMillis;
        final boolean withinBundleBudget = bundleMap.size() <= 200;
        final boolean withinMessageBudget = totalMessages < 1000;
        return withinTimeBudget && withinBundleBudget && withinMessageBudget;
    }

    /** Flags this lease as poisoned; nothing in this class clears the flag again. */
    private void poison() {
        poisoned = true;
    }

    /**
     * @return {@code true} once {@code poll()} or {@code commit()} has hit a failure
     *         that poisoned this lease
     */
    boolean isPoisoned() {
        return poisoned;
    }

    /** Delegates to {@code wakeup()} on the underlying Kafka consumer. */
    public void wakeup() {
        kafkaConsumer.wakeup();
    }

    /**
     * Clears the lease's internal bookkeeping. The Kafka consumer itself is not
     * closed here.
     */
    @Override
    public void close() {
        resetInternalState();
    }

    /**
     * Supplies the session this lease uses to create, write, transfer and commit FlowFiles.
     * This class calls it repeatedly within one operation.
     * NOTE(review): callers here appear to assume consecutive calls return the same
     * session - confirm against the implementations.
     *
     * @return the process session to use
     */
    public abstract ProcessSession getProcessSession();

    /**
     * Invoked by this class after the writer schema could not be obtained and a rollback
     * of the partition offset has been attempted.
     * NOTE(review): presumably implementations back off the owning processor - confirm.
     */
    public abstract void yield();

    /**
     * Writes the polled records of every partition into the process session and records
     * the next offset to commit for each partition.
     *
     * The write strategy is chosen per call: demarcated bundles when a demarcator is
     * configured, record-oriented bundles when both reader and writer factories are
     * present, otherwise one FlowFile per message.
     *
     * @param records the batch returned by the last poll
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> records) {
        for (final TopicPartition partition : records.partitions()) {
            // Properly typed list: a raw List would turn the elements into Object.
            final List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
            if (messages.isEmpty()) {
                continue;
            }

            // Safe getAsLong(): the list is known to be non-empty here.
            final long maxOffset = messages.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();

            if (this.demarcatorBytes != null) {
                this.writeDemarcatedData(this.getProcessSession(), messages, partition);
            } else if (this.readerFactory != null && this.writerFactory != null) {
                this.writeRecordData(this.getProcessSession(), messages, partition);
            } else {
                for (final ConsumerRecord<byte[], byte[]> message : messages) {
                    this.writeData(this.getProcessSession(), message, partition);
                }
            }

            this.totalMessages += messages.size();
            // Kafka expects the offset of the NEXT message to consume, hence + 1.
            this.uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
        }
    }

    /**
     * Renders a Kafka message key as a string attribute value.
     *
     * @param key      raw key bytes; may be {@code null}
     * @param encoding encoding name, compared against the hex and UTF-8 encoding values
     *                 of {@code KafkaProcessorUtils}
     * @return the hex or UTF-8 rendering of the key, or {@code null} when the key is
     *         {@code null} or the encoding matches neither
     */
    private static String encodeKafkaKey(byte[] key, String encoding) {
        if (key != null) {
            final boolean asHex = KafkaProcessorUtils.HEX_ENCODING.getValue().equals(encoding);
            if (asHex) {
                return DatatypeConverter.printHexBinary(key);
            }
            final boolean asUtf8 = KafkaProcessorUtils.UTF8_ENCODING.getValue().equals(encoding);
            if (asUtf8) {
                return new String(key, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    /**
     * Finalizes every tracked bundle and collects the FlowFiles that should be transferred.
     *
     * @return FlowFiles of all bundles for which {@code processBundle} returned {@code true}
     * @throws IOException if finishing a bundle fails
     */
    private Collection<FlowFile> getBundles() throws IOException {
        final List<FlowFile> completed = new ArrayList<>();
        for (final BundleTracker tracker : bundleMap.values()) {
            if (processBundle(tracker)) {
                // Read the FlowFile after processing: processBundle replaces it.
                completed.add(tracker.flowFile);
            }
        }
        return completed;
    }

    /**
     * Finalizes one bundle: finishes and closes its record writer (if any), then stamps
     * the Kafka attributes on its FlowFile.
     *
     * @param bundle the bundle to finalize
     * @return {@code false} if the bundle had a record writer that wrote zero records, in
     *         which case its FlowFile is removed from the session; {@code true} otherwise
     * @throws IOException if finishing or closing the record writer fails
     */
    private boolean processBundle(BundleTracker bundle) throws IOException {
        final RecordSetWriter writer = bundle.recordWriter;
        if (writer == null) {
            // Bundles without a record writer only need their attributes.
            populateAttributes(bundle);
            return true;
        }

        final WriteResult result;
        try {
            result = writer.finishRecordSet();
        } finally {
            // The writer is closed whether or not finishing succeeded.
            writer.close();
        }

        if (result.getRecordCount() == 0) {
            getProcessSession().remove(bundle.flowFile);
            return false;
        }

        final Map<String, String> writerAttributes = new HashMap<>(result.getAttributes());
        writerAttributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
        bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, writerAttributes);

        populateAttributes(bundle);
        return true;
    }

    /**
     * Emits a single Kafka message as its own FlowFile and transfers it to success.
     *
     * @param session        session used to create, write and transfer the FlowFile
     * @param record         the Kafka message; a {@code null} value yields an empty FlowFile
     * @param topicPartition partition the message came from
     */
    private void writeData(ProcessSession session, ConsumerRecord<byte[], byte[]> record, TopicPartition topicPartition) {
        FlowFile created = session.create();

        final BundleTracker singleMessage = new BundleTracker(record, topicPartition, keyEncoding);
        singleMessage.incrementRecordCount(1L);

        final byte[] payload = record.value();
        if (payload != null) {
            created = session.write(created, out -> out.write(payload));
        }
        singleMessage.updateFlowFile(created);

        // Attributes and the provenance event are added before routing.
        populateAttributes(singleMessage);
        session.transfer(singleMessage.flowFile, ConsumeKafkaRecord_0_10.REL_SUCCESS);
    }

    /**
     * Appends the values of all given messages to the partition's demarcated bundle,
     * creating the bundle (and its FlowFile) on first use.
     *
     * The demarcator is written between messages; it also precedes the first message
     * when the bundle already existed before this call. Messages with a {@code null}
     * value contribute no bytes but still count and still get a demarcator.
     *
     * @param session        session used to create and append to the FlowFile
     * @param records        messages of one partition; the caller passes a non-empty list
     * @param topicPartition partition the messages came from
     */
    private void writeDemarcatedData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        final ConsumerRecord<byte[], byte[]> head = records.get(0);
        final BundleInformation bundleKey = new BundleInformation(topicPartition, null);

        final BundleTracker existing = bundleMap.get(bundleKey);
        final boolean bundleAlreadyOpen = existing != null;
        final BundleTracker tracker;
        if (bundleAlreadyOpen) {
            tracker = existing;
        } else {
            tracker = new BundleTracker(head, topicPartition, keyEncoding);
            tracker.updateFlowFile(session.create());
        }

        tracker.incrementRecordCount(records.size());

        final FlowFile appended = session.append(tracker.flowFile, out -> {
            // Only an already-open bundle needs a separator before the first message.
            boolean separatorDue = bundleAlreadyOpen;
            for (final ConsumerRecord<byte[], byte[]> message : records) {
                if (separatorDue) {
                    out.write(demarcatorBytes);
                }
                final byte[] payload = message.value();
                if (payload != null) {
                    out.write(payload);
                }
                separatorDue = true;
            }
        });

        tracker.updateFlowFile(appended);
        bundleMap.put(bundleKey, tracker);
    }

    /**
     * Parses each Kafka message with the configured record reader and writes the resulting
     * records into a bundle, one FlowFile and record writer per topic-partition and record
     * schema.
     *
     * <ul>
     *   <li>A message that cannot be parsed is routed, as its own FlowFile, to the
     *       parse-failure relationship and processing continues with the next message.</li>
     *   <li>A message that yields no record is skipped.</li>
     *   <li>If the writer schema cannot be obtained, the partition offset is rolled back,
     *       {@code yield()} is invoked and a {@link ProcessException} is thrown.</li>
     *   <li>Any other exception closes the current writer, attempts an offset rollback and
     *       is rethrown wrapped in a {@link ProcessException}.</li>
     * </ul>
     *
     * @param session        session used to create and write FlowFiles
     * @param messages       messages of one partition
     * @param topicPartition partition the messages came from
     * @throws ProcessException on schema lookup failure or any unexpected error
     */
    private void writeRecordData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> messages, TopicPartition topicPartition) {
        RecordSetWriter writer = null;

        // Routes an unparseable message to 'parse.failure' as a standalone FlowFile.
        final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("kafka.offset", String.valueOf(consumerRecord.offset()));
            attributes.put("kafka.partition", String.valueOf(topicPartition.partition()));
            attributes.put("kafka.topic", topicPartition.topic());

            FlowFile failureFlowFile = session.create();
            final byte[] value = consumerRecord.value();
            if (value != null) {
                failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
            }
            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);

            final String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, topicPartition.topic());
            session.getProvenanceReporter().receive(failureFlowFile, transitUri);
            session.transfer(failureFlowFile, ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE);
            this.logger.error("Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship", (Throwable) e);
            session.adjustCounter("Parse Failures", 1L, false);
        };

        try {
            for (final ConsumerRecord<byte[], byte[]> message : messages) {
                final byte[] recordBytes = message.value() == null ? new byte[0] : message.value();

                // try-with-resources closes the stream on every exit path (continue, normal
                // completion, exception) without ever discarding an in-flight exception.
                try (InputStream in = new ByteArrayInputStream(recordBytes)) {
                    final RecordReader reader;
                    final Record firstRecord;
                    try {
                        reader = this.readerFactory.createRecordReader(Collections.emptyMap(), in, this.logger);
                        firstRecord = reader.nextRecord();
                    } catch (Exception e) {
                        handleParseFailure.accept(message, e);
                        continue;
                    }

                    if (firstRecord == null) {
                        // The message contains no record: nothing to write.
                        continue;
                    }

                    // The bundle is determined by partition and the schema of the first record.
                    final RecordSchema recordSchema = firstRecord.getSchema();
                    final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);

                    BundleTracker tracker = this.bundleMap.get(bundleInfo);
                    if (tracker == null) {
                        final FlowFile flowFile = session.create();
                        final OutputStream rawOut = session.write(flowFile);

                        final RecordSchema writeSchema;
                        try {
                            writeSchema = this.writerFactory.getSchema(Collections.emptyMap(), recordSchema);
                        } catch (Exception e) {
                            this.logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", (Throwable) e);
                            try {
                                this.rollback(topicPartition);
                            } catch (Exception rollbackException) {
                                this.logger.warn("Attempted to rollback Kafka message offset but was unable to do so", (Throwable) rollbackException);
                            }
                            this.yield();
                            throw new ProcessException((Throwable) e);
                        }

                        writer = this.writerFactory.createWriter(this.logger, writeSchema, rawOut);
                        writer.beginRecordSet();

                        tracker = new BundleTracker(message, topicPartition, this.keyEncoding, writer);
                        tracker.updateFlowFile(flowFile);
                        this.bundleMap.put(bundleInfo, tracker);
                    } else {
                        writer = tracker.recordWriter;
                    }

                    try {
                        for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
                            writer.write(record);
                            tracker.incrementRecordCount(1L);
                            session.adjustCounter("Records Received", 1L, false);
                        }
                    } catch (Exception e) {
                        // Records already written stay in the bundle; the message is also
                        // routed to parse failure and processing moves on.
                        handleParseFailure.accept(message, e);
                    }
                }
            }
        } catch (Exception e) {
            this.logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", (Throwable) e);
            try {
                if (writer != null) {
                    writer.close();
                }
            } catch (Exception ioe) {
                this.logger.warn("Failed to close Record Writer", (Throwable) ioe);
            }
            try {
                this.rollback(topicPartition);
            } catch (Exception rollbackException) {
                this.logger.warn("Attempted to rollback Kafka message offset but was unable to do so", (Throwable) rollbackException);
            }
            throw new ProcessException((Throwable) e);
        }
    }

    /**
     * Rewinds the consumer's position on one partition so that messages which were polled
     * but not successfully processed are fetched again.
     *
     * The target is the lease's pending (uncommitted) offset for the partition if there is
     * one, otherwise the offset last committed to Kafka, otherwise offset 0.
     *
     * @param topicPartition the partition to rewind
     */
    private void rollback(TopicPartition topicPartition) {
        OffsetAndMetadata offsetAndMetadata = this.uncommittedOffsetsMap.get(topicPartition);
        if (offsetAndMetadata == null) {
            offsetAndMetadata = this.kafkaConsumer.committed(topicPartition);
        }
        // committed() yields null when the group has never committed for this partition;
        // fall back to offset 0 instead of failing with a NullPointerException.
        long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
        this.kafkaConsumer.seek(topicPartition, offset);
    }

    /**
     * Stamps the Kafka attributes on the tracker's FlowFile and emits a provenance
     * RECEIVE event for it.
     *
     * Attributes written: {@code kafka.offset}, {@code kafka.partition} and
     * {@code kafka.topic} always; {@code kafka.key} only for a single-message FlowFile
     * with a non-null key; for more than one record, {@code kafka.count} when the bundle
     * has no record writer and {@code record.count} when it has one.
     *
     * @param tracker bundle whose FlowFile is updated; its FlowFile reference is replaced
     */
    private void populateAttributes(BundleTracker tracker) {
        Map<String, String> kafkaAttrs = new HashMap<String, String>();
        kafkaAttrs.put("kafka.offset", String.valueOf(tracker.initialOffset));
        if (tracker.key != null && tracker.totalRecords == 1L) {
            kafkaAttrs.put("kafka.key", tracker.key);
        }
        kafkaAttrs.put("kafka.partition", String.valueOf(tracker.partition));
        kafkaAttrs.put("kafka.topic", tracker.topic);
        if (tracker.totalRecords > 1L) {
            if (tracker.recordWriter == null) {
                kafkaAttrs.put("kafka.count", String.valueOf(tracker.totalRecords));
            } else {
                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
            }
        }
        FlowFile newFlowFile = this.getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
        // leaseStartNanos is negative until continuePolling() starts the clock, and again
        // after any state reset (e.g. a rebalance-triggered commit). Report a zero duration
        // then rather than a meaningless "nanoTime() + 1" value. Same "unset" test as
        // continuePolling().
        long executionDurationMillis = this.leaseStartNanos < 0L
                ? 0L
                : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos);
        String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, tracker.topic);
        this.getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
        tracker.updateFlowFile(newFlowFile);
    }

    /**
     * Map key identifying a bundle: a topic-partition plus an optional record schema.
     * Two keys are equal when both components are equal.
     */
    private static class BundleInformation {
        private final TopicPartition topicPartition;
        private final RecordSchema schema;

        /**
         * @param topicPartition partition the bundle belongs to
         * @param schema         record schema of the bundle; may be {@code null}
         */
        public BundleInformation(TopicPartition topicPartition, RecordSchema schema) {
            this.topicPartition = topicPartition;
            this.schema = schema;
        }

        @Override
        public int hashCode() {
            int hash = 41 + 13 * topicPartition.hashCode();
            if (schema != null) {
                hash += 13 * schema.hashCode();
            }
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // instanceof is false for null, so this also rejects a null argument.
            if (!(obj instanceof BundleInformation)) {
                return false;
            }
            final BundleInformation that = (BundleInformation) obj;
            final boolean samePartition = Objects.equals(topicPartition, that.topicPartition);
            return samePartition && Objects.equals(schema, that.schema);
        }
    }

    /**
     * Mutable bookkeeping for one FlowFile being assembled from Kafka messages: where it
     * started (topic, partition, first offset, encoded key of the first message), the
     * optional record writer, the current FlowFile reference and the record count.
     */
    private static class BundleTracker {
        final long initialOffset;
        final int partition;
        final String topic;
        final String key;
        final RecordSetWriter recordWriter;
        FlowFile flowFile;
        long totalRecords = 0L;

        /** Creates a tracker without a record writer. */
        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding) {
            this(initialRecord, topicPartition, keyEncoding, (RecordSetWriter) null);
        }

        /**
         * @param initialRecord  first message of the bundle; supplies offset and key
         * @param topicPartition partition the bundle belongs to
         * @param keyEncoding    encoding name used to render the key
         * @param recordWriter   writer for record-oriented bundles; may be {@code null}
         */
        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding, RecordSetWriter recordWriter) {
            initialOffset = initialRecord.offset();
            partition = topicPartition.partition();
            topic = topicPartition.topic();
            this.recordWriter = recordWriter;
            key = ConsumerLease.encodeKafkaKey(initialRecord.key(), keyEncoding);
        }

        /** Adds {@code count} to the number of records in this bundle. */
        private void incrementRecordCount(long count) {
            totalRecords = totalRecords + count;
        }

        /** Replaces the tracked FlowFile reference with the given one. */
        private void updateFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }
}

