/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentDispatcherMultipleConsumers
extends AbstractDispatcherMultipleConsumers
implements NonPersistentDispatcher {
    private final NonPersistentTopic topic;
    private final Subscription subscription;
    private CompletableFuture<Void> closeFuture = null;
    private final String name;
    private final Rate msgDrop;
    protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
    private volatile int totalAvailablePermits = 0;
    private final ServiceConfiguration serviceConfig;
    private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);

    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
        this.topic = topic;
        this.subscription = subscription;
        this.name = String.valueOf(topic.getName()) + " / " + subscription.getName();
        this.msgDrop = new Rate();
        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
    }

    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        if (AbstractDispatcherMultipleConsumers.IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer ", (Object)this.name, (Object)consumer);
            consumer.disconnect();
            return;
        }
        if (this.isConsumersExceededOnTopic()) {
            log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", (Object)this.name);
            throw new BrokerServiceException.ConsumerBusyException("Topic reached max consumers limit");
        }
        if (this.isConsumersExceededOnSubscription()) {
            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", (Object)this.name);
            throw new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit");
        }
        this.consumerList.add(consumer);
        this.consumerSet.add((Object)consumer);
    }

    private boolean isConsumersExceededOnTopic() {
        int maxConsumersPerTopic = this.serviceConfig.getMaxConsumersPerTopic();
        return maxConsumersPerTopic > 0 && maxConsumersPerTopic <= this.topic.getNumberOfConsumers();
    }

    private boolean isConsumersExceededOnSubscription() {
        int maxConsumersPerSubscription = this.serviceConfig.getMaxConsumersPerSubscription();
        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= this.consumerList.size();
    }

    @Override
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        if (this.consumerSet.removeAll((Object)consumer) == 1) {
            this.consumerList.remove(consumer);
            log.info("Removed consumer {}", (Object)consumer);
            if (this.consumerList.isEmpty()) {
                if (this.closeFuture != null) {
                    log.info("[{}] All consumers removed. Subscription is disconnected", (Object)this.name);
                    this.closeFuture.complete(null);
                }
                TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trying to remove a non-connected consumer: {}", (Object)this.name, (Object)consumer);
            }
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits());
        }
    }

    @Override
    public boolean isConsumerConnected() {
        return !this.consumerList.isEmpty();
    }

    @Override
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return this.consumerList;
    }

    @Override
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        return this.consumerList.size() == 1 && this.consumerSet.contains((Object)consumer);
    }

    @Override
    public CompletableFuture<Void> close() {
        AbstractDispatcherMultipleConsumers.IS_CLOSED_UPDATER.set(this, 1);
        return this.disconnectAllConsumers();
    }

    @Override
    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        if (!this.consumerSet.contains((Object)consumer)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ignoring flow control from disconnected consumer {}", (Object)this.name, (Object)consumer);
            }
            return;
        }
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, additionalNumberOfMessages);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Trigger new read after receiving flow control message", (Object)consumer);
        }
    }

    @Override
    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
        this.closeFuture = new CompletableFuture();
        if (this.consumerList.isEmpty()) {
            this.closeFuture.complete(null);
        } else {
            this.consumerList.forEach(Consumer::disconnect);
        }
        return this.closeFuture;
    }

    @Override
    public void reset() {
        AbstractDispatcherMultipleConsumers.IS_CLOSED_UPDATER.set(this, 0);
    }

    @Override
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Shared;
    }

    @Override
    public void sendMessages(List<Entry> entries) {
        Consumer consumer;
        Consumer consumer2 = consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? this.getNextConsumer() : null;
        if (consumer != null) {
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages());
        } else {
            entries.forEach(entry -> {
                int totalMsgs = Consumer.getBatchSizeforEntry(entry.getDataBuffer(), this.subscription, -1L);
                if (totalMsgs > 0) {
                    this.msgDrop.recordEvent();
                }
                entry.release();
            });
        }
    }

    @Override
    public boolean hasPermits() {
        return TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0;
    }

    @Override
    public Rate getMesssageDropRate() {
        return this.msgDrop;
    }

    @Override
    public boolean isConsumerAvailable(Consumer consumer) {
        return consumer != null && consumer.getAvailablePermits() > 0 && consumer.isWritable();
    }
}

