package org.apache.pulsar.broker.transaction.pendingack.impl;

import io.netty.util.Timer;
import io.prometheus.client.CollectorRegistry;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterMetricsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.class */
public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStoreProvider.class);
    private static volatile TxnLogBufferedWriterMetricsStats bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider$MLTxnPendingAckLogBufferedWriterMetrics.class */
    public static class MLTxnPendingAckLogBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats {
        private MLTxnPendingAckLogBufferedWriterMetrics(String str) {
            super("pulsar_txn_pending_ack_store", new String[]{"broker"}, new String[]{str}, CollectorRegistry.defaultRegistry);
        }
    }

    public static void initBufferedWriterMetrics(String str) {
        if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
            return;
        }
        synchronized (MLPendingAckStoreProvider.class) {
            if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
                return;
            }
            bufferedWriterMetrics = new MLTxnPendingAckLogBufferedWriterMetrics(str);
        }
    }

    public static void closeBufferedWriterMetrics() {
        synchronized (MLPendingAckStoreProvider.class) {
            if (bufferedWriterMetrics == DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
                return;
            }
            bufferedWriterMetrics.close();
            bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
        }
    }

    @Override // org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider
    public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription persistentSubscription) {
        CompletableFuture<PendingAckStore> completableFuture = new CompletableFuture<>();
        if (persistentSubscription == null) {
            completableFuture.completeExceptionally(new TransactionPendingAckException.TransactionPendingAckStoreProviderException("The subscription is null."));
            return completableFuture;
        }
        PersistentTopic persistentTopic = (PersistentTopic) persistentSubscription.getTopic();
        PulsarService pulsar = persistentTopic.getBrokerService().getPulsar();
        Timer brokerClientSharedTimer = pulsar.getBrokerClientSharedTimer();
        ServiceConfiguration configuration = pulsar.getConfiguration();
        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
        txnLogBufferedWriterConfig.setBatchEnabled(configuration.isTransactionPendingAckBatchedWriteEnabled());
        txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(configuration.getTransactionPendingAckBatchedWriteMaxRecords());
        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(configuration.getTransactionPendingAckBatchedWriteMaxSize());
        txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis());
        String transactionPendingAckStoreSuffix = MLPendingAckStore.getTransactionPendingAckStoreSuffix(persistentTopic.getName(), persistentSubscription.getName());
        persistentTopic.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(transactionPendingAckStoreSuffix).getPersistenceNamingEncoding()).thenAccept(bool -> {
            persistentTopic.getBrokerService().getManagedLedgerConfig(bool.booleanValue() ? TopicName.get(transactionPendingAckStoreSuffix) : TopicName.get(persistentTopic.getName())).thenAccept(managedLedgerConfig -> {
                managedLedgerConfig.setCreateIfMissing(true);
                persistentTopic.getBrokerService().getManagedLedgerFactory().asyncOpen(TopicName.get(transactionPendingAckStoreSuffix).getPersistenceNamingEncoding(), managedLedgerConfig, new AsyncCallbacks.OpenLedgerCallback() { // from class: org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider.1
                    public void openLedgerComplete(final ManagedLedger managedLedger, Object obj) {
                        managedLedger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(), CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { // from class: org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider.1.1
                            public void openCursorComplete(ManagedCursor managedCursor, Object obj2) {
                                completableFuture.complete(new MLPendingAckStore(managedLedger, managedCursor, persistentSubscription.getCursor(), persistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionPendingAckLogIndexMinLag(), txnLogBufferedWriterConfig, brokerClientSharedTimer, MLPendingAckStoreProvider.bufferedWriterMetrics, persistentTopic.getBrokerService().getPulsar().getOrderedExecutor().chooseThread()));
                                if (MLPendingAckStoreProvider.log.isDebugEnabled()) {
                                    MLPendingAckStoreProvider.log.debug("{},{} open MLPendingAckStore cursor success", persistentTopic.getName(), persistentSubscription.getName());
                                }
                            }

                            public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj2) {
                                MLPendingAckStoreProvider.log.error("{},{} open MLPendingAckStore cursor failed.", new Object[]{persistentTopic.getName(), persistentSubscription.getName(), managedLedgerException});
                                completableFuture.completeExceptionally(managedLedgerException);
                            }
                        }, (Object) null);
                    }

                    public void openLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        MLPendingAckStoreProvider.log.error("{}, {} open MLPendingAckStore managedLedger failed.", new Object[]{persistentTopic.getName(), persistentSubscription.getName(), managedLedgerException});
                        completableFuture.completeExceptionally(managedLedgerException);
                    }
                }, () -> {
                    return CompletableFuture.completedFuture(true);
                }, (Object) null);
            }).exceptionally(th -> {
                Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!", new Object[]{persistentTopic, persistentSubscription, unwrapCompletionException});
                completableFuture.completeExceptionally(unwrapCompletionException);
                return null;
            });
        }).exceptionally(th -> {
            Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
            log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!", new Object[]{persistentTopic, persistentSubscription, unwrapCompletionException});
            completableFuture.completeExceptionally(unwrapCompletionException);
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider
    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription persistentSubscription) {
        PersistentTopic persistentTopic = (PersistentTopic) persistentSubscription.getTopic();
        return persistentTopic.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(persistentTopic.getName(), persistentSubscription.getName())).getPersistenceNamingEncoding());
    }
}
