/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    private static final long endTransactionRetryIntervalTime = 1000L;
    private final Timer transactionOpRetryTimer;

    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient tbClient, HashedWheelTimer timer) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<TransactionCoordinatorID, TransactionMetadataStore>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
        this.transactionOpRetryTimer = timer;
    }

    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener(){

            @Override
            public void onLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get((String)topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get((String)name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.addTransactionMetadataStore(TransactionCoordinatorID.get((long)name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public void unLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get((String)topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get((String)name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals((Object)NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
        this.pulsarService.getBrokerService().getManagedLedgerConfig(TopicName.get((String)("__transaction_log_" + tcId))).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Add transaction metadata store with id {} error", (Object)tcId.getId(), e);
            } else {
                TransactionTimeoutTracker timeoutTracker = this.timeoutTrackerFactory.newTracker(tcId);
                TransactionRecoverTrackerImpl recoverTracker = new TransactionRecoverTrackerImpl(this, timeoutTracker, tcId.getId());
                this.transactionMetadataStoreProvider.openStore(tcId, this.pulsarService.getManagedLedgerFactory(), v, timeoutTracker, (TransactionRecoverTracker)recoverTracker).whenComplete((store, ex) -> {
                    if (ex != null) {
                        LOG.error("Add transaction metadata store with id {} error", (Object)tcId.getId(), ex);
                    } else {
                        this.stores.put(tcId, (TransactionMetadataStore)store);
                        LOG.info("Added new transaction meta store {}", (Object)tcId);
                    }
                });
            }
        });
    }

    public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        TransactionMetadataStore metadataStore = this.stores.remove(tcId);
        if (metadataStore != null) {
            metadataStore.closeAsync().whenComplete((v, ex) -> {
                if (ex != null) {
                    LOG.error("Close transaction metadata store with id " + tcId, ex);
                } else {
                    LOG.info("Removed and closed transaction meta store {}", (Object)tcId);
                }
            });
        }
    }

    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills);
    }

    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    public long getLowWaterMark(TxnID txnID) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return 0L;
        }
        return store.getLowWaterMark();
    }

    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus, boolean isTimeout) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus, isTimeout);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
        TxnStatus newStatus;
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        switch (txnAction) {
            case 0: {
                newStatus = TxnStatus.COMMITTING;
                break;
            }
            case 1: {
                newStatus = TxnStatus.ABORTING;
                break;
            }
            default: {
                UnsupportedTxnActionException exception = new UnsupportedTxnActionException(txnID, txnAction);
                LOG.error(exception.getMessage());
                completableFuture.completeExceptionally(exception);
                return completableFuture;
            }
        }
        ((CompletableFuture)this.getTxnMeta(txnID).thenAccept(txnMeta -> {
            TxnStatus txnStatus = txnMeta.status();
            if (txnStatus == TxnStatus.OPEN) {
                ((CompletableFuture)this.updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout).thenAccept(v -> ((CompletableFuture)this.endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a -> completableFuture.complete(null))).exceptionally(e -> {
                    if (!TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout), 1000L, TimeUnit.MILLISECONDS);
                    }
                    completableFuture.completeExceptionally((Throwable)e);
                    return null;
                }))).exceptionally(e -> {
                    if (!TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout), 1000L, TimeUnit.MILLISECONDS);
                    }
                    completableFuture.completeExceptionally((Throwable)e);
                    return null;
                });
            } else if (txnStatus == TxnStatus.COMMITTING && txnAction == TxnAction.COMMIT.getValue() || txnStatus == TxnStatus.ABORTING && txnAction == TxnAction.ABORT.getValue()) {
                ((CompletableFuture)this.endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k -> completableFuture.complete(null))).exceptionally(e -> {
                    if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout), 1000L, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    }
                    completableFuture.completeExceptionally((Throwable)e);
                    return null;
                });
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", (Object)txnID, (Object)txnAction);
                }
                completableFuture.completeExceptionally((Throwable)new CoordinatorException.InvalidTxnStatusException(txnID, newStatus, txnStatus));
            }
        })).exceptionally(e -> {
            if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                }
                this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout), 1000L, TimeUnit.MILLISECONDS);
            }
            completableFuture.completeExceptionally((Throwable)e);
            return null;
        });
        return completableFuture;
    }

    public void endTransactionForTimeout(TxnID txnID) {
        ((CompletableFuture)this.getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return this.endTransaction(txnID, 1, true);
            }
            return null;
        })).exceptionally(e -> {
            if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                this.endTransaction(txnID, 1, true);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! TxnId : {}", (Object)txnID);
            }
            return null;
        });
    }

    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
        CompletableFuture resultFuture = new CompletableFuture();
        ArrayList completableFutureList = new ArrayList();
        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally((Throwable)throwable);
                return;
            }
            long lowWaterMark = this.getLowWaterMark(txnID);
            txnMeta.ackedPartitions().forEach(tbSub -> {
                CompletableFuture actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            txnMeta.producedPartitions().forEach(partition -> {
                CompletableFuture actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            try {
                FutureUtil.waitForAll((List)completableFutureList).whenComplete((ignored, waitThrowable) -> {
                    if (waitThrowable != null) {
                        resultFuture.completeExceptionally((Throwable)waitThrowable);
                        return;
                    }
                    resultFuture.complete(null);
                });
            }
            catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
        return resultFuture.thenCompose(future -> this.endTxnInTransactionMetadataStore(txnID, txnAction));
    }

    private static boolean isRetryableException(Throwable e) {
        return e instanceof CoordinatorException.TransactionMetadataStoreStateException || e instanceof TransactionBufferClientException.RequestTimeoutException || e instanceof ManagedLedgerException || e instanceof PulsarClientException.BrokerPersistenceException || e instanceof PulsarClientException.LookupException || e instanceof TransactionBufferClientException.ReachMaxPendingOpsException || e instanceof PulsarClientException.ConnectException;
    }

    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return this.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false);
        }
        if (TxnAction.ABORT.getValue() == txnAction) {
            return this.updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING, false);
        }
        return FutureUtil.failedFuture((Throwable)new CoordinatorException.InvalidTxnStatusException("Unsupported txnAction " + txnAction));
    }

    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(this.stores);
    }
}

