/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.InFlightMessageTracker;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

public class PublisherLease
implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private final boolean useTransactions;
    private final Pattern attributeNameRegex;
    private final Charset headerCharacterSet;
    private volatile boolean poisoned = false;
    private final AtomicLong messagesSent = new AtomicLong(0L);
    private volatile boolean transactionsInitialized = false;
    private volatile boolean activeTransaction = false;
    private InFlightMessageTracker tracker;

    public PublisherLease(Producer<byte[], byte[]> producer, int maxMessageSize, long maxAckWaitMillis, ComponentLog logger, boolean useTransactions, Pattern attributeNameRegex, Charset headerCharacterSet) {
        this.producer = producer;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
        this.maxAckWaitMillis = maxAckWaitMillis;
        this.useTransactions = useTransactions;
        this.attributeNameRegex = attributeNameRegex;
        this.headerCharacterSet = headerCharacterSet;
    }

    protected void poison() {
        this.poisoned = true;
    }

    public boolean isPoisoned() {
        return this.poisoned;
    }

    void beginTransaction() {
        if (!this.useTransactions) {
            return;
        }
        if (!this.transactionsInitialized) {
            this.producer.initTransactions();
            this.transactionsInitialized = true;
        }
        this.producer.beginTransaction();
        this.activeTransaction = true;
    }

    void rollback() {
        if (!this.useTransactions || !this.activeTransaction) {
            return;
        }
        this.producer.abortTransaction();
        this.activeTransaction = false;
    }

    void fail(FlowFile flowFile, Exception cause) {
        this.getTracker().fail(flowFile, cause);
        this.rollback();
    }

    void publish(FlowFile flowFile, InputStream flowFileContent, byte[] messageKey, byte[] demarcatorBytes, String topic) throws IOException {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        try {
            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
                if (flowFile.getSize() > (long)this.maxMessageSize) {
                    this.tracker.fail(flowFile, (Exception)((Object)new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes.")));
                    return;
                }
                byte[] messageContent = new byte[(int)flowFile.getSize()];
                StreamUtils.fillBuffer((InputStream)flowFileContent, (byte[])messageContent);
                this.publish(flowFile, messageKey, messageContent, topic, this.tracker);
                return;
            }
            try (StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, this.maxMessageSize);){
                byte[] messageContent;
                while ((messageContent = demarcator.nextToken()) != null) {
                    this.publish(flowFile, messageKey, messageContent, topic, this.tracker);
                    if (!this.tracker.isFailed(flowFile)) continue;
                    return;
                }
            }
            catch (TokenTooLargeException ttle) {
                this.tracker.fail(flowFile, (Exception)((Object)ttle));
            }
        }
        catch (Exception e) {
            this.tracker.fail(flowFile, e);
            this.poison();
            throw e;
        }
    }

    void publish(FlowFile flowFile, RecordSet recordSet, RecordSetWriterFactory writerFactory, RecordSchema schema, String messageKeyField, String topic) throws IOException {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        int recordCount = 0;
        try {
            Record record;
            while ((record = recordSet.next()) != null) {
                ++recordCount;
                baos.reset();
                Map additionalAttributes = Collections.emptyMap();
                try (RecordSetWriter writer = writerFactory.createWriter(this.logger, schema, (OutputStream)baos);){
                    WriteResult writeResult = writer.write(record);
                    additionalAttributes = writeResult.getAttributes();
                    writer.flush();
                }
                byte[] messageContent = baos.toByteArray();
                String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                byte[] messageKey = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
                this.publish(flowFile, additionalAttributes, messageKey, messageContent, topic, this.tracker);
                if (!this.tracker.isFailed(flowFile)) continue;
                return;
            }
            if (recordCount == 0) {
                this.tracker.trackEmpty(flowFile);
            }
        }
        catch (TokenTooLargeException ttle) {
            this.tracker.fail(flowFile, (Exception)((Object)ttle));
        }
        catch (SchemaNotFoundException snfe) {
            throw new IOException(snfe);
        }
        catch (Exception e) {
            this.tracker.fail(flowFile, e);
            this.poison();
            throw e;
        }
    }

    private void addHeaders(FlowFile flowFile, Map<String, String> additionalAttributes, ProducerRecord<?, ?> record) {
        if (this.attributeNameRegex == null) {
            return;
        }
        Headers headers = record.headers();
        for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches()) continue;
            headers.add((String)entry.getKey(), ((String)entry.getValue()).getBytes(this.headerCharacterSet));
        }
        for (Map.Entry<Object, Object> entry : additionalAttributes.entrySet()) {
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches()) continue;
            headers.add((String)entry.getKey(), ((String)entry.getValue()).getBytes(this.headerCharacterSet));
        }
    }

    protected void publish(FlowFile flowFile, byte[] messageKey, byte[] messageContent, String topic, InFlightMessageTracker tracker) {
        this.publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
    }

    protected void publish(final FlowFile flowFile, Map<String, String> additionalAttributes, byte[] messageKey, byte[] messageContent, String topic, final InFlightMessageTracker tracker) {
        ProducerRecord record = new ProducerRecord(topic, null, (Object)messageKey, (Object)messageContent);
        this.addHeaders(flowFile, additionalAttributes, record);
        this.producer.send(record, new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    tracker.incrementAcknowledgedCount(flowFile);
                } else {
                    tracker.fail(flowFile, exception);
                    PublisherLease.this.poison();
                }
            }
        });
        this.messagesSent.incrementAndGet();
        tracker.incrementSentCount(flowFile);
    }

    public PublishResult complete() {
        if (this.tracker == null) {
            if (this.messagesSent.get() == 0L) {
                return PublishResult.EMPTY;
            }
            this.rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        this.producer.flush();
        if (this.activeTransaction) {
            this.producer.commitTransaction();
            this.activeTransaction = false;
        }
        try {
            this.tracker.awaitCompletion(this.maxAckWaitMillis);
            PublishResult publishResult = this.tracker.createPublishResult();
            return publishResult;
        }
        catch (InterruptedException e) {
            this.logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            Thread.currentThread().interrupt();
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        catch (TimeoutException e) {
            this.logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        finally {
            this.tracker = null;
        }
    }

    @Override
    public void close() {
        this.producer.close(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
        this.tracker = null;
    }

    public InFlightMessageTracker getTracker() {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        return this.tracker;
    }
}

