package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/ProducerManager.class */
public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
    /** Class-wide SLF4J logger; also used by the nested {@code ProducingLock}. */
    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
    /** Producer wrapper that every send / flush / transaction call in this class is delegated to. */
    protected final ProducerWrapper<K, V> producerWrapper;
    /** Options consulted for transactional mode and for the lock acquisition timeouts. */
    private final ParallelConsumerOptions<K, V> options;
    /**
     * Lock coordinating producing with committing: the read side is taken in
     * {@code acquireProduceLock}, the write side in {@code acquireCommitLock}.
     * Assigned in {@code initProducer()}, which the constructor calls.
     */
    private ReentrantReadWriteLock producerTransactionLock;

    /* loaded from: input_file:io/confluent/parallelconsumer/internal/ProducerManager$ProducingLock.class */
    /**
     * Handle returned to callers that have acquired the produce (read) lock.
     * It pairs the read lock with the poll context it was acquired for, so the
     * context's offsets can be logged when the lock is released.
     */
    public class ProducingLock {
        private final PollContextInternal<K, V> context;
        private final ReentrantReadWriteLock.ReadLock produceLock;

        /**
         * @param pollContextInternal the poll context this lock was acquired for
         * @param readLock            the read lock that {@link #unlock()} releases
         */
        public ProducingLock(PollContextInternal<K, V> pollContextInternal, ReentrantReadWriteLock.ReadLock readLock) {
            this.produceLock = readLock;
            this.context = pollContextInternal;
        }

        /**
         * Releases the wrapped read lock, then logs the offsets of the associated context.
         */
        public void unlock() {
            produceLock.unlock();
            log.debug("Unlocking produce lock (context: {}).", context.getOffsets());
        }
    }

    /**
     * Creates the manager and immediately validates / initialises the producer
     * via {@code initProducer()}.
     *
     * @param producerWrapper         wrapper around the producer used for sending and transactions
     * @param consumerManager         passed through to the superclass constructor
     * @param workManager             passed through to the superclass constructor
     * @param parallelConsumerOptions options controlling transactional mode and lock timeouts
     * @throws IllegalArgumentException if the options and the producer disagree about transactions
     */
    public ProducerManager(ProducerWrapper<K, V> producerWrapper, ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        super(consumerManager, workManager);
        this.options = parallelConsumerOptions;
        this.producerWrapper = producerWrapper;
        initProducer();
    }

    /**
     * Creates the fair producer/commit lock and checks that the configured
     * options agree with the producer's transactional configuration. When
     * transactions are in use, also initialises the producer's transaction session.
     *
     * @throws IllegalArgumentException if exactly one of "options request transactions" and
     *                                  "producer is configured for transactions" holds
     * @throws KafkaException           rethrown (after logging) if transaction initialisation fails
     */
    private void initProducer() {
        this.producerTransactionLock = new ReentrantReadWriteLock(true);

        final boolean transactionsRequested = options.isUsingTransactionalProducer();
        final boolean producerIsTransactional = producerWrapper.isConfiguredForTransactions();

        if (!transactionsRequested) {
            if (producerIsTransactional) {
                throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - the Producer must not have a transaction ID for this option. This is because having such an ID forces the Producer into transactional mode - i.e. you cannot use it without using transactions.");
            }
            // Nothing further to set up for a plain producer.
            return;
        }

        if (!producerIsTransactional) {
            throw new IllegalArgumentException("Using transactional option, yet Producer doesn't have a transaction ID - Producer needs a transaction id");
        }

        try {
            log.debug("Initialising producer transaction session...");
            producerWrapper.initTransactions();
        } catch (KafkaException initFailure) {
            log.error("Make sure your producer is setup for transactions - specifically make sure it's {} is set.", "transactional.id", initFailure);
            throw initFailure;
        }
    }

    /**
     * Sends every record in {@code list} through the producer wrapper and
     * returns each record paired with the future of its send.
     * <p>
     * Requires the produce lock to be held when transaction commit mode is in
     * use (checked by {@code ensureProduceStarted()}), and lazily begins a
     * transaction if one is needed and not yet open.
     *
     * @param list the records to send, in order
     * @return one (record, send-future) pair per input record, in input order
     * @throws InternalRuntimeException if producing has not been started via {@code beginProducing}
     */
    public List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> list) {
        ensureProduceStarted();
        lazyMaybeBeginTransaction();

        // Shared by all sends: a failed send is logged and then rethrown from within the callback.
        Callback callback = (metadata, sendError) -> {
            if (sendError != null) {
                log.error("Error producing result message", sendError);
                throw new InternalRuntimeException("Error producing result message", sendError);
            }
        };

        // Properly parameterised (the raw ArrayList produced unchecked adds and an unchecked return).
        List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> results = new ArrayList<>(list.size());
        for (ProducerRecord<K, V> producerRecord : list) {
            log.trace("Producing {}", producerRecord);
            Future<RecordMetadata> sendFuture = this.producerWrapper.send(producerRecord, callback);
            results.add(ParallelConsumer.Tuple.pairOf(producerRecord, sendFuture));
        }
        return results;
    }

    /**
     * Begins a transaction only when transaction commit mode is enabled and
     * the producer wrapper reports that no transaction is currently open.
     */
    private void lazyMaybeBeginTransaction() {
        // Unsynchronised fast-path check; syncBeginTransaction() re-checks under the monitor.
        if (options.isUsingTransactionCommitMode() && !producerWrapper.isTransactionOpen()) {
            syncBeginTransaction();
        }
    }

    /**
     * Begins a transaction while holding this instance's monitor, unless one is
     * already open by the time the monitor has been obtained.
     */
    private synchronized void syncBeginTransaction() {
        if (producerWrapper.isTransactionOpen()) {
            return;
        }
        beginTransaction();
    }

    /**
     * Releases a produce lock previously returned by {@code acquireProduceLock}.
     *
     * @param producingLock the lock handle to unlock
     */
    protected void releaseProduceLock(ProducerManager<K, V>.ProducingLock producingLock) {
        // Delegates entirely to the handle, which also logs the release.
        producingLock.unlock();
    }

    /**
     * Acquires the produce (read) side of the transaction lock, waiting at most
     * the configured produce-lock acquisition timeout.
     *
     * @param pollContextInternal the context the lock is being taken for; its offsets are logged
     * @return a handle wrapping the acquired read lock and the context
     * @throws TimeoutException         if the lock could not be obtained within the timeout
     * @throws InternalRuntimeException if the thread is interrupted while waiting; the thread's
     *                                  interrupt flag is restored before this is thrown
     */
    protected ProducerManager<K, V>.ProducingLock acquireProduceLock(PollContextInternal<K, V> pollContextInternal) throws TimeoutException {
        ReentrantReadWriteLock.ReadLock readLock = this.producerTransactionLock.readLock();
        Duration produceLockAcquisitionTimeout = this.options.getProduceLockAcquisitionTimeout();
        log.debug("Acquiring produce lock (timeout: {})...", produceLockAcquisitionTimeout);

        boolean acquired;
        try {
            acquired = readLock.tryLock(produceLockAcquisitionTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // tryLock clears the interrupt status when it throws; restore it so code further up
            // the stack can still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new InternalRuntimeException("Interrupted while waiting to get produce lock (timeout was set to {})", e, produceLockAcquisitionTimeout);
        }

        if (!acquired) {
            throw new TimeoutException(StringUtils.msg("Timeout while waiting to get produce lock (was set to {}). Commit taking too long? Try increasing the produce lock timeout.", produceLockAcquisitionTimeout));
        }
        log.debug("Produce lock acquired (context: {}).", pollContextInternal.getOffsets());
        log.trace("Produce lock acquired.");
        return new ProducingLock(pollContextInternal, readLock);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook run before the offsets to commit are gathered: takes the commit
     * (write) lock and then flushes the producer.
     *
     * @throws TimeoutException     if the commit lock cannot be acquired within its timeout
     * @throws InterruptedException if interrupted while waiting for the commit lock
     */
    @Override // io.confluent.parallelconsumer.internal.AbstractOffsetCommitter
    public void preAcquireOffsetsToCommit() throws TimeoutException, InterruptedException {
        acquireCommitLock();
        // Same call the private flush() helper makes.
        producerWrapper.flush();
    }

    /** Flushes the underlying producer via the wrapper. */
    private void flush() {
        producerWrapper.flush();
    }

    /**
     * Hook run after a commit: releases the commit (write) lock.
     *
     * @throws ConcurrentModificationException if the current thread holds the write lock more than
     *                                         once, since a single release would leave it locked
     */
    @Override // io.confluent.parallelconsumer.internal.AbstractOffsetCommitter
    protected void postCommit() {
        final int writeHolds = producerTransactionLock.getWriteHoldCount();
        final boolean heldTooManyTimes = writeHolds > 1;
        if (heldTooManyTimes) {
            throw new ConcurrentModificationException("Lock held too many times, won't be released problem and will cause deadlock");
        }
        releaseCommitLock();
    }

    /**
     * Sends the given offsets to the current transaction and commits it,
     * retrying the commit when it fails with a Kafka timeout or interrupt.
     * <p>
     * The commit lock must already be held by the calling thread. On the first
     * attempt (and always for a mock producer) the transaction is committed
     * directly. On a retry, the commit is re-issued only if the wrapper reports
     * the transaction as still completing; otherwise the loop ends without a
     * further commit call.
     *
     * @param map                   offsets to send to the transaction, per partition
     * @param consumerGroupMetadata consumer group metadata passed along with the offsets
     * @throws NullPointerException     if either argument is {@code null}
     * @throws IllegalStateException    if the options do not use a transactional producer, or the
     *                                  commit lock is not held by the current thread
     * @throws InternalRuntimeException if the retry limit is exceeded, or the producer is fenced
     */
    @Override // io.confluent.parallelconsumer.internal.AbstractOffsetCommitter
    protected void commitOffsets(@NonNull Map<TopicPartition, OffsetAndMetadata> map, @NonNull ConsumerGroupMetadata consumerGroupMetadata) {
        if (map == null) {
            throw new NullPointerException("offsetsToSend is marked non-null but is null");
        }
        if (consumerGroupMetadata == null) {
            throw new NullPointerException("groupMetadata is marked non-null but is null");
        }
        log.debug("Transactional offset commit starting");
        if (!this.options.isUsingTransactionalProducer()) {
            throw new IllegalStateException("Bug: cannot use if not using transactional producer");
        }
        ensureCommitLockHeld();
        lazyMaybeBeginTransaction();

        // Single source of truth for the retry limit, used by both the check and the message
        // (previously the message reported an unrelated encoder constant as "the limit").
        final int maxCommitRetries = 200;

        try {
            this.producerWrapper.sendOffsetsToTransaction(map, consumerGroupMetadata);
            boolean committed = false;
            int retryCount = 0;
            Exception lastError = null;
            while (!committed) {
                if (retryCount > maxCommitRetries) {
                    String msg = StringUtils.msg("Retired too many times ({} > limit of {}), giving up. See error above.", retryCount, maxCommitRetries);
                    log.error(msg, lastError);
                    throw new InternalRuntimeException(msg, lastError);
                }
                try {
                    if (this.producerWrapper.isMockProducer()) {
                        commitTransaction();
                    } else if (retryCount > 0) {
                        if (this.producerWrapper.isTransactionCompleting()) {
                            // Still completing from the earlier attempt: wait on the commit again.
                            commitTransaction();
                        }
                        // Null-safe: an exception without a message must not turn a retry into an NPE.
                        String lastMessage = lastError == null ? null : lastError.getMessage();
                        if (lastMessage == null || !lastMessage.contains("Invalid transition attempted from state READY to state COMMITTING_TRANSACTION")) {
                            log.error("Transaction was already in READY state - tx completed between interrupt and retry");
                        }
                    } else {
                        commitTransaction();
                    }
                    committed = true;
                    if (retryCount > 0) {
                        log.warn("Commit success, but took {} tries.", retryCount);
                    }
                } catch (org.apache.kafka.common.errors.TimeoutException | InterruptException e) {
                    log.warn("Commit exception, will retry, have tried {} times (see KafkaProducer#commit)", retryCount, e);
                    lastError = e;
                    retryCount++;
                }
            }
        } catch (ProducerFencedException e2) {
            throw new InternalRuntimeException((Throwable) e2);
        }
    }

    /** Commits the current transaction via the producer wrapper. */
    private void commitTransaction() {
        producerWrapper.commitTransaction();
    }

    /** Begins a new transaction via the producer wrapper. */
    private void beginTransaction() {
        producerWrapper.beginTransaction();
    }

    /**
     * Closes the producer. When a transactional producer is in use and its
     * transaction is not in the ready state, the transaction is aborted first,
     * under the commit lock if that lock can be obtained.
     * <p>
     * Failing to get the commit lock is logged and the abort is attempted
     * anyway. The lock is released afterwards only if the current thread
     * actually holds it. If the thread was interrupted while waiting for the
     * lock, its interrupt flag is restored once this method finishes.
     *
     * @param duration timeout passed to the producer's close
     */
    public void close(Duration duration) {
        log.debug("Closing producer, assuming no more in flight...");
        boolean interrupted = false;
        try {
            if (this.options.isUsingTransactionalProducer() && !this.producerWrapper.isTransactionReady()) {
                try {
                    acquireCommitLock();
                } catch (InterruptedException e) {
                    // Remember the interrupt, but defer restoring it so the abort/close below
                    // are not themselves cut short by the pending interrupt.
                    interrupted = true;
                    log.error("Exception acquiring commit lock, will try to abort anyway", e);
                } catch (TimeoutException e) {
                    log.error("Exception acquiring commit lock, will try to abort anyway", e);
                }
                try {
                    abortTransaction();
                } finally {
                    // Only release what we hold: if acquisition failed above, an unconditional
                    // release would throw IllegalStateException, masking any abort failure and
                    // preventing the producer from being closed.
                    if (this.producerTransactionLock.isWriteLockedByCurrentThread()) {
                        releaseCommitLock();
                    }
                }
            }
            closeProducer(duration);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the underlying producer via the wrapper.
     *
     * @param duration timeout handed to the wrapper's close call
     */
    private void closeProducer(Duration duration) {
        producerWrapper.close(duration);
    }

    /** Aborts the current transaction via the producer wrapper. */
    private void abortTransaction() {
        producerWrapper.abortTransaction();
    }

    /**
     * Acquires the commit (write) side of the transaction lock, waiting at most
     * the configured commit-lock acquisition timeout. Returns immediately,
     * without re-entering, when the current thread already holds the write lock.
     *
     * @throws ConcurrentModificationException if another thread currently holds the write lock
     * @throws TimeoutException                if the lock is not obtained within the timeout
     * @throws InterruptedException            if interrupted while waiting for the lock
     */
    private void acquireCommitLock() throws TimeoutException, InterruptedException {
        log.debug("Acquiring commit - checking lock state...");
        final ReentrantReadWriteLock txLock = this.producerTransactionLock;

        if (txLock.isWriteLockedByCurrentThread()) {
            log.debug("Lock already held, returning with-out reentering to avoid write lock layers...");
            return;
        }

        // Not held by us, so any write lock seen here belongs to a different thread.
        if (txLock.isWriteLocked()) {
            throw new ConcurrentModificationException(getClass().getSimpleName() + " is not safe for multi-threaded access - write lock already held by another thread");
        }

        final Duration timeout = this.options.getCommitLockAcquisitionTimeout();
        log.debug("Acquiring commit lock (timeout: {})...", timeout);
        final boolean acquired = txLock.writeLock().tryLock(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (!acquired) {
            throw new TimeoutException(StringUtils.msg("Timeout getting commit lock (which was set to {}). Slow processing or too many records being ack'd? Try increasing the commit lock timeout ({}), or reduce your record processing time.", timeout, ParallelConsumerOptions.Fields.commitLockAcquisitionTimeout));
        }
        log.debug("Commit lock acquired.");
    }

    /**
     * Releases the commit (write) lock once.
     *
     * @throws IllegalStateException if the current thread does not hold the write lock
     */
    private void releaseCommitLock() {
        log.debug("Releasing commit lock...");
        final boolean heldByMe = producerTransactionLock.isWriteLockedByCurrentThread();
        if (!heldByMe) {
            throw new IllegalStateException("Not held be me");
        }
        producerTransactionLock.writeLock().unlock();
        log.debug("Commit lock released.");
    }

    /**
     * Asserts that the calling thread holds the commit (write) lock.
     *
     * @throws IllegalStateException if it does not
     */
    private void ensureCommitLockHeld() {
        if (producerTransactionLock.isWriteLockedByCurrentThread()) {
            return;
        }
        throw new IllegalStateException("Expected commit lock to be held");
    }

    /**
     * @return {@code true} while any thread holds the commit (write) lock
     */
    public boolean isTransactionCommittingInProgress() {
        final boolean writeLocked = producerTransactionLock.isWriteLocked();
        return writeLocked;
    }

    /**
     * Public entry point for starting to produce: acquires the produce lock for the given context.
     *
     * @param pollContextInternal the context the produce lock is taken for
     * @return the lock handle, to be passed to {@code finishProducing} when done
     * @throws TimeoutException if the produce lock cannot be acquired within its timeout
     */
    public ProducerManager<K, V>.ProducingLock beginProducing(PollContextInternal<K, V> pollContextInternal) throws TimeoutException {
        final ProducerManager<K, V>.ProducingLock acquired = acquireProduceLock(pollContextInternal);
        return acquired;
    }

    /**
     * Ends a producing session by releasing the given produce lock.
     *
     * @param producingLock the handle returned by {@code beginProducing}; must not be {@code null}
     * @throws NullPointerException     if {@code producingLock} is {@code null}
     * @throws InternalRuntimeException if transaction commit mode is on and the current thread
     *                                  holds no read lock
     */
    public void finishProducing(@NonNull ProducerManager<K, V>.ProducingLock producingLock) {
        final boolean lockMissing = producingLock == null;
        if (lockMissing) {
            throw new NullPointerException("produceLock is marked non-null but is null");
        }
        // Verify the lock is really held before releasing it.
        ensureProduceStarted();
        releaseProduceLock(producingLock);
    }

    /**
     * When transaction commit mode is enabled, checks that the calling thread
     * holds the produce (read) lock at least once.
     *
     * @throws InternalRuntimeException if the lock is required but not held
     */
    private void ensureProduceStarted() {
        if (!options.isUsingTransactionCommitMode()) {
            // The check only applies in transaction commit mode.
            return;
        }
        if (producerTransactionLock.getReadHoldCount() >= 1) {
            return;
        }
        throw new InternalRuntimeException("Need to call #beginProducing first");
    }

    /**
     * @return a fixed label; no instance state is included
     */
    @Override
    public String toString() {
        final String label = "ProducerManager()";
        return label;
    }

    /**
     * @return the producer wrapper this manager delegates to
     */
    public ProducerWrapper<K, V> getProducerWrapper() {
        return producerWrapper;
    }

    /**
     * @return the read/write lock used for producing (read side) and committing (write side)
     */
    public ReentrantReadWriteLock getProducerTransactionLock() {
        return producerTransactionLock;
    }
}
