/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentSubscription
implements Subscription {
    private final NonPersistentTopic topic;
    private volatile NonPersistentDispatcher dispatcher;
    private final String topicName;
    private final String subName;
    private final String fullName;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<NonPersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NonPersistentSubscription.class, "isFenced");
    private volatile int isFenced = 0;
    private volatile long lastActive;
    private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);

    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
        this.topic = topic;
        this.topicName = topic.getName();
        this.subName = subscriptionName;
        this.fullName = MoreObjects.toStringHelper((Object)this).add("topic", (Object)this.topicName).add("name", (Object)this.subName).toString();
        IS_FENCED_UPDATER.set(this, 0);
        this.lastActive = System.currentTimeMillis();
    }

    @Override
    public String getName() {
        return this.subName;
    }

    @Override
    public Topic getTopic() {
        return this.topic;
    }

    @Override
    public boolean isReplicated() {
        return false;
    }

    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        this.updateLastActive();
        if (IS_FENCED_UPDATER.get(this) == 1) {
            log.warn("Attempting to add consumer {} on a fenced subscription", (Object)consumer);
            throw new BrokerServiceException.SubscriptionFencedException("Subscription is fenced");
        }
        if (this.dispatcher == null || !this.dispatcher.isConsumerConnected()) {
            NonPersistentDispatcher previousDispatcher = null;
            block0 : switch (consumer.subType()) {
                case Exclusive: {
                    if (this.dispatcher != null && this.dispatcher.getType() == PulsarApi.CommandSubscribe.SubType.Exclusive) break;
                    previousDispatcher = this.dispatcher;
                    this.dispatcher = new NonPersistentDispatcherSingleActiveConsumer(PulsarApi.CommandSubscribe.SubType.Exclusive, 0, this.topic, (Subscription)this);
                    break;
                }
                case Shared: {
                    if (this.dispatcher != null && this.dispatcher.getType() == PulsarApi.CommandSubscribe.SubType.Shared) break;
                    previousDispatcher = this.dispatcher;
                    this.dispatcher = new NonPersistentDispatcherMultipleConsumers(this.topic, this);
                    break;
                }
                case Failover: {
                    int partitionIndex = TopicName.getPartitionIndex((String)this.topicName);
                    if (partitionIndex < 0) {
                        partitionIndex = 0;
                    }
                    if (this.dispatcher != null && this.dispatcher.getType() == PulsarApi.CommandSubscribe.SubType.Failover) break;
                    previousDispatcher = this.dispatcher;
                    this.dispatcher = new NonPersistentDispatcherSingleActiveConsumer(PulsarApi.CommandSubscribe.SubType.Failover, partitionIndex, this.topic, (Subscription)this);
                    break;
                }
                case Key_Shared: {
                    if (this.dispatcher != null && this.dispatcher.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) break;
                    previousDispatcher = this.dispatcher;
                    PulsarApi.KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : PulsarApi.KeySharedMeta.getDefaultInstance();
                    switch (ksm.getKeySharedMode()) {
                        case STICKY: {
                            this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(this.topic, this, new HashRangeExclusiveStickyKeyConsumerSelector());
                            break block0;
                        }
                    }
                    ServiceConfiguration conf = this.topic.getBrokerService().getPulsar().getConfiguration();
                    StickyKeyConsumerSelector selector = conf.isSubscriptionKeySharedUseConsistentHashing() ? new ConsistentHashingStickyKeyConsumerSelector(conf.getSubscriptionKeySharedConsistentHashingReplicaPoints()) : new HashRangeAutoSplitStickyKeyConsumerSelector();
                    this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(this.topic, this, selector);
                    break;
                }
                default: {
                    throw new BrokerServiceException.ServerMetadataException("Unsupported subscription type");
                }
            }
            if (previousDispatcher != null) {
                ((CompletableFuture)previousDispatcher.close().thenRun(() -> log.info("[{}][{}] Successfully closed previous dispatcher", (Object)this.topicName, (Object)this.subName))).exceptionally(ex -> {
                    log.error("[{}][{}] Failed to close previous dispatcher", new Object[]{this.topicName, this.subName, ex});
                    return null;
                });
            }
        } else if (consumer.subType() != this.dispatcher.getType()) {
            throw new BrokerServiceException.SubscriptionBusyException("Subscription is of different type");
        }
        this.dispatcher.addConsumer(consumer);
    }

    @Override
    public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
        this.updateLastActive();
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }
        NonPersistentTopic.USAGE_COUNT_UPDATER.decrementAndGet(this.topic);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", new Object[]{this.topic.getName(), this.subName, consumer.consumerName(), NonPersistentTopic.USAGE_COUNT_UPDATER.get(this.topic)});
        }
    }

    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }

    @Override
    public void acknowledgeMessage(List<Position> position, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
    }

    public String toString() {
        return this.fullName;
    }

    @Override
    public String getTopicName() {
        return this.topicName;
    }

    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return this.dispatcher != null ? this.dispatcher.getType() : null;
    }

    @Override
    public String getTypeString() {
        PulsarApi.CommandSubscribe.SubType type = this.getType();
        if (type == null) {
            return "None";
        }
        switch (type) {
            case Exclusive: {
                return "Exclusive";
            }
            case Failover: {
                return "Failover";
            }
            case Shared: {
                return "Shared";
            }
            case Key_Shared: {
                return "Key_Shared";
            }
        }
        return "Null";
    }

    @Override
    public CompletableFuture<Void> clearBacklog() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> skipMessages(int numMessagesToSkip) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> resetCursor(long timestamp) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
        return 0L;
    }

    @Override
    public NonPersistentDispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    public CompletableFuture<Void> close() {
        IS_FENCED_UPDATER.set(this, 1);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public synchronized CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> disconnectFuture = new CompletableFuture<Void>();
        IS_FENCED_UPDATER.set(this, 1);
        ((CompletableFuture)((CompletableFuture)(this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenCompose(v -> this.close())).thenRun(() -> {
            log.info("[{}][{}] Successfully disconnected and closed subscription", (Object)this.topicName, (Object)this.subName);
            disconnectFuture.complete(null);
        })).exceptionally(exception -> {
            IS_FENCED_UPDATER.set(this, 0);
            if (this.dispatcher != null) {
                this.dispatcher.reset();
            }
            log.error("[{}][{}] Error disconnecting consumers from subscription", new Object[]{this.topicName, this.subName, exception});
            disconnectFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return disconnectFuture;
    }

    @Override
    public CompletableFuture<Void> delete() {
        return this.delete(false);
    }

    @Override
    public CompletableFuture<Void> deleteForcefully() {
        return this.delete(true);
    }

    private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        log.info("[{}][{}] Unsubscribing", (Object)this.topicName, (Object)this.subName);
        CompletableFuture closeSubscriptionFuture = new CompletableFuture();
        if (closeIfConsumersConnected) {
            ((CompletableFuture)this.disconnect().thenRun(() -> closeSubscriptionFuture.complete(null))).exceptionally(ex -> {
                log.error("[{}][{}] Error disconnecting and closing subscription", new Object[]{this.topicName, this.subName, ex});
                closeSubscriptionFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        } else {
            ((CompletableFuture)this.close().thenRun(() -> closeSubscriptionFuture.complete(null))).exceptionally(exception -> {
                log.error("[{}][{}] Error closing subscription", new Object[]{this.topicName, this.subName, exception});
                closeSubscriptionFuture.completeExceptionally((Throwable)exception);
                return null;
            });
        }
        ((CompletableFuture)((CompletableFuture)closeSubscriptionFuture.thenCompose(v -> this.topic.unsubscribe(this.subName))).thenAccept(v -> {
            NonPersistentSubscription nonPersistentSubscription = this;
            synchronized (nonPersistentSubscription) {
                ((CompletableFuture)(this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
                    log.info("[{}][{}] Successfully deleted subscription", (Object)this.topicName, (Object)this.subName);
                    deleteFuture.complete(null);
                })).exceptionally(ex -> {
                    IS_FENCED_UPDATER.set(this, 0);
                    if (this.dispatcher != null) {
                        this.dispatcher.reset();
                    }
                    log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, ex});
                    deleteFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            }
        })).exceptionally(exception -> {
            IS_FENCED_UPDATER.set(this, 0);
            log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, exception});
            deleteFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return deleteFuture;
    }

    @Override
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            if (this.dispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return this.delete();
            }
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
        }
        catch (BrokerServiceException e) {
            log.warn("Error removing consumer {}", (Object)consumer);
            future.completeExceptionally(e);
        }
        return future;
    }

    @Override
    public List<Consumer> getConsumers() {
        NonPersistentDispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            return dispatcher.getConsumers();
        }
        return Collections.emptyList();
    }

    @Override
    public void expireMessages(int messageTTLInSeconds) {
    }

    public NonPersistentSubscriptionStats getStats() {
        NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();
        NonPersistentDispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStats consumerStats = consumer.getStats();
                subStats.consumers.add(consumerStats);
                subStats.msgRateOut += consumerStats.msgRateOut;
                subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                subStats.msgOutCounter += consumerStats.msgOutCounter;
                subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
            });
        }
        subStats.type = this.getType();
        subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate();
        return subStats;
    }

    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
    }

    @Override
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
    }

    @Override
    public void addUnAckedMessages(int unAckMessages) {
    }

    @Override
    public double getExpiredMessageRate() {
        return 0.0;
    }

    @Override
    public void markTopicWithBatchMessagePublished() {
        this.topic.markBatchMessagePublished();
    }

    @Override
    public CompletableFuture<Void> resetCursor(Position position) {
        return CompletableFuture.completedFuture(null);
    }

    public long getLastActive() {
        return this.lastActive;
    }

    public void updateLastActive() {
        this.lastActive = System.currentTimeMillis();
    }
}

