/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.Objects;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentSubscription
implements Subscription {
    private final PersistentTopic topic;
    private final ManagedCursor cursor;
    private volatile Dispatcher dispatcher;
    private final String topicName;
    private final String subName;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<PersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced");
    private volatile int isFenced = 0;
    private PersistentMessageExpiryMonitor expiryMonitor;
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback(){

        public void markDeleteComplete(Object ctx) {
            PositionImpl pos = (PositionImpl)ctx;
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Mark deleted messages until position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, pos});
            }
        }

        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Failed to mark delete for position ", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, ctx, exception});
            }
        }
    };
    private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback(){

        public void deleteComplete(Object ctx) {
            PositionImpl pos = (PositionImpl)ctx;
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Deleted message at {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, pos});
            }
        }

        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
            log.warn("[{}][{}] Failed to delete message at {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, ctx, exception});
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);

    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
        this.topic = topic;
        this.cursor = cursor;
        this.topicName = topic.getName();
        this.subName = subscriptionName;
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, subscriptionName, cursor);
        IS_FENCED_UPDATER.set(this, 0);
    }

    @Override
    public String getName() {
        return this.subName;
    }

    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        block13: {
            block12: {
                if (IS_FENCED_UPDATER.get(this) == 1) {
                    log.warn("Attempting to add consumer {} on a fenced subscription", (Object)consumer);
                    throw new BrokerServiceException.SubscriptionFencedException("Subscription is fenced");
                }
                if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog() == 0L) {
                    consumer.reachedEndOfTopic();
                }
                if (this.dispatcher != null && this.dispatcher.isConsumerConnected()) break block12;
                switch (consumer.subType()) {
                    case Exclusive: {
                        if (this.dispatcher == null || this.dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Exclusive) {
                            this.dispatcher = new PersistentDispatcherSingleActiveConsumer(this.cursor, PulsarApi.CommandSubscribe.SubType.Exclusive, 0, this.topic);
                        }
                        break block13;
                    }
                    case Shared: {
                        if (this.dispatcher == null || this.dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Shared) {
                            this.dispatcher = new PersistentDispatcherMultipleConsumers(this.topic, this.cursor);
                        }
                        break block13;
                    }
                    case Failover: {
                        int partitionIndex = DestinationName.getPartitionIndex((String)this.topicName);
                        if (partitionIndex < 0) {
                            partitionIndex = 0;
                        }
                        if (this.dispatcher == null || this.dispatcher.getType() != PulsarApi.CommandSubscribe.SubType.Failover) {
                            this.dispatcher = new PersistentDispatcherSingleActiveConsumer(this.cursor, PulsarApi.CommandSubscribe.SubType.Failover, partitionIndex, this.topic);
                        }
                        break block13;
                    }
                    default: {
                        throw new BrokerServiceException.ServerMetadataException("Unsupported subscription type");
                    }
                }
            }
            if (consumer.subType() != this.dispatcher.getType()) {
                throw new BrokerServiceException.SubscriptionBusyException("Subscription is of different type");
            }
        }
        this.dispatcher.addConsumer(consumer);
        this.activateCursor();
    }

    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }
        if (this.dispatcher.getConsumers().isEmpty()) {
            this.deactivateCursor();
            if (!this.cursor.isDurable()) {
                this.close();
                this.topic.removeSubscription(this.subName);
            }
        }
        PersistentTopic.USAGE_COUNT_UPDATER.decrementAndGet(this.topic);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", new Object[]{this.topic.getName(), this.subName, consumer.consumerName(), PersistentTopic.USAGE_COUNT_UPDATER.get(this.topic)});
        }
    }

    public void deactivateCursor() {
        this.cursor.setInactive();
    }

    public void activateCursor() {
        this.cursor.setActive();
    }

    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }

    @Override
    public void acknowledgeMessage(PositionImpl position, PulsarApi.CommandAck.AckType ackType) {
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", new Object[]{this.topicName, this.subName, position});
            }
            this.cursor.asyncMarkDelete((Position)position, this.markDeleteCallback, (Object)position);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual ack on {}", new Object[]{this.topicName, this.subName, position});
            }
            this.cursor.asyncDelete((Position)position, this.deleteCallback, (Object)position);
        }
        if (this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog() == 0L) {
            this.dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }

    public String toString() {
        return Objects.toStringHelper((Object)this).add("topic", (Object)this.topicName).add("name", (Object)this.subName).toString();
    }

    @Override
    public String getDestination() {
        return this.topicName;
    }

    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return this.dispatcher != null ? this.dispatcher.getType() : null;
    }

    @Override
    public String getTypeString() {
        PulsarApi.CommandSubscribe.SubType type = this.getType();
        if (type == null) {
            return "None";
        }
        switch (type) {
            case Exclusive: {
                return "Exclusive";
            }
            case Failover: {
                return "Failover";
            }
            case Shared: {
                return "Shared";
            }
        }
        return "Null";
    }

    @Override
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Backlog size before clearing: {}", new Object[]{this.topicName, this.subName, this.cursor.getNumberOfEntriesInBacklog()});
        }
        this.cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback(){

            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Backlog size after clearing: {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, PersistentSubscription.this.cursor.getNumberOfEntriesInBacklog()});
                }
                future.complete(null);
            }

            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to clear backlog", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, exception});
                future.completeExceptionally((Throwable)exception);
            }
        }, null);
        return future;
    }

    @Override
    public CompletableFuture<Void> skipMessages(final int numMessagesToSkip) {
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Skipping {} messages, current backlog {}", new Object[]{this.topicName, this.subName, numMessagesToSkip, this.cursor.getNumberOfEntriesInBacklog()});
        }
        this.cursor.asyncSkipEntries(numMessagesToSkip, ManagedCursor.IndividualDeletedEntries.Exclude, new AsyncCallbacks.SkipEntriesCallback(){

            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Skipped {} messages, new backlog {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, numMessagesToSkip, PersistentSubscription.this.cursor.getNumberOfEntriesInBacklog()});
                }
                future.complete(null);
            }

            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to skip {} messages", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, numMessagesToSkip, exception});
                future.completeExceptionally((Throwable)exception);
            }
        }, null);
        return future;
    }

    @Override
    public CompletableFuture<Void> resetCursor(final long timestamp) {
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(this.topicName, this.cursor);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", new Object[]{this.topicName, this.subName, timestamp});
        }
        persistentMessageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback(){

            public void findEntryComplete(Position position, Object ctx) {
                Position finalPosition;
                if (position == null) {
                    finalPosition = PersistentSubscription.this.cursor.getFirstPosition();
                    if (finalPosition == null) {
                        log.warn("[{}][{}] Unable to find position for timestamp {}. Unable to reset cursor to first position", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, timestamp});
                        future.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition("Unable to find position for specified timestamp"));
                        return;
                    }
                    log.info("[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, timestamp, finalPosition});
                } else {
                    finalPosition = position;
                }
                if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, 0, 1)) {
                    future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to fence subscription"));
                    return;
                }
                CompletableFuture<Object> disconnectFuture = PersistentSubscription.this.dispatcher != null && PersistentSubscription.this.dispatcher.isConsumerConnected() ? PersistentSubscription.this.dispatcher.disconnectAllConsumers() : CompletableFuture.completedFuture(null);
                disconnectFuture.whenComplete((aVoid, throwable) -> {
                    if (throwable != null) {
                        log.error("[{}][{}] Failed to disconnect consumer from subscription", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, throwable});
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                        return;
                    }
                    log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", (Object)PersistentSubscription.this.topicName, (Object)PersistentSubscription.this.subName);
                    try {
                        PersistentSubscription.this.cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback(){

                            public void resetComplete(Object ctx) {
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}][{}] Successfully reset subscription to timestamp {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, timestamp});
                                }
                                IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                                future.complete(null);
                            }

                            public void resetFailed(ManagedLedgerException exception, Object ctx) {
                                log.error("[{}][{}] Failed to reset subscription to timestamp {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, timestamp, exception});
                                IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                                if (exception instanceof ManagedLedgerException.InvalidCursorPositionException) {
                                    future.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition(exception.getMessage()));
                                } else if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                                    future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
                                } else {
                                    future.completeExceptionally(new BrokerServiceException((Throwable)exception));
                                }
                            }
                        });
                    }
                    catch (Exception e) {
                        log.error("[{}][{}] Error while resetting cursor", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, e});
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        future.completeExceptionally(new BrokerServiceException(e));
                    }
                });
            }

            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                    future.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(exception.getMessage()));
                } else {
                    future.completeExceptionally(new BrokerServiceException((Throwable)exception));
                }
            }
        });
        return future;
    }

    @Override
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        final CompletableFuture<Entry> future = new CompletableFuture<Entry>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Getting message at position {}", new Object[]{this.topicName, this.subName, messagePosition});
        }
        this.cursor.asyncGetNthEntry(messagePosition, ManagedCursor.IndividualDeletedEntries.Exclude, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally((Throwable)exception);
            }

            public void readEntryComplete(Entry entry, Object ctx) {
                future.complete(entry);
            }
        }, null);
        return future;
    }

    @Override
    public long getNumberOfEntriesInBacklog() {
        return this.cursor.getNumberOfEntriesInBacklog();
    }

    @Override
    public synchronized Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> close() {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        PersistentSubscription persistentSubscription = this;
        synchronized (persistentSubscription) {
            if (this.dispatcher != null && this.dispatcher.isConsumerConnected()) {
                closeFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Subscription has active consumers"));
                return closeFuture;
            }
            IS_FENCED_UPDATER.set(this, 1);
            log.info("[{}][{}] Successfully fenced cursor ledger [{}]", new Object[]{this.topicName, this.subName, this.cursor});
        }
        this.cursor.asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Successfully closed cursor ledger", (Object)PersistentSubscription.this.topicName, (Object)PersistentSubscription.this.subName);
                }
                closeFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                log.error("[{}][{}] Error closing cursor for subscription", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, exception});
                closeFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable)exception));
            }
        }, null);
        return closeFuture;
    }

    @Override
    public synchronized CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> disconnectFuture = new CompletableFuture<Void>();
        IS_FENCED_UPDATER.set(this, 1);
        ((CompletableFuture)((CompletableFuture)(this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenCompose(v -> this.close())).thenRun(() -> {
            log.info("[{}][{}] Successfully disconnected and closed subscription", (Object)this.topicName, (Object)this.subName);
            disconnectFuture.complete(null);
        })).exceptionally(exception -> {
            IS_FENCED_UPDATER.set(this, 0);
            this.dispatcher.reset();
            log.error("[{}][{}] Error disconnecting consumers from subscription", new Object[]{this.topicName, this.subName, exception});
            disconnectFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return disconnectFuture;
    }

    @Override
    public CompletableFuture<Void> delete() {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        log.info("[{}][{}] Unsubscribing", (Object)this.topicName, (Object)this.subName);
        ((CompletableFuture)((CompletableFuture)this.close().thenCompose(v -> this.topic.unsubscribe(this.subName))).thenAccept(v -> {
            boolean bl = deleteFuture.complete(null);
        })).exceptionally(exception -> {
            IS_FENCED_UPDATER.set(this, 0);
            log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, exception});
            deleteFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return deleteFuture;
    }

    @Override
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            if (this.dispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return this.delete();
            }
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
        }
        catch (BrokerServiceException e) {
            log.warn("Error removing consumer {}", (Object)consumer);
            future.completeExceptionally(e);
        }
        return future;
    }

    public CopyOnWriteArrayList<Consumer> getConsumers() {
        Dispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            return dispatcher.getConsumers();
        }
        return CopyOnWriteArrayList.empty();
    }

    @Override
    public void expireMessages(int messageTTLInSeconds) {
        if (this.getNumberOfEntriesInBacklog() == 0L || this.dispatcher != null && this.dispatcher.isConsumerConnected() && this.getNumberOfEntriesInBacklog() < 1000L && !this.topic.isOldestMessageExpired(this.cursor, messageTTLInSeconds)) {
            return;
        }
        this.expiryMonitor.expireMessages(messageTTLInSeconds);
    }

    @Override
    public double getExpiredMessageRate() {
        return this.expiryMonitor.getMessageExpiryRate();
    }

    public SubscriptionStats getStats() {
        SubscriptionStats subStats = new SubscriptionStats();
        Dispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStats consumerStats = consumer.getStats();
                subscriptionStats.consumers.add(consumerStats);
                subscriptionStats.msgRateOut += consumerStats.msgRateOut;
                subscriptionStats.msgThroughputOut += consumerStats.msgThroughputOut;
                subscriptionStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                subscriptionStats.unackedMessages += (long)consumerStats.unackedMessages;
            });
        }
        subStats.type = this.getType();
        if (PulsarApi.CommandSubscribe.SubType.Shared.equals((Object)subStats.type) && dispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subStats.unackedMessages = ((PersistentDispatcherMultipleConsumers)dispatcher).getTotalUnackedMessages();
            subStats.blockedSubscriptionOnUnackedMsgs = ((PersistentDispatcherMultipleConsumers)dispatcher).isBlockedDispatcherOnUnackedMsgs();
        }
        subStats.msgBacklog = this.getNumberOfEntriesInBacklog();
        subStats.msgRateExpired = this.expiryMonitor.getMessageExpiryRate();
        return subStats;
    }

    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
        this.dispatcher.redeliverUnacknowledgedMessages(consumer);
    }

    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        this.dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
    }

    @Override
    public void addUnAckedMessages(int unAckMessages) {
        this.dispatcher.addUnAckedMessages(unAckMessages);
    }

    @Override
    public void markTopicWithBatchMessagePublished() {
        this.topic.markBatchMessagePublished();
    }

    void topicTerminated() {
        if (this.cursor.getNumberOfEntriesInBacklog() == 0L) {
            this.dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }
}

