/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.CompactionReaderImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StrategicTwoPhaseCompactor
extends TwoPhaseCompactor {
    private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
    private static final int MAX_OUTSTANDING = 500;
    private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
    private static final int MAX_BYTES_IN_BATCH = 131072;
    private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20000;
    private final Duration phaseOneLoopReadTimeout;
    private final RawBatchMessageContainerImpl batchMessageContainer;

    @VisibleForTesting
    public StrategicTwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler, int maxNumMessagesInBatch) {
        this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch, 131072);
    }

    private StrategicTwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler, int maxNumMessagesInBatch, int maxBytesInBatch) {
        super(conf, pulsar, bk, scheduler);
        this.batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
        this.phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
    }

    public StrategicTwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler) {
        this(conf, pulsar, bk, scheduler, 1000, 131072);
    }

    @Override
    public CompletableFuture<Long> compact(String topic) {
        throw new UnsupportedOperationException();
    }

    public <T> CompletableFuture<Long> compact(String topic, TopicCompactionStrategy<T> strategy) {
        return this.compact(topic, strategy, null);
    }

    public <T> CompletableFuture<Long> compact(String topic, TopicCompactionStrategy<T> strategy, CryptoKeyReader cryptoKeyReader) {
        CompletableFuture consumerFuture = new CompletableFuture();
        if (cryptoKeyReader != null) {
            this.batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
        }
        CompactionReaderImpl reader = CompactionReaderImpl.create((PulsarClientImpl)this.pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader);
        return consumerFuture.thenComposeAsync(__ -> this.compactAndCloseReader((Reader)reader, strategy), (Executor)this.scheduler);
    }

    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) {
        if (!(reader instanceof CompactionReaderImpl)) {
            return CompletableFuture.failedFuture(new IllegalStateException("reader has to be CompactionReaderImpl"));
        }
        return reader.hasMessageAvailableAsync().thenCompose(available -> {
            if (available.booleanValue()) {
                return this.phaseOne(reader, strategy).thenCompose(result -> this.phaseTwo((PhaseOneResult)result, reader, this.bk));
            }
            log.info("Skip compaction of the empty topic {}", (Object)reader.getTopic());
            return CompletableFuture.completedFuture(-1L);
        });
    }

    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) {
        CompletableFuture<Long> promise = new CompletableFuture<Long>();
        this.mxBean.addCompactionStartOp(reader.getTopic());
        this.doCompaction(reader, strategy).whenComplete((ledgerId, exception) -> {
            log.info("Completed doCompaction ledgerId:{}", ledgerId);
            reader.closeAsync().whenComplete((v, exception2) -> {
                if (exception2 != null) {
                    log.warn("Error closing reader handle {}, ignoring", (Object)reader, exception2);
                }
                if (exception != null) {
                    this.mxBean.addCompactionEndOp(reader.getTopic(), false);
                    promise.completeExceptionally((Throwable)exception);
                } else {
                    this.mxBean.addCompactionEndOp(reader.getTopic(), true);
                    promise.complete((Long)ledgerId);
                }
            });
        });
        return promise;
    }

    private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
        Object prevVal;
        Map cache = result.cache;
        String key = msg.getKey();
        if (key == null) {
            msg.release();
            return true;
        }
        Object val = msg.getValue();
        Message prev = cache.get(key);
        Object object = prevVal = prev == null ? null : prev.getValue();
        if (!strategy.shouldKeepLeft(prevVal, val)) {
            if (val != null && msg.size() > 0) {
                cache.remove(key);
                cache.put(key, msg);
            } else {
                cache.remove(key);
                msg.release();
            }
            if (prev != null) {
                prev.release();
            }
            result.validCompactionCount.incrementAndGet();
            return true;
        }
        msg.release();
        result.invalidCompactionCount.incrementAndGet();
        return false;
    }

    private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, TopicCompactionStrategy strategy) {
        CompletableFuture<PhaseOneResult> promise = new CompletableFuture<PhaseOneResult>();
        PhaseOneResult result = new PhaseOneResult(reader.getTopic());
        ((CompletableFuture)((CompactionReaderImpl)reader).getLastMessageIdAsync().thenAccept(lastMessageId -> {
            log.info("Commencing phase one of compaction for {}, reading to {}", (Object)reader.getTopic(), lastMessageId);
            result.lastId = StrategicTwoPhaseCompactor.copyMessageId(lastMessageId);
            this.phaseOneLoop(reader, promise, result, strategy);
        })).exceptionally(ex -> {
            promise.completeExceptionally((Throwable)ex);
            return null;
        });
        return promise;
    }

    private static MessageId copyMessageId(MessageId msgId) {
        if (msgId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl tempId = (BatchMessageIdImpl)msgId;
            return new BatchMessageIdImpl((MessageIdAdv)tempId);
        }
        if (msgId instanceof MessageIdImpl) {
            MessageIdImpl tempId = (MessageIdImpl)msgId;
            return new MessageIdImpl(tempId.getLedgerId(), tempId.getEntryId(), tempId.getPartitionIndex());
        }
        throw new IllegalStateException("Unknown lastMessageId type");
    }

    private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult> promise, PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
        if (promise.isDone()) {
            return;
        }
        CompletableFuture future = reader.readNextAsync();
        FutureUtil.addTimeoutHandling((CompletableFuture)future, (Duration)this.phaseOneLoopReadTimeout, (ScheduledExecutorService)this.scheduler, () -> FutureUtil.createTimeoutException((String)"Timeout", this.getClass(), (String)"phaseOneLoop(...)"));
        ((CompletableFuture)future.thenAcceptAsync(msg -> {
            MessageId id = msg.getMessageId();
            boolean completed = false;
            if (result.lastId.compareTo((Object)id) == 0) {
                completed = true;
            }
            result.numReadMessages.incrementAndGet();
            this.mxBean.addCompactionReadOp(reader.getTopic(), msg.size());
            if (this.doCompactMessage((Message)msg, result, strategy)) {
                this.mxBean.addCompactionRemovedEvent(reader.getTopic());
            }
            if (result.firstId == null) {
                result.firstId = StrategicTwoPhaseCompactor.copyMessageId(id);
                log.info("Resetting cursor to firstId:{}", (Object)result.firstId);
                try {
                    reader.seek(result.firstId);
                }
                catch (Throwable e) {
                    throw new RuntimeException(String.format("Failed while resetting the cursor to firstId:%s", result.firstId), e);
                }
                this.waitForReconnection(reader);
            }
            if (completed) {
                promise.complete(result);
            } else {
                this.phaseOneLoop(reader, promise, result, strategy);
            }
        }, (Executor)this.scheduler)).exceptionally(ex -> {
            promise.completeExceptionally((Throwable)ex);
            return null;
        });
    }

    private <T> void waitForReconnection(Reader<T> reader) {
        long started = System.currentTimeMillis();
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        while (!reader.isConnected()) {
            long now = System.currentTimeMillis();
            if (now - started > 20000L) {
                String errorMsg = String.format("Reader has not been reconnected for %d secs. Stopping the compaction.", 20);
                log.error(errorMsg);
                throw new RuntimeException(errorMsg);
            }
            log.warn("Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying soon.", (Object)(now - started));
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                log.warn("The thread got interrupted while waiting. continuing", (Throwable)e);
            }
        }
    }

    private <T> CompletableFuture<Long> phaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, BookKeeper bk) {
        log.info("Completed phase one. Result:{}. ", phaseOneResult);
        Map metadata = LedgerMetadataUtils.buildMetadataForCompactedLedger((String)phaseOneResult.topic, (byte[])phaseOneResult.lastId.toByteArray());
        return this.createLedger(bk, metadata).thenCompose(ledger -> {
            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", new Object[]{phaseOneResult.topic, phaseOneResult.firstId, phaseOneResult.lastId, phaseOneResult.cache.size(), ledger.getId()});
            return this.runPhaseTwo(phaseOneResult, reader, (LedgerHandle)ledger, bk);
        });
    }

    private <T> CompletableFuture<Long> runPhaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, LedgerHandle ledger, BookKeeper bk) {
        CompletableFuture<Long> promise = new CompletableFuture<Long>();
        Semaphore outstanding = new Semaphore(500);
        CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
        this.phaseTwoLoop(phaseOneResult.topic, phaseOneResult.cache.values().iterator(), ledger, outstanding, loopPromise);
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)loopPromise.thenCompose(v -> {
            log.info("Flushing batch container numMessagesInBatch:{}", (Object)this.batchMessageContainer.getNumMessagesInBatch());
            return this.addToCompactedLedger(ledger, null, reader.getTopic(), outstanding).whenComplete((res, exception2) -> {
                if (exception2 != null) {
                    promise.completeExceptionally((Throwable)exception2);
                    return;
                }
            });
        })).thenCompose(v -> {
            log.info("Acking ledger id {}", (Object)phaseOneResult.lastId);
            return ((CompactionReaderImpl)reader).acknowledgeCumulativeAsync(phaseOneResult.lastId, Map.of("CompactedTopicLedger", ledger.getId()));
        })).thenCompose(v -> this.closeLedger(ledger))).whenComplete((v, exception) -> {
            if (exception != null) {
                this.deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
                    if (exception2 != null) {
                        log.error("Cleanup of ledger {} for failed", (Object)ledger, exception2);
                    }
                    promise.completeExceptionally((Throwable)exception);
                });
            } else {
                log.info("kept ledger:{}", (Object)ledger.getId());
                promise.complete(ledger.getId());
            }
        });
        return promise;
    }

    private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader, LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
        if (promise.isDone()) {
            return;
        }
        CompletableFuture.runAsync(() -> {
            if (!reader.hasNext()) {
                try {
                    outstanding.acquire(500);
                }
                catch (InterruptedException e) {
                    promise.completeExceptionally(e);
                    return;
                }
                outstanding.release(500);
                promise.complete(null);
                return;
            }
            Message message = (Message)reader.next();
            this.mxBean.addCompactionReadOp(topic, message.size());
            this.addToCompactedLedger(lh, message, topic, outstanding).whenComplete((res, exception2) -> {
                if (exception2 != null) {
                    promise.completeExceptionally((Throwable)exception2);
                    return;
                }
            });
            this.phaseTwoLoop(topic, reader, lh, outstanding, promise);
        }, this.scheduler).exceptionally(ex -> {
            promise.completeExceptionally((Throwable)ex);
            return null;
        });
    }

    <T> CompletableFuture<Boolean> addToCompactedLedger(LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
        CompletableFuture<Boolean> bkf = new CompletableFuture<Boolean>();
        if (m == null || this.batchMessageContainer.add((MessageImpl)m, null)) {
            if (this.batchMessageContainer.getNumMessagesInBatch() > 0) {
                try {
                    ByteBuf serialized = this.batchMessageContainer.toByteBuf();
                    outstanding.acquire();
                    this.mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
                    long start = System.nanoTime();
                    lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> {
                        outstanding.release();
                        this.mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
                        if (rc != 0) {
                            bkf.completeExceptionally(BKException.create((int)rc));
                        } else {
                            bkf.complete(true);
                        }
                    }, null);
                }
                catch (Throwable t) {
                    log.error("Failed to add entry", t);
                    this.batchMessageContainer.discard((Exception)t);
                    return FutureUtil.failedFuture((Throwable)t);
                }
            } else {
                bkf.complete(false);
            }
        } else {
            bkf.complete(false);
        }
        return bkf;
    }

    private static class PhaseOneResult<T> {
        MessageId firstId;
        MessageId lastId;
        Map<String, Message<T>> cache;
        AtomicInteger invalidCompactionCount;
        AtomicInteger validCompactionCount;
        AtomicInteger numReadMessages;
        String topic;

        PhaseOneResult(String topic) {
            this.topic = topic;
            this.cache = new LinkedHashMap<String, Message<T>>();
            this.invalidCompactionCount = new AtomicInteger();
            this.validCompactionCount = new AtomicInteger();
            this.numReadMessages = new AtomicInteger();
        }

        public String toString() {
            return String.format("{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}", this.topic, this.firstId != null ? this.firstId.toString() : "", this.lastId != null ? this.lastId.toString() : "", this.cache.size(), this.invalidCompactionCount.get(), this.validCompactionCount.get(), this.numReadMessages.get());
        }
    }
}

