/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import java.util.LinkedList;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.activemq.ActiveMQSession;
import org.activemq.ActiveMQTopicSubscriber;
import org.activemq.Closeable;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.management.JMSConsumerStatsImpl;
import org.activemq.management.StatsCapable;
import org.activemq.management.StatsImpl;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.selector.SelectorParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * JMS {@link MessageConsumer} implementation that is created for, and registers
 * itself with, an {@link ActiveMQSession}.
 *
 * <p>Messages reach the consumer through {@code processMessage}. They are either
 * handed straight to the registered {@link MessageListener} or parked in
 * {@link #messageQueue} until one of the {@code receive} variants picks them up.
 * While the consumer is stopped, incoming messages are held in
 * {@link #stoppedQueue} and replayed by {@link #start()}.
 */
public class ActiveMQMessageConsumer
implements MessageConsumer,
StatsCapable,
Closeable {
    private static final Log log = LogFactory.getLog((Class)ActiveMQMessageConsumer.class);
    /** Session passed to the constructor; the consumer adds itself to it and removes itself on close. */
    protected ActiveMQSession session;
    /** Built in the constructor as {@code clientID + "." + sessionId + "." + consumerNumber}. */
    protected String consumerIdentifier;
    /** Queue obtained from the connection under {@link #consumerIdentifier}; feeds the receive methods. */
    protected MemoryBoundedQueue messageQueue;
    /** Trimmed selector given at construction; may be {@code null} or empty. */
    protected String messageSelector;
    /** Listener for asynchronous delivery; written and read by processMessage under {@link #messageListenerGuard}. */
    private MessageListener messageListener;
    /** Consumer name; a non-empty name is part of the durable-subscriber test. */
    protected String consumerName;
    /** Destination consumed from; the constructor rejects {@code null}. */
    protected ActiveMQDestination destination;
    /** Set to {@code true} as the last step of {@link #close()}; checked by {@code checkClosed()}. */
    private boolean closed;
    /** Number supplied at construction; the last component of {@link #consumerIdentifier}. */
    protected int consumerNumber;
    /** Prefetch value supplied at construction; only stored and exposed by this class. */
    protected int prefetchNumber;
    /** {@code System.currentTimeMillis()} captured in the constructor. */
    protected long startTime;
    /** No-local flag supplied at construction; only stored and exposed by this class. */
    protected boolean noLocal;
    /** When {@code true}, delivered messages are never reported to the session as consumed. */
    protected boolean browser;
    /** Thread currently inside a blocking dequeue in receive(); {@link #close()} interrupts it. */
    private Thread accessThread;
    /** Monitor protecting {@link #messageListener}. */
    private Object messageListenerGuard;
    /** Per-consumer statistics, created from the session statistics and the destination. */
    private JMSConsumerStatsImpl stats;
    /** Toggled by {@link #start()} and {@link #stop()}; when {@code false}, processMessage defers messages. */
    private boolean running = true;
    /** Messages that arrived while {@link #running} was {@code false}, in arrival order. */
    private LinkedList stoppedQueue = new LinkedList();

    /**
     * Creates a consumer for {@code dest} and registers it with {@code theSession}.
     *
     * <p>All argument validation (destination checks and selector parsing) happens
     * before the destination's consumer counter is incremented, so a rejected
     * argument does not leave the counter one too high.
     *
     * @param theSession   session the consumer belongs to
     * @param dest         destination to consume from; must not be {@code null}
     * @param name         consumer name, may be {@code null}
     * @param selector     message selector, may be {@code null}; it is trimmed and,
     *                     when non-empty, parsed to validate it
     * @param cnum         consumer number, used as the last part of the consumer identifier
     * @param prefetch     prefetch value to store
     * @param noLocalValue no-local flag to store
     * @param browserValue whether this consumer only browses
     * @throws InvalidDestinationException if {@code dest} is {@code null}, or is a temporary
     *         destination that fails the checks below (foreign connection, or deleted)
     * @throws IllegalArgumentException if a checked temporary destination has no physical name
     * @throws JMSException if the selector parser rejects {@code selector}
     */
    protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name, String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
        if (dest == null) {
            throw new InvalidDestinationException("Do not understand a null destination");
        }
        // Temporary destinations are only checked for J2EE-compliant connections and non-internal sessions.
        if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
            String physicalName = dest.getPhysicalName();
            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }
            // A temporary destination is accepted only if its physical name contains this connection's client ID.
            String clientID = theSession.connection.getInitializedClientID();
            if (physicalName.indexOf(clientID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }
            if (dest.isDeleted()) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
        }
        // Validate the selector BEFORE touching the destination's consumer counter:
        // a parse failure must not leave the counter incremented for a consumer
        // that was never created.
        String trimmedSelector = selector == null ? null : selector.trim();
        if (trimmedSelector != null && trimmedSelector.length() > 0) {
            new SelectorParser().parse(trimmedSelector);
        }
        dest.incrementConsumerCounter();
        this.session = theSession;
        this.destination = dest;
        this.consumerName = name;
        this.messageSelector = trimmedSelector;
        this.consumerNumber = cnum;
        this.prefetchNumber = prefetch;
        this.noLocal = noLocalValue;
        this.browser = browserValue;
        this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber;
        this.startTime = System.currentTimeMillis();
        this.messageListenerGuard = new Object();
        this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
        this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
        this.session.addConsumer(this);
    }

    /**
     * Reports the memory figure of this consumer's message queue.
     *
     * @return whatever {@code messageQueue.getLocalMemoryUsedByThisQueue()} reports
     */
    public long getLocalMemoryUsage() {
        final long usedByQueue = messageQueue.getLocalMemoryUsedByThisQueue();
        return usedByQueue;
    }

    /**
     * Reports the size of this consumer's message queue.
     *
     * @return the value of {@code messageQueue.size()}
     */
    public int size() {
        final int queued = messageQueue.size();
        return queued;
    }

    /**
     * Exposes this consumer's statistics through the general {@link StatsImpl} type.
     *
     * @return the same object returned by {@link #getConsumerStats()}
     */
    public StatsImpl getStats() {
        final StatsImpl generalView = stats;
        return generalView;
    }

    /**
     * Returns the consumer-specific statistics object created in the constructor.
     *
     * @return this consumer's {@link JMSConsumerStatsImpl}
     */
    public JMSConsumerStatsImpl getConsumerStats() {
        final JMSConsumerStatsImpl consumerStats = stats;
        return consumerStats;
    }

    /**
     * Renders the consumer as {@code MessageConsumer: <identifier>[<number>]}.
     *
     * @return a short description for logs and debugging
     */
    public String toString() {
        final String numberSuffix = "[" + consumerNumber + "]";
        return "MessageConsumer: " + consumerIdentifier + numberSuffix;
    }

    /**
     * @return the prefetch value currently stored on this consumer
     */
    public int getPrefetchNumber() {
        final int currentPrefetch = prefetchNumber;
        return currentPrefetch;
    }

    /**
     * Replaces the stored prefetch value; this class does nothing else with it.
     *
     * @param prefetchNumber the new prefetch value
     */
    public void setPrefetchNumber(int prefetchNumber) {
        final int requested = prefetchNumber;
        this.prefetchNumber = requested;
    }

    /**
     * Returns the selector stored at construction time (already trimmed).
     *
     * @return the selector, possibly {@code null} or empty
     * @throws JMSException if the consumer has been closed
     */
    public String getMessageSelector() throws JMSException {
        checkClosed();
        final String selector = messageSelector;
        return selector;
    }

    /**
     * Returns the listener currently installed for asynchronous delivery.
     *
     * @return the listener, or {@code null} if none is set
     * @throws JMSException if the consumer has been closed
     */
    public MessageListener getMessageListener() throws JMSException {
        checkClosed();
        final MessageListener installed = messageListener;
        return installed;
    }

    /**
     * Installs {@code listener} for asynchronous delivery, or removes the current
     * listener when {@code listener} is {@code null}.
     *
     * <p>When a non-null listener is installed, the session's consumer dispatch state
     * is switched to {@code 2} and every message already waiting in the queue is
     * pushed through {@code processMessage} so the new listener sees it.
     *
     * <p>NOTE(review): the drain runs while holding {@code messageListenerGuard} and
     * {@code processMessage} then takes this object's monitor, whereas a direct call to
     * {@code processMessage} takes the monitors in the opposite order. Confirm that
     * callers cannot hit both paths concurrently.
     *
     * @param listener the listener to install, or {@code null} to remove it
     * @throws JMSException if the consumer is closed, or if the thread is interrupted
     *         while draining; in the latter case the interrupt status is restored and
     *         the {@link InterruptedException} is attached as the linked exception
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        synchronized (this.messageListenerGuard) {
            this.messageListener = listener;
            if (listener == null) {
                return;
            }
            this.session.setSessionConsumerDispatchState(2);
            try {
                // Messages may have been queued before the listener existed; deliver them now.
                ActiveMQMessage msg;
                while ((msg = (ActiveMQMessage)this.messageQueue.dequeueNoWait()) != null) {
                    this.processMessage(msg);
                }
            }
            catch (InterruptedException ex) {
                // Keep the interrupt visible to the caller's thread instead of swallowing it.
                Thread.currentThread().interrupt();
                JMSException jmsEx = new JMSException("Interrupted setting message listener");
                jmsEx.setLinkedException((Exception)ex);
                throw jmsEx;
            }
        }
    }

    /**
     * Blocks until the queue hands over a message, then returns a shallow copy of it.
     *
     * <p>If the dequeued message has already expired, it is reported to the session as
     * expired and the method falls back to a single non-blocking
     * {@link #receiveNoWait()}; that fallback may return {@code null}.
     *
     * <p>The calling thread is published in {@code accessThread} only for the duration
     * of the blocking dequeue so that {@link #close()} can interrupt it. The field is
     * cleared in a {@code finally} block; otherwise an interrupted receive would leave
     * a stale reference and a later {@code close()} would interrupt a thread that is no
     * longer waiting here.
     *
     * @return a copy of the received message, or {@code null} if the queue returned
     *         nothing or the wait was interrupted
     * @throws JMSException if the consumer is closed
     */
    public Message receive() throws JMSException {
        this.checkClosed();
        this.session.setSessionConsumerDispatchState(3);
        try {
            ActiveMQMessage message;
            this.accessThread = Thread.currentThread();
            try {
                message = (ActiveMQMessage)this.messageQueue.dequeue();
            }
            finally {
                this.accessThread = null;
            }
            if (message != null) {
                boolean expired = message.isExpired();
                this.messageDelivered(message, true, expired);
                message = !expired ? message.shallowCopy() : (ActiveMQMessage)this.receiveNoWait();
            }
            if (message != null && log.isDebugEnabled()) {
                log.debug((Object)("Message received: " + message));
            }
            return message;
        }
        catch (InterruptedException ioe) {
            // close() interrupts the blocked receiver on purpose, so the interrupt is
            // treated as "no message" and the flag is deliberately not re-asserted.
            return null;
        }
    }

    /**
     * Waits up to {@code timeout} for a message and returns a shallow copy of it.
     *
     * <p>A {@code timeout} of {@code 0} delegates to the unbounded {@link #receive()}.
     * If the dequeued message has already expired, it is reported to the session as
     * expired and the method falls back to a single non-blocking
     * {@link #receiveNoWait()}.
     *
     * <p>As in {@link #receive()}, {@code accessThread} is cleared in a {@code finally}
     * block so that an interrupted wait cannot leave a stale thread reference for
     * {@link #close()} to interrupt later.
     *
     * @param timeout value passed to the queue's timed dequeue; {@code 0} means wait
     *                without a limit
     * @return a copy of the received message, or {@code null} if none arrived or the
     *         wait was interrupted
     * @throws JMSException if the consumer is closed
     */
    public Message receive(long timeout) throws JMSException {
        this.checkClosed();
        this.session.setSessionConsumerDispatchState(3);
        try {
            if (timeout == 0L) {
                return this.receive();
            }
            ActiveMQMessage message;
            this.accessThread = Thread.currentThread();
            try {
                message = (ActiveMQMessage)this.messageQueue.dequeue(timeout);
            }
            finally {
                this.accessThread = null;
            }
            if (message != null) {
                boolean expired = message.isExpired();
                this.messageDelivered(message, true, expired);
                message = !expired ? message.shallowCopy() : (ActiveMQMessage)this.receiveNoWait();
            }
            if (message != null && log.isDebugEnabled()) {
                log.debug((Object)("Message received: " + message));
            }
            return message;
        }
        catch (InterruptedException ioe) {
            // close() interrupts the blocked receiver on purpose, so the interrupt is
            // treated as "no message" and the flag is deliberately not re-asserted.
            return null;
        }
    }

    /**
     * Returns a shallow copy of the next non-expired message that is immediately
     * available, without blocking.
     *
     * <p>Every message taken from the queue is reported to the session; expired ones
     * are reported as expired and skipped.
     *
     * @return a copy of the next live message, or {@code null} if the queue has none
     * @throws JMSException if the consumer is closed, or if the queue access is
     *         interrupted; in the latter case the interrupt status is restored and the
     *         {@link InterruptedException} is attached as the linked exception
     */
    public Message receiveNoWait() throws JMSException {
        this.checkClosed();
        this.session.setSessionConsumerDispatchState(3);
        try {
            ActiveMQMessage message;
            while ((message = (ActiveMQMessage)this.messageQueue.dequeueNoWait()) != null) {
                boolean expired = message.isExpired();
                this.messageDelivered(message, true, expired);
                if (expired) {
                    continue;
                }
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Message received: " + message));
                }
                return message.shallowCopy();
            }
        }
        catch (InterruptedException ioe) {
            // Preserve both the interrupt status and the cause, matching setMessageListener.
            Thread.currentThread().interrupt();
            JMSException jmsEx = new JMSException("Queue is interrupted: " + ioe.getMessage());
            jmsEx.setLinkedException((Exception)ioe);
            throw jmsEx;
        }
        return null;
    }

    /**
     * Closes this consumer: wakes a thread blocked in a receive call, decrements the
     * destination's consumer counter, deregisters from the session, closes the message
     * queue and finally marks the consumer closed.
     *
     * <p>Calling {@code close()} on an already closed consumer is a no-op, so the
     * destination counter is decremented, and the consumer deregistered, only once.
     *
     * @throws JMSException declared by the interface; may be raised by the session or
     *         queue calls made here
     */
    public void close() throws JMSException {
        if (this.closed) {
            return;
        }
        // Take one snapshot: receive() clears the field concurrently, and a null
        // check is clearer than catching NullPointerException.
        Thread blockedReceiver = this.accessThread;
        if (blockedReceiver != null) {
            try {
                blockedReceiver.interrupt();
            }
            catch (SecurityException ignored) {
                // Best effort only: if we may not interrupt the thread, carry on closing.
            }
        }
        if (this.destination != null) {
            this.destination.decrementConsumerCounter();
        }
        this.session.removeConsumer(this);
        this.messageQueue.close();
        this.closed = true;
    }

    /**
     * Tells whether this consumer counts as a durable subscriber: it must be an
     * {@link ActiveMQTopicSubscriber} and have a non-empty consumer name.
     *
     * @return {@code true} only when both conditions hold
     */
    public boolean isDurableSubscriber() {
        if (!(this instanceof ActiveMQTopicSubscriber)) {
            return false;
        }
        if (consumerName == null) {
            return false;
        }
        return consumerName.length() > 0;
    }

    /**
     * Tells whether this consumer is a topic consumer without a consumer name.
     *
     * @return {@code true} when the destination is a topic and the consumer name is
     *         {@code null} or empty
     */
    public boolean isTransientSubscriber() {
        if (destination == null || !destination.isTopic()) {
            return false;
        }
        return consumerName == null || consumerName.length() == 0;
    }

    /**
     * Guard used by the public API: fails once {@link #close()} has completed.
     *
     * @throws IllegalStateException if the consumer is closed
     */
    protected void checkClosed() throws IllegalStateException {
        if (!closed) {
            return;
        }
        throw new IllegalStateException("The Consumer is closed");
    }

    /**
     * Handles one incoming message for this consumer.
     *
     * <ul>
     *   <li>While the consumer is stopped, the message is appended to
     *       {@code stoppedQueue} and nothing else happens.</li>
     *   <li>If the consumer is closed, the message is reported to the session as not read.</li>
     *   <li>If a listener is installed, the message is delivered to it (unless expired)
     *       between the session's before/after delivery callbacks.</li>
     *   <li>Otherwise the message is enqueued for the receive methods.</li>
     * </ul>
     *
     * Any exception raised along the way is logged and the message is reported to the
     * session as not read.
     *
     * @param message the message to process
     */
    protected synchronized void processMessage(ActiveMQMessage message) {
        if (!running) {
            stoppedQueue.addLast(message);
            return;
        }
        message.setConsumerIdentifer(consumerIdentifier);
        // Snapshot the listener under its guard, then work with the snapshot.
        MessageListener currentListener;
        synchronized (messageListenerGuard) {
            currentListener = messageListener;
        }
        try {
            if (closed) {
                messageDelivered(message, false, false);
                return;
            }
            if (message.getJMSActiveMQDestination() == null) {
                message.setJMSDestination(getDestination());
            }
            if (currentListener == null) {
                // No asynchronous listener: leave the message for a receive call.
                messageQueue.enqueue(message);
                return;
            }
            beforeMessageDelivered(message);
            boolean expired = message.isExpired();
            if (!expired) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Message delivered to message listener: " + message));
                }
                currentListener.onMessage((Message)message.shallowCopy());
            }
            afterMessageDelivered(message, true, expired, true);
        }
        catch (Exception e) {
            log.warn((Object)("could not process message: " + message), (Throwable)e);
            messageDelivered(message, false, false);
        }
    }

    /**
     * @return the identifier built in the constructor from client ID, session ID and consumer number
     */
    protected String getConsumerIdentifier() {
        final String identifier = consumerIdentifier;
        return identifier;
    }

    /**
     * @return the consumer name, possibly {@code null}
     */
    protected String getConsumerName() {
        final String name = consumerName;
        return name;
    }

    /**
     * Replaces the consumer name, which feeds the durable/transient subscriber tests.
     *
     * @param value the new consumer name, may be {@code null}
     */
    protected void setConsumerName(String value) {
        consumerName = value;
    }

    /**
     * @return the consumer number currently stored on this consumer
     */
    protected int getConsumerNumber() {
        final int number = consumerNumber;
        return number;
    }

    /**
     * Replaces the stored consumer number. The consumer identifier computed in the
     * constructor is not recomputed here.
     *
     * @param value the new consumer number
     */
    protected void setConsumerNumber(int value) {
        consumerNumber = value;
    }

    /**
     * @return the no-local flag supplied at construction
     */
    protected boolean isNoLocal() {
        final boolean flag = noLocal;
        return flag;
    }

    /**
     * @return {@code true} if this consumer is flagged as a browser
     */
    protected boolean isBrowser() {
        final boolean flag = browser;
        return flag;
    }

    /**
     * Sets the browser flag; while it is {@code true}, delivered messages are never
     * reported to the session as consumed.
     *
     * @param value the new browser flag
     */
    protected void setBrowser(boolean value) {
        browser = value;
    }

    /**
     * @return the destination this consumer was created for
     */
    protected ActiveMQDestination getDestination() {
        final ActiveMQDestination target = destination;
        return target;
    }

    /**
     * @return the {@code System.currentTimeMillis()} value captured in the constructor
     */
    protected long getStartTime() {
        final long createdAt = startTime;
        return createdAt;
    }

    /**
     * Drops every message this consumer is still holding: first those in the receive
     * queue, then those deferred while the consumer was stopped.
     */
    protected void clearMessagesInProgress() {
        messageQueue.clear();
        stoppedQueue.clear();
    }

    /**
     * Shorthand for {@code afterMessageDelivered} when the session's before-delivery
     * callback was not invoked for this message.
     *
     * @param message        the message that was handled, may be {@code null}
     * @param messageRead    whether the message was actually read
     * @param messageExpired whether the message had expired
     */
    private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
        final boolean beforeCalled = false;
        afterMessageDelivered(message, messageRead, messageExpired, beforeCalled);
    }

    /**
     * Prepares a message for listener delivery: sets its transient-consumed flag based
     * on this consumer's destination and then notifies the session.
     *
     * @param message the message about to be delivered; {@code null} is ignored
     */
    private void beforeMessageDelivered(ActiveMQMessage message) {
        if (message != null) {
            boolean onTopic = destination != null && destination.isTopic();
            // Flagged for topic deliveries unless this is a durable subscriber
            // receiving a persistent message.
            boolean transientConsumed = (!isDurableSubscriber() || !message.isPersistent()) && onTopic;
            message.setTransientConsumed(transientConsumed);
            session.beforeMessageDelivered(message);
        }
    }

    /**
     * Reports the outcome of a delivery to the session and, for messages that were
     * read, to the consumer statistics.
     *
     * @param message        the message that was handled; {@code null} is ignored
     * @param messageRead    whether the message was actually read
     * @param messageExpired whether the message had expired
     * @param beforeCalled   whether the session's before-delivery callback ran for it
     */
    private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
        if (message != null) {
            // A browser never consumes what it reads.
            boolean consumed = !browser && messageRead;
            // The topic test here uses the message's own destination, not the consumer's.
            ActiveMQDestination messageDestination = message.getJMSActiveMQDestination();
            boolean onTopic = messageDestination != null && messageDestination.isTopic();
            boolean transientConsumed = (!isDurableSubscriber() || !message.isPersistent()) && onTopic;
            message.setTransientConsumed(transientConsumed);
            boolean durableOrQueue = isDurableSubscriber() || destination.isQueue();
            session.afterMessageDelivered(durableOrQueue, message, consumed, messageExpired, beforeCalled);
            if (messageRead) {
                stats.onMessage(message);
            }
        }
    }

    /**
     * Resumes delivery and replays, in arrival order, the messages that were deferred
     * while the consumer was stopped.
     *
     * <p>The drain stops as soon as {@code running} becomes {@code false} again, which
     * can happen when code invoked during delivery calls {@link #stop()} on this same
     * thread. Without that check, {@code processMessage} would append every replayed
     * message back onto {@code stoppedQueue} and the loop would never end. Messages
     * left over stay queued for the next {@code start()}.
     */
    public synchronized void start() {
        this.running = true;
        while (this.running && !this.stoppedQueue.isEmpty()) {
            ActiveMQMessage m = (ActiveMQMessage)this.stoppedQueue.removeFirst();
            this.processMessage(m);
        }
    }

    /**
     * Pauses delivery: from now on {@code processMessage} defers incoming messages
     * until {@link #start()} is called.
     */
    public synchronized void stop() {
        running = false;
    }
}

