package io.smallrye.reactive.messaging.kafka.transactions;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.class */
public class KafkaTransactionsImpl<T> extends MutinyEmitterImpl<T> implements KafkaTransactions<T> {
    private final KafkaClientService clientService;
    private final KafkaProducer<?, ?> producer;
    private volatile KafkaTransactionsImpl<T>.Transaction<?> currentTransaction;
    private final ReentrantLock lock;
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl$Transaction.class */
    /**
     * One producer transaction: the hooks to run around commit/abort, the sends issued through
     * this emitter while the transaction is open, and the "marked for abort" flag.
     *
     * @param <R> type of the item produced by the transactional work
     */
    private class Transaction<R> implements TransactionalEmitter<T> {
        /** Subscribed to right before {@code producer.commitTransaction()}. */
        private final Uni<Void> beforeCommit;
        /** Maps the work's result once the commit/abort stage completed without failure. */
        private final Function<R, Uni<R>> afterCommit;
        /** Subscribed to right before {@code producer.abortTransaction()}. */
        private final Uni<Void> beforeAbort;
        /** Maps any failure of the pipeline (work, flush, commit or abort) to the final outcome. */
        private final Function<Throwable, Uni<R>> afterAbort;
        /** One entry per {@code send(...)} call made through this transaction. */
        private final List<Uni<Void>> sendUnis;
        /** Set by {@link #markForAbort()}; read when deciding between commit and abort. */
        private volatile boolean abort;

        /**
         * Creates a transaction with the default hooks: no-op before-commit/before-abort,
         * {@code defaultAfterCommit} and {@code defaultAfterAbort} as after-hooks.
         *
         * @param kafkaTransactionsImpl unused; the enclosing instance is already available
         *        implicitly. Kept so existing {@code new Transaction(this)} call sites still compile.
         */
        public Transaction(KafkaTransactionsImpl<T> kafkaTransactionsImpl) {
            this(VOID_UNI, KafkaTransactionsImpl::defaultAfterCommit, VOID_UNI, KafkaTransactionsImpl::defaultAfterAbort);
        }

        /**
         * Creates a transaction with explicit hooks.
         *
         * @param uni before-commit hook
         * @param function after-commit hook
         * @param uni2 before-abort hook
         * @param function2 after-abort hook
         */
        public Transaction(Uni<Void> uni, Function<R, Uni<R>> function, Uni<Void> uni2, Function<Throwable, Uni<R>> function2) {
            this.sendUnis = new CopyOnWriteArrayList<>();
            this.beforeCommit = uni;
            this.afterCommit = function;
            this.beforeAbort = uni2;
            this.afterAbort = function2;
        }

        /**
         * Registers this transaction as the current one, begins the producer transaction and runs
         * {@code function} inside it. The current-transaction reference is cleared when the
         * returned Uni terminates.
         *
         * @param function the transactional work; receives this transaction as its emitter
         * @return a Uni producing the outcome of the after-commit or after-abort hook
         */
        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> function) {
            // Published eagerly (before subscription) so concurrent callers see a transaction in progress.
            currentTransaction = this;
            // Captured at call time: null when the caller is not running on a Vert.x context.
            Context callerContext = Vertx.currentContext();
            return producer.beginTransaction()
                    // When a caller context exists, hand the signal back to it via
                    // VertxContext.runOnContext so the following stages are emitted from there.
                    .plug(begun -> callerContext == null
                            ? begun
                            : begun.emitOn(task -> VertxContext.runOnContext(callerContext, task)))
                    .chain(() -> executeInTransaction(function))
                    // Block-bodied on purpose: keeps this a Runnable, not a value-returning lambda.
                    .eventually(() -> {
                        currentTransaction = null;
                    });
        }

