/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactorSubscription
extends PersistentSubscription {
    private final CompactedTopic compactedTopic;
    private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);

    public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic, String subscriptionName, ManagedCursor cursor) {
        super(topic, subscriptionName, cursor, false);
        Preconditions.checkArgument((boolean)subscriptionName.equals("__compaction"));
        this.compactedTopic = compactedTopic;
        this.cursor.setAlwaysInactive();
        Map properties = cursor.getProperties();
        if (properties.containsKey("CompactedTopicLedger")) {
            long compactedLedgerId = (Long)properties.get("CompactedTopicLedger");
            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(), compactedLedgerId);
        }
    }

    @Override
    public void acknowledgeMessage(List<Position> positions, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        Preconditions.checkArgument((ackType == PulsarApi.CommandAck.AckType.Cumulative ? 1 : 0) != 0);
        Preconditions.checkArgument((positions.size() == 1 ? 1 : 0) != 0);
        Preconditions.checkArgument((boolean)properties.containsKey("CompactedTopicLedger"));
        long compactedLedgerId = properties.get("CompactedTopicLedger");
        final Position position = positions.get(0);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Cumulative ack on compactor subscription {}", new Object[]{this.topicName, this.subName, position});
        }
        this.compactedTopic.newCompactedLedger(position, compactedLedgerId).thenAccept(previousContext -> this.cursor.asyncMarkDelete(position, properties, new AsyncCallbacks.MarkDeleteCallback((CompactedTopicImpl.CompactedTopicContext)previousContext){
            final /* synthetic */ CompactedTopicImpl.CompactedTopicContext val$previousContext;
            {
                this.val$previousContext = compactedTopicContext;
            }

            public void markDeleteComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}", new Object[]{CompactorSubscription.this.topicName, CompactorSubscription.this.subName, position});
                }
                if (this.val$previousContext != null) {
                    CompactorSubscription.this.compactedTopic.deleteCompactedLedger(this.val$previousContext.getLedger().getId());
                }
            }

            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}", new Object[]{CompactorSubscription.this.topicName, CompactorSubscription.this.subName, ctx, exception});
                }
            }
        }, null));
        if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog(false) == 0L) {
            this.dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }
}

