/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compactor that makes two passes over the topic.
 *
 * <p>Phase one reads messages from the supplied reader and records, for every key, the id of
 * the most recent message carrying that key. It ends when a single read does not complete
 * within {@link #PHASE_ONE_READ_TIMEOUT_SECONDS} seconds.
 *
 * <p>Phase two creates a new ledger, seeks the reader back to the first message seen in phase
 * one, and re-reads up to the last message seen in phase one, copying into the ledger only
 * those messages that are the latest for their key. The ledger is then closed and the reader
 * cumulatively acknowledges the last message, attaching the ledger id under the
 * {@code CompactedTopicLedger} property. If any step of phase two fails after the ledger was
 * created, the ledger is deleted.
 */
public class TwoPhaseCompactor extends Compactor {
    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);

    /** Maximum number of ledger writes that may be in flight at once during phase two. */
    private static final int MAX_OUTSTANDING = 500;

    /** How long a single phase-one read may take before phase one is considered finished. */
    private static final long PHASE_ONE_READ_TIMEOUT_SECONDS = 10L;

    /** Property key under which the compacted ledger id is attached to the acknowledgement. */
    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";

    public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk,
                             ScheduledExecutorService scheduler) {
        super(conf, pulsar, bk, scheduler);
    }

    /**
     * Runs phase one followed by phase two.
     *
     * @param reader reader positioned on the topic to compact
     * @param bk client used to create (and, on failure, delete) the compacted ledger
     * @return a future completed with the id of the ledger holding the compacted messages
     */
    @Override
    protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
        return phaseOne(reader)
                .thenCompose(r -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
    }

    /**
     * Starts the phase-one read loop with an empty key map.
     *
     * @return a future completed with the first/last message ids read and the key map
     */
    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
        Map<String, MessageId> latestForKey = new HashMap<>();
        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
        phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise);
        return loopPromise;
    }

    /**
     * Reads one message, records its id as the latest for its key, and re-invokes itself for
     * the next message. A read timeout after at least one message has been read completes
     * {@code loopPromise} successfully; any other failure (including a timeout before the
     * first message) completes it exceptionally.
     */
    private void phaseOneLoop(RawReader reader, Optional<MessageId> firstMessageId,
                              Optional<MessageId> lastMessageId,
                              Map<String, MessageId> latestForKey,
                              CompletableFuture<PhaseOneResult> loopPromise) {
        if (loopPromise.isDone()) {
            return;
        }
        CompletableFuture<RawMessage> future = reader.readNextAsync();
        scheduleTimeout(future);
        future.whenComplete((m, exception) -> {
            try {
                if (exception != null) {
                    if (exception instanceof TimeoutException && firstMessageId.isPresent()) {
                        // lastMessageId is always set together with firstMessageId.
                        loopPromise.complete(new PhaseOneResult(
                                firstMessageId.get(), lastMessageId.get(), latestForKey));
                    } else {
                        loopPromise.completeExceptionally(exception);
                    }
                    return;
                }
                MessageId id = m.getMessageId();
                String key = extractKey(m);
                latestForKey.put(key, id);
                phaseOneLoop(reader, Optional.of(firstMessageId.orElse(id)), Optional.of(id),
                        latestForKey, loopPromise);
            } catch (RuntimeException e) {
                // The future derived from whenComplete is discarded, so an exception escaping
                // this callback would otherwise be lost and the loop promise never completed.
                loopPromise.completeExceptionally(e);
            } finally {
                // m is null when the read failed or timed out.
                if (m != null) {
                    m.close();
                }
            }
        });
    }

    /**
     * Arranges for {@code future} to fail with a {@link TimeoutException} if it has not
     * completed within {@link #PHASE_ONE_READ_TIMEOUT_SECONDS} seconds. The timer is cancelled
     * as soon as the future completes for any reason.
     */
    private void scheduleTimeout(CompletableFuture<RawMessage> future) {
        ScheduledFuture<?> timeout = this.scheduler.schedule(() -> {
            future.completeExceptionally(new TimeoutException("Timeout"));
        }, PHASE_ONE_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        future.whenComplete((res, exception) -> timeout.cancel(true));
    }

    /**
     * Creates the target ledger and then runs the seek/copy/close/acknowledge sequence.
     *
     * @return a future completed with the id of the compacted ledger
     */
    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
                                             Map<String, MessageId> latestForKey, BookKeeper bk) {
        return createLedger(bk)
                .thenCompose(ledger -> phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger));
    }

    /**
     * Seeks the reader to {@code from}, copies the latest message per key into {@code ledger}
     * up to {@code to}, closes the ledger and acknowledges {@code to} with the ledger id
     * attached. If any of these steps fails the ledger is deleted before the returned future
     * is completed exceptionally with the original failure.
     */
    private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
                                                         Map<String, MessageId> latestForKey,
                                                         BookKeeper bk, LedgerHandle ledger) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        reader.seekAsync(from)
                .thenCompose(v -> {
                    Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
                    CompletableFuture<Void> loopPromise = new CompletableFuture<>();
                    phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
                    return loopPromise;
                })
                .thenCompose(v -> closeLedger(ledger))
                .thenCompose(v -> reader.acknowledgeCumulativeAsync(to,
                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
                .whenComplete((res, exception) -> {
                    if (exception != null) {
                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
                            if (exception2 != null) {
                                log.warn("Cleanup of ledger {} for failed", ledger, exception2);
                            }
                            // Report the original failure, not the cleanup failure.
                            promise.completeExceptionally(exception);
                        });
                    } else {
                        promise.complete(ledger.getId());
                    }
                });
        return promise;
    }

    /**
     * Reads one message and, if it is the latest for its key, writes it to {@code lh}. The
     * loop re-invokes itself until the message with id {@code to} has been submitted;
     * {@code promise} is completed once that final write succeeds, or exceptionally as soon as
     * any read or write fails.
     *
     * @param outstanding limits the number of in-flight ledger writes; acquiring a permit may
     *                    block the calling thread
     */
    private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
                              LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
        reader.readNextAsync().whenComplete((m, exception) -> {
            try {
                if (exception != null) {
                    promise.completeExceptionally(exception);
                    return;
                }
                if (promise.isDone()) {
                    return;
                }
                MessageId id = m.getMessageId();
                String key = extractKey(m);
                // Null-safe: a key that was not seen in phase one has no entry in the map.
                if (id.equals(latestForKey.get(key))) {
                    outstanding.acquire();
                    CompletableFuture<Void> addFuture = addToCompactedLedger(lh, m)
                            .whenComplete((res, exception2) -> {
                                outstanding.release();
                                if (exception2 != null) {
                                    promise.completeExceptionally(exception2);
                                }
                            });
                    if (to.equals(id)) {
                        addFuture.whenComplete((res, exception2) -> {
                            if (exception2 == null) {
                                promise.complete(null);
                            }
                        });
                        // Nothing past `to` can be the latest for a key recorded in phase one,
                        // so there is no reason to keep reading.
                        return;
                    }
                }
                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                promise.completeExceptionally(ie);
            } catch (RuntimeException e) {
                // The future derived from whenComplete is discarded, so an exception escaping
                // this callback would otherwise be lost and the promise never completed.
                promise.completeExceptionally(e);
            } finally {
                // m is null when the read failed.
                if (m != null) {
                    m.close();
                }
            }
        });
    }

    /**
     * Asynchronously creates a ledger using the ensemble/quorum sizes from the configuration.
     *
     * @return a future completed with the new ledger handle, or exceptionally with a
     *         {@link BKException} built from the BookKeeper return code
     */
    private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk) {
        CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
        bk.asyncCreateLedger(this.conf.getManagedLedgerDefaultEnsembleSize(),
                this.conf.getManagedLedgerDefaultWriteQuorum(),
                this.conf.getManagedLedgerDefaultAckQuorum(),
                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
                (rc, ledger, ctx) -> {
                    if (rc != BKException.Code.OK) {
                        bkf.completeExceptionally(BKException.create(rc));
                    } else {
                        bkf.complete(ledger);
                    }
                }, null);
        return bkf;
    }

    /**
     * Asynchronously deletes the ledger behind {@code lh}.
     *
     * @return a future completed with {@code null} on success, or exceptionally with a
     *         {@link BKException} built from the BookKeeper return code
     */
    private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        bk.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(null);
            }
        }, null);
        return bkf;
    }

    /**
     * Asynchronously closes {@code lh}.
     *
     * @return a future completed with {@code null} on success, or exceptionally with a
     *         {@link BKException} built from the BookKeeper return code
     */
    private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        lh.asyncClose((rc, ledger, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(null);
            }
        }, null);
        return bkf;
    }

    /**
     * Serializes {@code m} and appends it to {@code lh} as a new entry.
     *
     * @return a future completed with {@code null} on success, or exceptionally with a
     *         {@link BKException} built from the BookKeeper return code
     */
    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        ByteBuf serialized = m.serialize();
        lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(null);
            }
        }, null);
        // NOTE(review): the buffer is released right after the write is submitted; presumably
        // asyncAddEntry retains its own reference for the duration of the write - confirm.
        serialized.release();
        return bkf;
    }

    /** Parses the message metadata from the message's headers-and-payload buffer and returns its partition key. */
    private static String extractKey(RawMessage m) {
        ByteBuf headersAndPayload = m.getHeadersAndPayload();
        PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        return msgMetadata.getPartitionKey();
    }

    /** Outcome of phase one: the range of message ids read and the latest id seen per key. */
    private static class PhaseOneResult {
        final MessageId from;
        final MessageId to;
        final Map<String, MessageId> latestForKey;

        PhaseOneResult(MessageId from, MessageId to, Map<String, MessageId> latestForKey) {
            this.from = from;
            this.to = to;
            this.latestForKey = latestForKey;
        }
    }
}