        /**
         * Builds the pipeline that runs the work and then commits or aborts the transaction.
         *
         * @param function the transactional work
         * @return a Uni producing the after-commit result, or whatever the after-abort hook yields
         */
        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
            // Expression lambdas (not method references) are kept deliberately: several of these
            // Mutiny operators are overloaded on Supplier/Function/Runnable.
            return Uni.createFrom().nullItem()
                    // nullItem() + chain defers function.apply to subscription time, so an
                    // exception thrown by the work surfaces as a failure of this pipeline.
                    .chain(() -> function.apply(this))
                    // Whatever the outcome of the work, wait for every send issued through this emitter.
                    .eventually(() -> waitOnSend())
                    // Flush only when the work (and the wait on sends) produced an item.
                    .call(() -> producer.flush())
                    // Failure or cancellation so far: abort the transaction.
                    .onFailure().call(failure -> abort())
                    .onCancellation().call(() -> abort())
                    // Item path: abort if marked for abort, otherwise commit; a failed commit is
                    // logged and followed by an abort.
                    .call(() -> abort ? abort() : commit().onFailure().recoverWithUni(commitFailure -> {
                        KafkaLogging.log.transactionCommitFailed(commitFailure);
                        return abort();
                    }))
                    // Finally route the outcome through the after-abort / after-commit hooks.
                    .onFailure().recoverWithUni(failure -> afterAbort.apply(failure))
                    .onItem().transformToUni(result -> afterCommit.apply(result));
        }

        /**
         * @return a Uni joining all recorded sends and collecting their failures, or a null-item
         *         Uni when nothing was sent
         */
        private Uni<List<Void>> waitOnSend() {
            if (sendUnis.isEmpty()) {
                return Uni.createFrom().nullItem();
            }
            return Uni.join().all(sendUnis).andCollectFailures();
        }

        /** @return the before-commit hook followed by {@code producer.commitTransaction()} */
        private Uni<Void> commit() {
            return beforeCommit.call(producer::commitTransaction);
        }

        /**
         * @return the before-abort hook followed by {@code producer.abortTransaction()}; when the
         *         transaction was marked for abort, the Uni additionally fails with a
         *         {@link TransactionAbortedException} after the abort completes
         */
        private Uni<Void> abort() {
            Uni<Void> aborted = beforeAbort.call(producer::abortTransaction);
            if (abort) {
                return aborted.chain(() -> Uni.createFrom().failure(new TransactionAbortedException()));
            }
            return aborted;
        }

        /**
         * Sends a message through the enclosing emitter and records the send so the transaction
         * waits for it. The send is subscribed right away through
         * {@code subscribeAsCompletionStage()}; failures are logged via
         * {@code KafkaLogging.log.unableToSendRecord}.
         *
         * @param m the message to send
         */
        @Override
        public <M extends Message<? extends T>> void send(M m) {
            sendUnis.add(Uni.createFrom().completionStage(
                    KafkaTransactionsImpl.this.sendMessage(m)
                            .onFailure().invoke(KafkaLogging.log::unableToSendRecord)
                            .subscribeAsCompletionStage()));
        }

        /**
         * Sends a payload through the enclosing emitter and records the send so the transaction
         * waits for it. The send is subscribed right away through
         * {@code subscribeAsCompletionStage()}; failures are logged via
         * {@code KafkaLogging.log.unableToSendRecord}.
         *
         * @param t the payload to send
         */
        @Override
        public void send(T t) {
            sendUnis.add(Uni.createFrom().completionStage(
                    KafkaTransactionsImpl.this.send(t)
                            .onFailure().invoke(KafkaLogging.log::unableToSendRecord)
                            .subscribeAsCompletionStage()));
        }

        /** Marks this transaction so it is aborted instead of committed. */
        @Override
        public void markForAbort() {
            this.abort = true;
        }

        /** @return {@code true} once {@link #markForAbort()} has been called */
        @Override
        public boolean isMarkedForAbort() {
            return this.abort;
        }
    }

    public KafkaTransactionsImpl(EmitterConfiguration emitterConfiguration, long j, KafkaClientService kafkaClientService) {
        super(emitterConfiguration, j);
        this.lock = new ReentrantLock();
        this.clientService = kafkaClientService;
        this.producer = kafkaClientService.getProducer(emitterConfiguration.name());
    }

    /**
     * @return {@code true} when a transaction is currently registered on this emitter; the
     *         check is made while holding the emitter's lock
     */
    @Override
    public boolean isTransactionInProgress() {
        final boolean inProgress;
        lock.lock();
        try {
            inProgress = currentTransaction != null;
        } finally {
            lock.unlock();
        }
        return inProgress;
    }

    /**
     * Runs {@code function} inside a new producer transaction using the default hooks.
     * The check for an already-running transaction and the creation of the new one happen
     * under the emitter's lock; the lock is released before this method returns.
     *
     * @param function the transactional work
     * @return the Uni built by the new transaction's {@code execute}
     */
    @Override
    @CheckReturnValue
    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
        lock.lock();
        try {
            if (currentTransaction == null) {
                return new Transaction<R>(this).execute(function);
            }
            // Only one transaction at a time per emitter.
            throw KafkaExceptions.ex.transactionInProgress(name);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Runs {@code function} inside a producer transaction that also carries the consumer offsets
     * of {@code message}.
     *
     * <p>The offsets come from the message's {@link IncomingKafkaRecordBatchMetadata} if present,
     * otherwise from its {@link IncomingKafkaRecordMetadata}; each committed offset is the
     * consumed offset plus one. The channel named by the metadata must have exactly one consumer.
     *
     * <p>Before commit, the consumer's current group metadata is fetched. If its generation id
     * still equals the one recorded on the message, the offsets are sent to the transaction;
     * otherwise the before-commit step fails with
     * {@code KafkaExceptions.ex.exactlyOnceProcessingRebalance(...)}. After an abort, the consumer
     * is reset to its last committed positions and the original failure is re-emitted.
     *
     * <p>Fails synchronously with the exception built by {@code KafkaExceptions.ex} when the
     * message has no Kafka metadata, when zero or several consumers are found for the channel,
     * or when a transaction is already in progress.
     *
     * @param message the incoming message whose offsets are committed with the transaction
     * @param function the transactional work
     * @return the Uni built by the new transaction's {@code execute}
     */
    @Override
    @CheckReturnValue
    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> function) {
        lock.lock();
        try {
            // Assigned exactly once on every non-throwing path, so the lambdas below can capture them.
            final String channel;
            final int generationId;
            final Map<TopicPartition, OffsetAndMetadata> offsets;

            Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
            Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
            if (batchMetadata.isPresent()) {
                IncomingKafkaRecordBatchMetadata<?, ?> batchMeta = batchMetadata.get();
                channel = batchMeta.getChannel();
                generationId = batchMeta.getConsumerGroupGenerationId();
                // The offset to commit is the next one to read, hence +1.
                offsets = batchMeta.getOffsets().entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> new OffsetAndMetadata(entry.getValue().offset() + 1)));
            } else if (recordMetadata.isPresent()) {
                IncomingKafkaRecordMetadata<?, ?> recordMeta = recordMetadata.get();
                channel = recordMeta.getChannel();
                generationId = recordMeta.getConsumerGroupGenerationId();
                offsets = new HashMap<>();
                offsets.put(
                        TopicPartitions.getTopicPartition(recordMeta.getTopic(), recordMeta.getPartition()),
                        new OffsetAndMetadata(recordMeta.getOffset() + 1));
            } else {
                throw KafkaExceptions.ex.noKafkaMetadataFound(message);
            }

            List<? extends KafkaConsumer<?, ?>> consumers = clientService.getConsumers(channel);
            if (consumers.isEmpty()) {
                throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
            }
            if (consumers.size() > 1) {
                throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
            }
            KafkaConsumer<?, ?> consumer = consumers.get(0);

            if (currentTransaction != null) {
                throw KafkaExceptions.ex.transactionInProgress(name);
            }

            // Before-commit hook: attach the consumer offsets, unless the group generation changed
            // since the message was received.
            Uni<Void> sendOffsets = consumer.consumerGroupMetadata().chain(groupMetadata -> {
                if (groupMetadata.generationId() != generationId) {
                    return Uni.createFrom().failure(KafkaExceptions.ex.exactlyOnceProcessingRebalance(
                            channel, groupMetadata.toString(), String.valueOf(generationId)));
                }
                producer.unwrap().sendOffsetsToTransaction(offsets, groupMetadata);
                return Uni.createFrom().voidItem();
            });

            return new Transaction<R>(
                    sendOffsets,
                    KafkaTransactionsImpl::defaultAfterCommit,
                    VOID_UNI,
                    // After-abort hook: rewind the consumer, then re-emit the original failure.
                    failure -> consumer.resetToLastCommittedPositions()
                            .chain(() -> Uni.createFrom().failure(failure)))
                    .execute(function);
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Default after-commit hook: wraps the given value in a Uni that emits it as its item.
     *
     * @param r the value to emit
     * @return a Uni created from {@code r}
     */
    public static <R> Uni<R> defaultAfterCommit(R r) {
        final Uni<R> committed = Uni.createFrom().item(r);
        return committed;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Default after-abort hook: re-emits the given throwable as the failure of the returned Uni.
     *
     * @param th the failure to propagate
     * @return a Uni failing with {@code th}
     */
    public static <R> Uni<R> defaultAfterAbort(Throwable th) {
        final Uni<R> aborted = Uni.createFrom().failure(th);
        return aborted;
    }
}
