/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionCursorImpl;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTransactionBuffer
extends PersistentTopic
implements TransactionBuffer {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBuffer.class);
    // Transaction index: every metadata lookup, commit, abort and purge in this class goes through it.
    private TransactionCursor txnCursor = new TransactionCursorImpl();
    // Non-durable cursor opened in the constructor; used to mark-delete positions of purged transaction entries.
    private ManagedCursor retentionCursor;

    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService) throws BrokerServiceException.NamingException, ManagedLedgerException {
        super(topic, ledger, brokerService);
        this.retentionCursor = ledger.newNonDurableCursor((Position)PositionImpl.earliest, "txn-buffer-retention");
    }

    /**
     * Looks up the metadata of a transaction in the transaction cursor.
     *
     * @param txnID identifier of the transaction
     * @return future holding the transaction metadata, exactly as produced by the cursor
     */
    @Override
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        // The flag is false on every read path of this class and true only when appending;
        // presumably it means "create if absent" - confirm against TransactionCursor.
        final CompletableFuture<TransactionMeta> lookup = txnCursor.getTxnMeta(txnID, false);
        return lookup;
    }

    /**
     * Publishes a buffer to the topic on behalf of a transaction and then records the resulting
     * position in the transaction's metadata.
     *
     * @param txnId      transaction the buffer belongs to
     * @param sequenceId sequence id of the buffer within the transaction
     * @param buffer     payload to publish
     * @return future that completes once the position has been appended to the transaction metadata
     */
    @Override
    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
        CompletableFuture<Position> published = publishMessage(txnId, buffer, sequenceId);
        return published.thenCompose(storedAt -> appendBuffer(txnId, storedAt, sequenceId));
    }

    /**
     * Records a published position under the given sequence id in the transaction's metadata.
     * The metadata is fetched with the cursor flag set to {@code true}.
     *
     * @param txnID      transaction owning the entry
     * @param position   position at which the entry was stored
     * @param sequenceId sequence id of the entry within the transaction
     * @return future returned by {@code TransactionMeta.appendEntry}
     */
    private CompletableFuture<Void> appendBuffer(TxnID txnID, Position position, long sequenceId) {
        CompletableFuture<TransactionMeta> metaFuture = txnCursor.getTxnMeta(txnID, true);
        return metaFuture.thenCompose(txnMeta -> txnMeta.appendEntry(sequenceId, position));
    }

    /**
     * Opens a reader over the entries of a transaction.
     *
     * @param txnID           transaction to read
     * @param startSequenceId currently unused by this implementation
     *                        (NOTE(review): confirm whether the reader should start from this sequence id)
     * @return future holding the reader, or failed if the metadata lookup or reader creation fails
     */
    @Override
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
        CompletableFuture<TransactionMeta> metaFuture = txnCursor.getTxnMeta(txnID, false);
        return metaFuture.thenCompose(txnMeta -> createNewReader(txnMeta));
    }

    /**
     * Builds a reader for the given transaction metadata on this topic's ledger.
     *
     * @param meta metadata of the transaction to read
     * @return an already-completed future holding the reader, or an already-failed future carrying the
     *         {@link TransactionNotSealedException} raised by the reader constructor
     */
    private CompletableFuture<TransactionBufferReader> createNewReader(TransactionMeta meta) {
        try {
            TransactionBufferReader reader = new PersistentTransactionBufferReader(meta, this.ledger);
            return CompletableFuture.completedFuture(reader);
        } catch (TransactionNotSealedException notSealed) {
            // Report the failure through the future instead of throwing to the caller.
            return FutureUtil.failedFuture(notSealed);
        }
    }

    /**
     * Commits a transaction: looks up its metadata, builds a commit marker, publishes the marker to the
     * topic and finally records the commit (with the marker's position) in the transaction cursor.
     *
     * <p>The chain is fully typed; raw {@code CompletableFuture} casts would erase the lambda parameter
     * types to {@code Object} and break access to the marker's fields.
     *
     * <p>NOTE(review): the marker buffer created for the commit is not released here - confirm whether
     * the publish path takes ownership of it.
     *
     * @param txnID               transaction to commit
     * @param committedAtLedgerId ledger id the transaction is committed at
     * @param committedAtEntryId  entry id the transaction is committed at
     * @return future that completes when the transaction cursor has recorded the commit
     */
    @Override
    public CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) {
        return txnCursor.getTxnMeta(txnID, false)
                .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId))
                .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
                .thenCompose(position ->
                        txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID, position));
    }

    /**
     * Builds the commit marker for a transaction.
     *
     * <p>The marker's sequence id is one past the transaction's last sequence id, and its payload
     * carries the transaction id bits together with the committed-at ledger/entry ids.
     *
     * @param meta                metadata of the transaction being committed
     * @param committedAtLedgerId ledger id embedded in the marker
     * @param committedAtEntryId  entry id embedded in the marker
     * @return the marker payload paired with its sequence id
     */
    private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a commit marker", meta.id());
        }
        final long markerSequenceId = meta.lastSequenceId() + 1L;
        final PulsarMarkers.MessageIdData committedAt = PulsarMarkers.MessageIdData.newBuilder()
                .setLedgerId(committedAtLedgerId)
                .setEntryId(committedAtEntryId)
                .build();
        final ByteBuf payload = Markers.newTxnCommitMarker(
                markerSequenceId,
                meta.id().getMostSigBits(),
                meta.id().getLeastSigBits(),
                committedAt);
        // Direct construction; Marker.builder()...build() produces the same object.
        return new Marker(markerSequenceId, payload);
    }

    /**
     * Aborts a transaction: looks up its metadata, builds an abort marker, publishes the marker to the
     * topic and then records the abort in the transaction cursor.
     *
     * <p>The chain is fully typed; raw {@code CompletableFuture} casts would erase the lambda parameter
     * types to {@code Object} and break access to the marker's fields.
     *
     * <p>NOTE(review): the marker buffer created for the abort is not released here - confirm whether
     * the publish path takes ownership of it.
     *
     * @param txnID transaction to abort
     * @return future that completes when the transaction cursor has recorded the abort
     */
    @Override
    public CompletableFuture<Void> abortTxn(TxnID txnID) {
        return txnCursor.getTxnMeta(txnID, false)
                .thenApply(meta -> createAbortMarker(meta))
                .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
                // The marker's position is not needed to record an abort.
                .thenCompose(position -> txnCursor.abortTxn(txnID));
    }

    /**
     * Builds the abort marker for a transaction.
     *
     * <p>The marker's sequence id is one past the transaction's last sequence id, and its payload
     * carries the transaction id bits.
     *
     * @param meta metadata of the transaction being aborted
     * @return the marker payload paired with its sequence id
     */
    private Marker createAbortMarker(TransactionMeta meta) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a abort marker", meta.id());
        }
        final long markerSequenceId = meta.lastSequenceId() + 1L;
        final ByteBuf payload = Markers.newTxnAbortMarker(
                markerSequenceId,
                meta.id().getMostSigBits(),
                meta.id().getLeastSigBits());
        // Direct construction; Marker.builder()...build() produces the same object.
        return new Marker(markerSequenceId, payload);
    }

    /**
     * Publishes a message to the topic on behalf of a transaction and exposes the outcome as a future.
     *
     * <p>The publish context reports {@code txnID.toString()} as producer name and the given sequence id.
     *
     * @param txnID      transaction the message belongs to
     * @param msg        payload to publish
     * @param sequenceId sequence id reported through the publish context
     * @return future completed with the stored position, or failed with the exception handed to the
     *         publish callback
     */
    private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) {
        final CompletableFuture<Position> result = new CompletableFuture<>();
        final TxnCtx publishContext = new TxnCtx(txnID.toString(), sequenceId, result) {
            @Override
            public void completed(Exception e, long ledgerId, long entryId) {
                if (e == null) {
                    result.complete(PositionImpl.get(ledgerId, entryId));
                    return;
                }
                result.completeExceptionally(e);
            }
        };
        publishMessage(msg, publishContext);
        return result;
    }

    /**
     * Purges all transactions committed at the given ledgers.
     *
     * <p>First the entries of every transaction committed at each ledger are deleted; once all of those
     * deletions have finished, the ledgers are removed from the transaction cursor's committed index.
     *
     * @param dataLedgers ids of the ledgers whose committed transactions should be purged
     * @return future that completes when both the deletions and the index clean-up are done
     */
    @Override
    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
        if (log.isDebugEnabled()) {
            log.debug("Begin to purge the ledgers {}", dataLedgers);
        }
        // Parameterized (not raw) so waitForAll and the continuation stay type-checked.
        List<CompletableFuture<Void>> cleanups = dataLedgers.stream()
                .map(dataLedger -> cleanTxnsOnLedger(dataLedger))
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(cleanups)
                .thenCompose(v -> removeCommittedLedgerFromIndex(dataLedgers));
    }

    /**
     * Asks the transaction cursor to drop the transactions committed at each of the given ledgers.
     *
     * @param dataLedgers ids of the ledgers to remove
     * @return future that completes when every removal has completed
     */
    private CompletableFuture<Void> removeCommittedLedgerFromIndex(List<Long> dataLedgers) {
        // Parameterized (not raw) so the call to waitForAll is type-checked.
        List<CompletableFuture<Void>> removals = dataLedgers.stream()
                .map(dataLedger -> txnCursor.removeTxnsCommittedAtLedger(dataLedger))
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(removals);
    }

    /**
     * Deletes the entries of every transaction that the cursor reports as committed at the given ledger.
     *
     * @param dataledger id of the ledger to clean
     * @return future that completes when all of those transactions' entries have been deleted
     */
    private CompletableFuture<Void> cleanTxnsOnLedger(long dataledger) {
        if (log.isDebugEnabled()) {
            log.debug("Start to clean ledger {}", dataledger);
        }
        return txnCursor.getAllTxnsCommittedAtLedger(dataledger)
                .thenCompose(this::deleteTxns);
    }

    /**
     * Deletes the entries of each transaction in the given set.
     *
     * @param txnIDS identifiers of the transactions to delete
     * @return future that completes when every per-transaction deletion has completed
     */
    private CompletableFuture<Void> deleteTxns(Set<TxnID> txnIDS) {
        if (log.isDebugEnabled()) {
            log.debug("Start delete txns {} under ledger", txnIDS);
        }
        // Parameterized (not raw) so the call to waitForAll is type-checked.
        List<CompletableFuture<Void>> deletions = txnIDS.stream()
                .map(this::deleteTxn)
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(deletions);
    }

    /**
     * Deletes all entries of a single transaction.
     *
     * <p>Reads {@code numEntries()} entries from the transaction metadata starting at sequence id
     * {@code -1}, then deletes every returned position.
     *
     * @param txnID transaction whose entries should be deleted
     * @return future that completes when all of the transaction's positions have been deleted
     */
    private CompletableFuture<Void> deleteTxn(TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Start to delete txn {} entries", txnID);
        }
        // Typed chain: no raw CompletableFuture cast, no unchecked conversion on return.
        return txnCursor.getTxnMeta(txnID, false)
                .thenCompose(meta -> meta.readEntries(meta.numEntries(), -1L))
                .thenCompose(entries -> deleteEntries(entries, txnID));
    }

    /**
     * Issues a delete for every position in the given map.
     *
     * @param entriesMap positions of a transaction's entries, keyed by a {@code Long}
     *                   (presumably the sequence id - confirm against TransactionMeta.readEntries)
     * @param txnID      transaction the entries belong to; used for logging in the per-position delete
     * @return future that completes when every per-position delete has completed
     */
    private CompletableFuture<Void> deleteEntries(SortedMap<Long, Position> entriesMap, TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Delete entries {}", entriesMap);
        }
        // Parameterized (not raw) so the call to waitForAll is type-checked.
        List<CompletableFuture<Void>> deletions = entriesMap.values().stream()
                .map(position -> asyncDeletePosition(position, txnID))
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(deletions);
    }

    /**
     * Mark-deletes a single position on the retention cursor and exposes the outcome as a future.
     *
     * @param position position to mark-delete
     * @param txnID    transaction the position belongs to; only used in log messages
     * @return future completed with {@code null} on success, or failed with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    private CompletableFuture<Void> asyncDeletePosition(final Position position, final TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Ready to delete position {} for txn {}", position, txnID);
        }
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        final AsyncCallbacks.MarkDeleteCallback callback = new AsyncCallbacks.MarkDeleteCallback() {
            @Override
            public void markDeleteComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("Success delete transaction `{}` entry on position {}", txnID, position);
                }
                outcome.complete(null);
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                // The exception is passed after the two placeholder arguments.
                log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception);
                outcome.completeExceptionally(exception);
            }
        };
        retentionCursor.asyncMarkDelete(position, callback, null);
        return outcome;
    }

    /**
     * Closing is not supported by this buffer.
     *
     * @return a future failed with an {@link UnsupportedOperationException}
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        final Throwable notSupported = new UnsupportedOperationException();
        return FutureUtil.failedFuture(notSupported);
    }

    /**
     * Pairs a marker payload with the sequence id it is published under.
     */
    static final class Marker {
        /** Sequence id the marker is published with. */
        long sequenceId;
        /** Serialized marker payload. */
        ByteBuf marker;

        Marker(long sequenceId, ByteBuf marker) {
            this.marker = marker;
            this.sequenceId = sequenceId;
        }

        /**
         * @return a fresh, empty builder
         */
        public static MarkerBuilder builder() {
            return new MarkerBuilder();
        }

        /**
         * Fluent builder for {@link Marker}. Unset values keep their Java defaults
         * ({@code 0} and {@code null}).
         */
        public static class MarkerBuilder {
            private long sequenceId;
            private ByteBuf marker;

            MarkerBuilder() {
            }

            /** Sets the sequence id and returns this builder. */
            public MarkerBuilder sequenceId(long sequenceId) {
                this.sequenceId = sequenceId;
                return this;
            }

            /** Sets the marker payload and returns this builder. */
            public MarkerBuilder marker(ByteBuf marker) {
                this.marker = marker;
                return this;
            }

            /** Creates a {@link Marker} from the values collected so far. */
            public Marker build() {
                return new Marker(sequenceId, marker);
            }

            @Override
            public String toString() {
                return "PersistentTransactionBuffer.Marker.MarkerBuilder(sequenceId=" + sequenceId
                        + ", marker=" + marker
                        + ")";
            }
        }
    }

    /**
     * Base publish context for messages written by the transaction buffer.
     *
     * <p>It supplies the producer name and sequence id; subclasses provide the remaining
     * {@link Topic.PublishContext} behaviour (the completion callback).
     */
    static abstract class TxnCtx implements Topic.PublishContext {
        private final String producerName;
        private final long sequenceId;
        // Stored but not read anywhere in this class.
        private final CompletableFuture<Position> completableFuture;

        TxnCtx(String producerName, long sequenceId, CompletableFuture<Position> future) {
            this.producerName = producerName;
            this.sequenceId = sequenceId;
            this.completableFuture = future;
        }

        /** @return the producer name given at construction */
        @Override
        public String getProducerName() {
            return producerName;
        }

        /** @return the sequence id given at construction */
        @Override
        public long getSequenceId() {
            return sequenceId;
        }
    }
}

