package org.apache.qpid.server.virtualhost.plugins;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.class */
/**
 * House-keeping plugin that looks for queues whose consumers are not keeping up.
 *
 * <p>On every {@link #execute()} run, each queue held in the binding listener's queue cache is
 * compared against the thresholds of its {@link SlowConsumerDetectionQueueConfiguration}
 * (message count, queue depth, age of the oldest message). When a threshold is breached the
 * queue's configured {@link SlowConsumerPolicyPlugin} is invoked.
 */
public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin {
    /** Plugin-level settings (delay and time unit); assigned by {@link #configure(ConfigurationPlugin)}. */
    private SlowConsumerDetectionConfiguration _config;

    /** Binding listener whose queue cache supplies the queues inspected by {@link #execute()}. */
    private ConfiguredQueueBindingListener _listener;

    /**
     * Factory that creates a configured {@link SlowConsumerDetection} for a virtual host.
     */
    public static class SlowConsumerFactory implements VirtualHostPluginFactory {
        /**
         * Builds and configures the plugin for the given virtual host.
         *
         * @param virtualHost the virtual host the plugin is created for
         * @return a configured plugin, or {@code null} when the virtual host carries no
         *         {@link SlowConsumerDetectionConfiguration}
         */
        @Override
        public SlowConsumerDetection newInstance(VirtualHost virtualHost) {
            SlowConsumerDetectionConfiguration pluginConfig = (SlowConsumerDetectionConfiguration)
                    virtualHost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
            if (pluginConfig == null) {
                return null;
            }
            SlowConsumerDetection plugin = new SlowConsumerDetection(virtualHost);
            plugin.configure(pluginConfig);
            return plugin;
        }
    }

    /**
     * Stores the plugin configuration and attaches a fresh binding listener to every exchange
     * that the virtual host's exchange registry reports at the time of the call.
     *
     * @param configurationPlugin the configuration; must be a {@link SlowConsumerDetectionConfiguration}
     */
    @Override
    public void configure(ConfigurationPlugin configurationPlugin) {
        this._config = (SlowConsumerDetectionConfiguration) configurationPlugin;
        this._listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
        ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
        for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) {
            exchangeRegistry.getExchange(exchangeName).addBindingListener(this._listener);
        }
    }

    /**
     * Creates an unconfigured plugin; {@link #configure(ConfigurationPlugin)} must be called
     * before the plugin is executed or asked for its delay/time unit.
     *
     * @param virtualHost the virtual host this plugin belongs to
     */
    public SlowConsumerDetection(VirtualHost virtualHost) {
        super(virtualHost);
    }

    /**
     * Runs one detection pass over all cached queues.
     *
     * <p>A failure while handling one queue is logged and does not stop the pass, so the
     * remaining queues are still checked.
     */
    @Override
    public void execute() {
        CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
        for (AMQQueue queue : this._listener.getQueueCache()) {
            CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(queue.getName()));
            try {
                SlowConsumerDetectionQueueConfiguration queueConfig = (SlowConsumerDetectionQueueConfiguration)
                        queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
                if (checkQueueStatus(queue, queueConfig)) {
                    SlowConsumerPolicyPlugin policy = queueConfig.getPolicy();
                    if (policy == null) {
                        getLogger().warn("No slow consumer policy for queue " + queue.getName());
                    } else {
                        policy.performPolicy(queue);
                    }
                }
            } catch (Exception e) {
                // Deliberately broad: one misbehaving queue or policy must not abort the whole pass.
                getLogger().error("Exception in SlowConsumersDetection for queue: " + queue.getName(), e);
            }
        }
        CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
    }

    /**
     * @return the delay between runs, as given by the plugin configuration
     */
    @Override
    public long getDelay() {
        return this._config.getDelay();
    }

    /**
     * @return the unit of {@link #getDelay()}, as given by the plugin configuration
     */
    @Override
    public TimeUnit getTimeUnit() {
        return this._config.getTimeUnit();
    }

    /**
     * Decides whether a queue currently breaches any of its slow-consumer thresholds.
     *
     * <p>A threshold value of {@code 0} disables that particular check. The age check is only
     * made when the queue reported at least one message.
     *
     * @param queue  the queue to inspect
     * @param config the queue's slow-consumer settings; {@code null} means the queue is not monitored
     * @return {@code true} if at least one enabled threshold is reached or exceeded
     */
    private boolean checkQueueStatus(AMQQueue queue, SlowConsumerDetectionQueueConfiguration config) {
        if (config == null) {
            return false;
        }
        if (getLogger().isInfoEnabled()) {
            getLogger().info("Retrieved Queue(" + queue.getName() + ") Config:" + config);
        }

        final long maxCount = config.getMessageCount();
        final long maxDepth = config.getDepth();
        final long maxAge = config.getMessageAge();
        // Captured once so the decision and the debug output below agree on the same value.
        final int count = queue.getMessageCount();

        final boolean slow = (maxCount != 0 && count >= maxCount)
                || (maxDepth != 0 && queue.getQueueDepth() >= maxDepth)
                || (maxAge != 0 && count > 0 && oldestMessageAge(queue) >= maxAge);
        if (!slow) {
            return false;
        }

        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Detected Slow Consumer on Queue(" + queue.getName() + ")");
            getLogger().debug("Queue Count:" + count + ":" + maxCount);
            getLogger().debug("Queue Depth:" + queue.getQueueDepth() + ":" + maxDepth);
            getLogger().debug("Queue Arrival:" + queue.getOldestMessageArrivalTime() + ":" + maxAge);
        }
        return true;
    }

    /**
     * Computes how long the oldest message has been waiting on the queue.
     *
     * <p>NOTE(review): this treats {@code getOldestMessageArrivalTime()} as an absolute
     * epoch-millisecond timestamp and {@code getMessageAge()} as a millisecond duration, which is
     * why the arrival time is subtracted from "now" instead of being compared to the threshold
     * directly - confirm against the AMQQueue implementation.
     *
     * @param queue the queue to inspect
     * @return elapsed time between the oldest message's arrival and the current time
     */
    private static long oldestMessageAge(AMQQueue queue) {
        return System.currentTimeMillis() - queue.getOldestMessageArrivalTime();
    }
}
