/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.InFlightMessageTracker;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

public class PublisherLease
implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private volatile boolean poisoned = false;
    private InFlightMessageTracker tracker;

    public PublisherLease(Producer<byte[], byte[]> producer, int maxMessageSize, long maxAckWaitMillis, ComponentLog logger) {
        this.producer = producer;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
        this.maxAckWaitMillis = maxAckWaitMillis;
    }

    protected void poison() {
        this.poisoned = true;
    }

    public boolean isPoisoned() {
        return this.poisoned;
    }

    void publish(FlowFile flowFile, InputStream flowFileContent, byte[] messageKey, byte[] demarcatorBytes, String topic) throws IOException {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker();
        }
        try {
            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
                if (flowFile.getSize() > (long)this.maxMessageSize) {
                    this.tracker.fail(flowFile, (Exception)((Object)new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes.")));
                    return;
                }
                byte[] messageContent = new byte[(int)flowFile.getSize()];
                StreamUtils.fillBuffer((InputStream)flowFileContent, (byte[])messageContent);
                this.publish(flowFile, messageKey, messageContent, topic, this.tracker);
                return;
            }
            try (StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, this.maxMessageSize);){
                byte[] messageContent;
                while ((messageContent = demarcator.nextToken()) != null) {
                    this.publish(flowFile, messageKey, messageContent, topic, this.tracker);
                    if (!this.tracker.isFailed(flowFile)) continue;
                    return;
                }
                this.tracker.trackEmpty(flowFile);
            }
            catch (TokenTooLargeException ttle) {
                this.tracker.fail(flowFile, (Exception)((Object)ttle));
            }
        }
        catch (Exception e) {
            this.tracker.fail(flowFile, e);
            this.poison();
            throw e;
        }
    }

    private void publish(final FlowFile flowFile, byte[] messageKey, byte[] messageContent, String topic, final InFlightMessageTracker tracker) {
        ProducerRecord record = new ProducerRecord(topic, null, (Object)messageKey, (Object)messageContent);
        this.producer.send(record, new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    tracker.incrementAcknowledgedCount(flowFile);
                } else {
                    tracker.fail(flowFile, exception);
                    PublisherLease.this.poison();
                }
            }
        });
        tracker.incrementSentCount(flowFile);
    }

    public PublishResult complete() {
        if (this.tracker == null) {
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        this.producer.flush();
        try {
            this.tracker.awaitCompletion(this.maxAckWaitMillis);
            PublishResult publishResult = this.tracker.createPublishResult();
            return publishResult;
        }
        catch (InterruptedException e) {
            this.logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            Thread.currentThread().interrupt();
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        catch (TimeoutException e) {
            this.logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        finally {
            this.tracker = null;
        }
    }

    @Override
    public void close() {
        this.producer.close(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
        this.tracker = null;
    }
}

