/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;

    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient tbClient) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<TransactionCoordinatorID, TransactionMetadataStore>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
    }

    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener(){

            @Override
            public void onLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get((String)topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get((String)name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.addTransactionMetadataStore(TransactionCoordinatorID.get((long)name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public void unLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get((String)topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get((String)name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals((Object)NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
        this.transactionMetadataStoreProvider.openStore(tcId, this.pulsarService.getManagedLedgerFactory()).whenComplete((store, ex) -> {
            if (ex != null) {
                LOG.error("Add transaction metadata store with id {} error", (Object)tcId.getId(), ex);
            } else {
                this.stores.put(tcId, (TransactionMetadataStore)store);
                LOG.info("Added new transaction meta store {}", (Object)tcId);
            }
        });
    }

    public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        TransactionMetadataStore metadataStore = this.stores.remove(tcId);
        if (metadataStore != null) {
            metadataStore.closeAsync().whenComplete((v, ex) -> {
                if (ex != null) {
                    LOG.error("Close transaction metadata store with id " + tcId, ex);
                } else {
                    LOG.info("Removed and closed transaction meta store {}", (Object)tcId);
                }
            });
        }
    }

    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills);
    }

    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, PulsarTransactionMetadata.TxnStatus newStatus, PulsarTransactionMetadata.TxnStatus expectedStatus) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture((Throwable)new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
        PulsarTransactionMetadata.TxnStatus newStatus;
        CompletionStage completableFuture = new CompletableFuture();
        switch (txnAction) {
            case 0: {
                newStatus = PulsarTransactionMetadata.TxnStatus.COMMITTING;
                break;
            }
            case 1: {
                newStatus = PulsarTransactionMetadata.TxnStatus.ABORTING;
                break;
            }
            default: {
                UnsupportedTxnActionException exception = new UnsupportedTxnActionException(txnID, txnAction);
                LOG.error(exception.getMessage());
                completableFuture.completeExceptionally(exception);
                return completableFuture;
            }
        }
        completableFuture = this.updateTxnStatus(txnID, newStatus, PulsarTransactionMetadata.TxnStatus.OPEN).thenCompose(ignored -> this.endTxnInTransactionBuffer(txnID, txnAction, messageIdDataList));
        if (PulsarTransactionMetadata.TxnStatus.COMMITTING.equals((Object)newStatus)) {
            completableFuture = completableFuture.thenCompose(ignored -> this.updateTxnStatus(txnID, PulsarTransactionMetadata.TxnStatus.COMMITTED, PulsarTransactionMetadata.TxnStatus.COMMITTING));
        } else if (PulsarTransactionMetadata.TxnStatus.ABORTING.equals((Object)newStatus)) {
            completableFuture = completableFuture.thenCompose(ignored -> this.updateTxnStatus(txnID, PulsarTransactionMetadata.TxnStatus.ABORTED, PulsarTransactionMetadata.TxnStatus.ABORTING));
        }
        return completableFuture;
    }

    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
        CompletableFuture<Void> resultFuture = new CompletableFuture<Void>();
        ArrayList completableFutureList = new ArrayList();
        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally((Throwable)throwable);
                return;
            }
            txnMeta.ackedPartitions().forEach(tbSub -> {
                CompletableFuture actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            ArrayList<MessageIdImpl> messageIdList = new ArrayList<MessageIdImpl>();
            for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
                messageIdList.add(new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
                messageIdData.recycle();
            }
            txnMeta.producedPartitions().forEach(partition -> {
                CompletableFuture actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), messageIdList.stream().filter(msg -> ((MessageIdImpl)msg).getPartitionIndex() == TopicName.get((String)partition).getPartitionIndex()).collect(Collectors.toList()));
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), messageIdList.stream().filter(msg -> ((MessageIdImpl)msg).getPartitionIndex() == TopicName.get((String)partition).getPartitionIndex()).collect(Collectors.toList()));
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            try {
                FutureUtil.waitForAll((List)completableFutureList).whenComplete((ignored, waitThrowable) -> {
                    if (waitThrowable != null) {
                        resultFuture.completeExceptionally((Throwable)waitThrowable);
                        return;
                    }
                    resultFuture.complete(null);
                });
            }
            catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
        return resultFuture;
    }

    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    public TransactionMetadataStoreProvider getTransactionMetadataStoreProvider() {
        return this.transactionMetadataStoreProvider;
    }

    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(this.stores);
    }
}

