/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.advisories;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.activemq.advisories.ConsumerAdvisoryEvent;
import org.activemq.advisories.ProducerDemandEvent;
import org.activemq.advisories.ProducerDemandListener;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ConsumerInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ProducerDemandAdvisor {
    private static final Log log = LogFactory.getLog((Class)ProducerDemandAdvisor.class);
    private final ActiveMQDestination destination;
    private Connection connection;
    private Session session;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private int consumerCount;
    private ProducerDemandListener demandListener;

    public ProducerDemandAdvisor(Connection connection, Destination destination) throws JMSException {
        this.connection = connection;
        this.destination = ActiveMQDestination.transformDestination(destination);
    }

    private void fireDemandEvent() {
        this.demandListener.onEvent(new ProducerDemandEvent(this.destination, this.isInDemand()));
    }

    public boolean isInDemand() {
        return this.consumerCount > 0;
    }

    public ProducerDemandListener getDemandListener() {
        return this.demandListener;
    }

    public synchronized void setDemandListener(ProducerDemandListener demandListener) {
        this.demandListener = demandListener;
        this.fireDemandEvent();
    }

    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            this.session = this.connection.createSession(false, 1);
            MessageConsumer consumer = this.session.createConsumer((Destination)this.destination.getTopicForConsumerAdvisory());
            consumer.setMessageListener(new MessageListener(){

                public void onMessage(Message msg) {
                    ProducerDemandAdvisor.this.process(msg);
                }
            });
        }
    }

    public void stop() throws JMSException {
        if (this.started.commit(true, false) && this.session != null) {
            this.session.close();
        }
    }

    protected void process(Message msg) {
        if (msg instanceof ObjectMessage) {
            try {
                ConsumerInfo info = (ConsumerInfo)((ObjectMessage)msg).getObject();
                ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
                boolean inDemand = this.isInDemand();
                this.consumerCount = info.isStarted() ? ++this.consumerCount : --this.consumerCount;
                if (inDemand ^ this.isInDemand() && this.demandListener != null) {
                    this.fireDemandEvent();
                }
            }
            catch (JMSException e) {
                log.error((Object)("Failed to process message: " + msg));
            }
        }
    }
}

