/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compactor that works in two passes over a topic.
 *
 * <p>Phase one reads the topic up to the last message id reported by the reader and records,
 * per key, the id of the latest message carrying that key. Phase two seeks back to the first
 * retained message, re-reads up to the last retained message and writes every message that is
 * still the latest for its key (plus messages without a key) into a newly created ledger. The
 * ledger id is then stored on the reader's cursor via a cumulative acknowledgement.
 */
public class TwoPhaseCompactor
extends Compactor {
    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
    /** Maximum number of ledger writes allowed in flight at once during phase two. */
    private static final int MAX_OUTSTANDING = 500;
    /** Cursor property under which the id of the compacted ledger is stored. */
    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
    /** How long a single phase-one read may stay pending before it is failed. */
    private static final long PHASE_ONE_READ_TIMEOUT_SECONDS = 10L;

    public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler) {
        super(conf, pulsar, bk, scheduler);
    }

    /**
     * Runs phase one followed by phase two.
     *
     * @param reader raw reader positioned on the topic to compact
     * @param bk     BookKeeper client used to create the compacted ledger
     * @return future holding the id of the compacted ledger
     */
    @Override
    protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
        return phaseOne(reader)
                .thenCompose(r -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
    }

    /**
     * Starts phase one: looks up the last message id of the topic and then reads messages
     * until that id is reached, collecting the latest message id for each key.
     *
     * @param reader raw reader to read from
     * @return future completed with the phase-one result, or failed if a read fails
     */
    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
        Map<String, MessageId> latestForKey = new HashMap<>();
        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
        reader.getLastMessageIdAsync().whenComplete((lastMessageId, exception) -> {
            if (exception != null) {
                loopPromise.completeExceptionally(exception);
            } else {
                log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId);
                phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey, loopPromise);
            }
        });
        return loopPromise;
    }

    /**
     * Reads one message, updates {@code latestForKey} and either finishes phase one (when the
     * message id equals {@code lastMessageId}) or schedules the next read.
     *
     * @param reader         raw reader to read from
     * @param firstMessageId id of the first retained (non-deleted) message seen so far, if any
     * @param toMessageId    id of the last retained (non-deleted) message seen so far, if any
     * @param lastMessageId  id at which phase one stops
     * @param latestForKey   map from key to the id of the latest message for that key; mutated
     * @param loopPromise    completed with the result, or failed on any error
     */
    private void phaseOneLoop(RawReader reader, Optional<MessageId> firstMessageId, Optional<MessageId> toMessageId, MessageId lastMessageId, Map<String, MessageId> latestForKey, CompletableFuture<PhaseOneResult> loopPromise) {
        if (loopPromise.isDone()) {
            return;
        }
        CompletableFuture<RawMessage> future = reader.readNextAsync();
        scheduleTimeout(future);
        future.whenCompleteAsync((m, exception) -> {
            try {
                if (exception != null) {
                    loopPromise.completeExceptionally(exception);
                    return;
                }
                MessageId id = m.getMessageId();
                boolean deletedMessage = false;
                if (RawBatchConverter.isReadableBatch(m)) {
                    try {
                        RawBatchConverter.extractIdsAndKeys(m)
                                .forEach(idAndKey -> latestForKey.put(idAndKey.getRight(), idAndKey.getLeft()));
                    }
                    catch (IOException ioe) {
                        log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe);
                    }
                } else {
                    Pair<String, Integer> keyAndSize = TwoPhaseCompactor.extractKeyAndSize(m);
                    if (keyAndSize != null) {
                        if (keyAndSize.getRight() > 0) {
                            latestForKey.put(keyAndSize.getLeft(), id);
                        } else {
                            // An empty payload marks the key as deleted.
                            deletedMessage = true;
                            latestForKey.remove(keyAndSize.getLeft());
                        }
                    }
                }
                // Deleted messages never become the start or end of the compacted range.
                MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                if (id.compareTo(lastMessageId) == 0) {
                    loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey));
                } else {
                    phaseOneLoop(reader, Optional.ofNullable(first), Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise);
                }
            }
            catch (RuntimeException unexpected) {
                // Without this the promise would never complete and the compaction would hang.
                loopPromise.completeExceptionally(unexpected);
            }
            finally {
                // m is null when the read failed (including the timeout path).
                if (m != null) {
                    m.close();
                }
            }
        }, this.scheduler);
    }

    /**
     * Fails {@code future} with a {@link TimeoutException} if it has not completed within
     * {@link #PHASE_ONE_READ_TIMEOUT_SECONDS}; the timer is cancelled once the future completes.
     *
     * @param future pending read to guard
     */
    private void scheduleTimeout(CompletableFuture<RawMessage> future) {
        ScheduledFuture<?> timeout = this.scheduler.schedule(
                () -> future.completeExceptionally(new TimeoutException("Timeout")),
                PHASE_ONE_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        future.whenComplete((res, exception) -> timeout.cancel(true));
    }

    /**
     * Creates the compacted ledger and runs phase two into it.
     *
     * @param reader       raw reader to re-read the topic with
     * @param from         id of the first retained message found by phase one
     * @param to           id of the last retained message found by phase one
     * @param lastReadId   id that is cumulatively acknowledged once compaction succeeds
     * @param latestForKey map from key to the id of the latest message for that key
     * @param bk           BookKeeper client
     * @return future holding the id of the compacted ledger; failed with
     *         {@link IllegalStateException} when phase one retained no message
     */
    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk) {
        if (from == null || to == null) {
            // Phase one leaves these null when every message it read was a delete marker.
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    "Phase one of compaction for " + reader.getTopic() + " retained no messages"));
            return failed;
        }
        Map<String, byte[]> metadata = ImmutableMap.of(
                "compactedTopic", reader.getTopic().getBytes(StandardCharsets.UTF_8),
                "compactedTo", to.toByteArray());
        return createLedger(bk, metadata).thenCompose(ledger -> {
            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
                    reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
        });
    }

    /**
     * Seeks the reader to {@code from}, copies the retained messages into {@code ledger},
     * closes the ledger and acknowledges {@code lastReadId} with the ledger id attached.
     * If any step fails the ledger is deleted before the returned future is failed.
     *
     * @return future holding the id of {@code ledger}
     */
    private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        reader.seekAsync(from)
                .thenCompose(v -> {
                    Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
                    CompletableFuture<Void> loopPromise = new CompletableFuture<>();
                    phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
                    return loopPromise;
                })
                .thenCompose(v -> closeLedger(ledger))
                .thenCompose(v -> reader.acknowledgeCumulativeAsync(lastReadId,
                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
                .whenComplete((res, exception) -> {
                    if (exception != null) {
                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
                            if (exception2 != null) {
                                log.warn("Cleanup of ledger {} for failed", ledger, exception2);
                            }
                            // Report the original failure, not the cleanup failure.
                            promise.completeExceptionally(exception);
                        });
                    } else {
                        promise.complete(ledger.getId());
                    }
                });
        return promise;
    }

    /**
     * Reads one message and, if it is still the latest for its key (or has no key), appends it
     * to the compacted ledger; then schedules the next read until the message with id
     * {@code to} has been handled.
     *
     * @param reader       raw reader to read from
     * @param to           id of the last message to consider
     * @param latestForKey map from key to the id of the latest message for that key
     * @param lh           ledger receiving the retained messages
     * @param outstanding  semaphore limiting in-flight writes; expected to have been created
     *                     with {@link #MAX_OUTSTANDING} permits
     * @param promise      completed once the final message has been handled and all writes
     *                     have finished, or failed on the first error
     */
    private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey, LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
        reader.readNextAsync().whenCompleteAsync((m, exception) -> {
            if (exception != null) {
                promise.completeExceptionally(exception);
                return;
            }
            if (promise.isDone()) {
                // The message will not be used; close it like any other dropped message.
                m.close();
                return;
            }
            try {
                MessageId id = m.getMessageId();
                boolean reachedEnd = to.equals(id);
                Optional<RawMessage> messageToAdd = Optional.empty();
                if (RawBatchConverter.isReadableBatch(m)) {
                    try {
                        // A key may be absent from the map (deleted later, or its batch could
                        // not be decoded in phase one), so the lookup must be null-safe.
                        messageToAdd = RawBatchConverter.rebatchMessage(m, (key, subid) -> {
                            MessageId latest = latestForKey.get(key);
                            return latest != null && latest.equals(subid);
                        });
                    }
                    catch (IOException ioe) {
                        log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe);
                        messageToAdd = Optional.of(m);
                    }
                } else {
                    Pair<String, Integer> keyAndSize = TwoPhaseCompactor.extractKeyAndSize(m);
                    if (keyAndSize == null) {
                        // Messages without a key are passed through unchanged.
                        messageToAdd = Optional.of(m);
                    } else {
                        MessageId latest = latestForKey.get(keyAndSize.getLeft());
                        if (latest != null && latest.equals(id)) {
                            if (keyAndSize.getRight() <= 0) {
                                promise.completeExceptionally(new IllegalArgumentException("Compaction phase found empty record from sorted key-map"));
                                m.close();
                                return;
                            }
                            messageToAdd = Optional.of(m);
                        } else {
                            // Superseded or deleted: not part of the compacted output.
                            m.close();
                        }
                    }
                }

                if (messageToAdd.isPresent()) {
                    try {
                        outstanding.acquire();
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        promise.completeExceptionally(ie);
                        return;
                    }
                    addToCompactedLedger(lh, messageToAdd.get()).whenComplete((res, addException) -> {
                        // Record the outcome before releasing the permit so that a thread
                        // draining the semaphore cannot complete the promise ahead of a failure.
                        if (addException != null) {
                            promise.completeExceptionally(addException);
                        } else if (reachedEnd) {
                            promise.complete(null);
                        }
                        outstanding.release();
                    });
                } else if (reachedEnd) {
                    // The final message produced nothing to write. Wait for every in-flight
                    // write before signalling completion, otherwise the ledger could be
                    // closed while writes are still pending.
                    try {
                        outstanding.acquire(MAX_OUTSTANDING);
                        promise.complete(null);
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        promise.completeExceptionally(ie);
                    }
                }

                if (!reachedEnd) {
                    phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
                }
            }
            catch (RuntimeException unexpected) {
                // Without this the promise would never complete and the compaction would hang.
                promise.completeExceptionally(unexpected);
            }
        }, this.scheduler);
    }

    /**
     * Creates a ledger using the managed-ledger default ensemble/quorum sizes from the
     * configuration and the compacted-topic digest type and password.
     *
     * @param bk       BookKeeper client
     * @param metadata custom metadata attached to the ledger
     * @return future holding the handle of the new ledger
     */
    private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
        CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
        bk.asyncCreateLedger(this.conf.getManagedLedgerDefaultEnsembleSize(), this.conf.getManagedLedgerDefaultWriteQuorum(), this.conf.getManagedLedgerDefaultAckQuorum(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, (rc, ledger, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(ledger);
            }
        }, null, metadata);
        return bkf;
    }

    /**
     * Deletes the ledger behind {@code lh}.
     *
     * @return future completed when the deletion succeeds, failed with a {@link BKException} otherwise
     */
    private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        bk.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(null);
            }
        }, null);
        return bkf;
    }

    /**
     * Closes {@code lh}.
     *
     * @return future completed when the close succeeds, failed with a {@link BKException} otherwise
     */
    private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        lh.asyncClose((rc, ledger, ctx) -> {
            if (rc != BKException.Code.OK) {
                bkf.completeExceptionally(BKException.create(rc));
            } else {
                bkf.complete(null);
            }
        }, null);
        return bkf;
    }

    /**
     * Serializes {@code m} and appends it to {@code lh}.
     *
     * @return future completed when the entry is written, failed with a {@link BKException} otherwise
     */
    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m) {
        CompletableFuture<Void> bkf = new CompletableFuture<>();
        ByteBuf serialized = m.serialize();
        try {
            lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> {
                if (rc != BKException.Code.OK) {
                    bkf.completeExceptionally(BKException.create(rc));
                } else {
                    bkf.complete(null);
                }
            }, null);
        }
        finally {
            // Drop this method's reference even if submitting the write throws.
            serialized.release();
        }
        return bkf;
    }

    /**
     * Parses the message metadata and returns the partition key together with a size.
     *
     * @param m message to inspect
     * @return pair of (partition key, size), or {@code null} if the message has no partition
     *         key; the size is the uncompressed size from the metadata when present, otherwise
     *         the number of readable bytes left in the buffer after the metadata was parsed
     */
    private static Pair<String, Integer> extractKeyAndSize(RawMessage m) {
        ByteBuf headersAndPayload = m.getHeadersAndPayload();
        PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        try {
            if (!msgMetadata.hasPartitionKey()) {
                return null;
            }
            int size = msgMetadata.hasUncompressedSize()
                    ? msgMetadata.getUncompressedSize()
                    : headersAndPayload.readableBytes();
            return Pair.of(msgMetadata.getPartitionKey(), size);
        }
        finally {
            msgMetadata.recycle();
        }
    }

    /** Immutable holder for what phase one hands over to phase two. */
    private static class PhaseOneResult {
        /** Id of the first retained message; {@code null} if none was retained. */
        final MessageId from;
        /** Id of the last retained message; {@code null} if none was retained. */
        final MessageId to;
        /** Id at which phase one stopped reading. */
        final MessageId lastReadId;
        /** Latest message id per key. */
        final Map<String, MessageId> latestForKey;

        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey) {
            this.from = from;
            this.to = to;
            this.lastReadId = lastReadId;
            this.latestForKey = latestForKey;
        }
    }
}

