package org.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import javax.jms.JMSException;
import org.activemq.broker.Broker;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.BrokerConnector;
import org.activemq.broker.BrokerContainer;
import org.activemq.filter.Filter;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.security.SecurityAdapter;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.Dispatcher;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageIdentity;
import org.activemq.service.QueueList;
import org.activemq.service.QueueListEntry;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.SubscriberEntry;
import org.activemq.service.Subscription;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:activemq-core-3.2.jar:org/activemq/service/impl/SubscriptionImpl.class */
public class SubscriptionImpl implements Subscription {
    private static final Log log;
    private String clientId;
    private String subscriberName;
    private ActiveMQDestination destination;
    private String selector;
    private int prefetchLimit;
    private boolean noLocal;
    private int consumerNumber;
    private String consumerId;
    private boolean browser;
    protected Dispatcher dispatch;
    protected String brokerName;
    protected String clusterName;
    protected MessageIdentity lastMessageIdentity;
    private Filter filter;
    private SubscriberEntry subscriberEntry;
    private BrokerClient activeClient;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    static Class class$org$activemq$service$impl$SubscriptionImpl;
    protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
    protected QueueList messagePtrs = new DefaultQueueList();
    private boolean usePrefetch = true;
    private SynchronizedBoolean active = new SynchronizedBoolean(false);
    private Object lock = new Object();

    public SubscriptionImpl(Dispatcher dispatcher, BrokerClient brokerClient, ConsumerInfo consumerInfo, Filter filter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.dispatch = dispatcher;
        this.filter = filter;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        setActiveConsumer(brokerClient, consumerInfo);
    }

    @Override // org.activemq.service.Subscription
    public void setActiveConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) {
        BrokerConnector brokerConnector;
        BrokerInfo brokerInfo;
        if (consumerInfo != null) {
            this.clientId = consumerInfo.getClientId();
            this.subscriberName = consumerInfo.getConsumerName();
            this.noLocal = consumerInfo.isNoLocal();
            this.destination = consumerInfo.getDestination();
            this.selector = consumerInfo.getSelector();
            this.prefetchLimit = consumerInfo.getPrefetchNumber();
            this.consumerNumber = consumerInfo.getConsumerNo();
            this.consumerId = consumerInfo.getConsumerId();
            this.browser = consumerInfo.isBrowser();
        }
        this.activeClient = brokerClient;
        if (brokerClient == null || (brokerConnector = brokerClient.getBrokerConnector()) == null || (brokerInfo = brokerConnector.getBrokerInfo()) == null) {
            return;
        }
        this.brokerName = brokerInfo.getBrokerName();
        this.clusterName = brokerInfo.getClusterName();
    }

    public String toString() {
        return new StringBuffer().append("SubscriptionImpl(").append(super.hashCode()).append(")[").append(this.consumerId).append(PropertyAccessor.PROPERTY_KEY_SUFFIX).append(this.clientId).append(": ").append(this.subscriberName).append(" : ").append(this.destination).toString();
    }

    @Override // org.activemq.service.Subscription
    public void clear() throws JMSException {
        synchronized (this.lock) {
            QueueListEntry firstEntry = this.messagePtrs.getFirstEntry();
            while (firstEntry != null) {
                ((MessagePointer) firstEntry.getElement()).clear();
                firstEntry = this.messagePtrs.getNextEntry(firstEntry);
            }
            this.messagePtrs.clear();
        }
    }

    @Override // org.activemq.service.Subscription
    public void reset() throws JMSException {
        synchronized (this.lock) {
            QueueListEntry firstEntry = this.messagePtrs.getFirstEntry();
            while (firstEntry != null) {
                MessagePointer messagePointer = (MessagePointer) firstEntry.getElement();
                if (!messagePointer.isDispatched() || messagePointer.isDeleted()) {
                    break;
                }
                messagePointer.reset();
                messagePointer.setRedelivered(true);
                firstEntry = this.messagePtrs.getNextEntry(firstEntry);
            }
        }
    }

    public BrokerClient getActiveClient() {
        return this.activeClient;
    }

    @Override // org.activemq.service.Subscription
    public String getClientId() {
        return this.clientId;
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public Filter getFilter() {
        return this.filter;
    }

    public void setFilter(Filter filter) {
        this.filter = filter;
    }

    @Override // org.activemq.service.Subscription
    public boolean isWildcard() {
        return this.filter.isWildcard();
    }

    @Override // org.activemq.service.Subscription
    public String getPersistentKey() {
        return null;
    }

    @Override // org.activemq.service.Subscription
    public boolean isSameDurableSubscription(ConsumerInfo consumerInfo) throws JMSException {
        return isDurableTopic() && equal(this.clientId, consumerInfo.getClientId()) && equal(this.subscriberName, consumerInfo.getConsumerName());
    }

    public boolean isNoLocal() {
        return this.noLocal;
    }

    public void setNoLocal(boolean z) {
        this.noLocal = z;
    }

    @Override // org.activemq.service.Subscription
    public String getSubscriberName() {
        return this.subscriberName;
    }

    public void setSubscriberName(String str) {
        this.subscriberName = str;
    }

    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.redeliveryPolicy;
    }

    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    @Override // org.activemq.service.Subscription
    public boolean isTarget(ActiveMQMessage activeMQMessage) throws JMSException {
        boolean z = false;
        if (activeMQMessage != null && (this.activeClient == null || this.brokerName == null || this.clusterName == null || !this.activeClient.isClusteredConnection() || !activeMQMessage.isEntryCluster(this.clusterName) || activeMQMessage.isEntryBroker(this.brokerName))) {
            z = activeMQMessage.isDispatchedFromDLQ() || this.filter.matches(activeMQMessage);
            if (this.noLocal && z && clientIDsEqual(activeMQMessage)) {
                z = false;
            }
            if (z && !isAuthorizedForMessage(activeMQMessage)) {
                z = false;
            }
        }
        return z;
    }

    @Override // org.activemq.service.Subscription
    public void addMessage(MessageContainer messageContainer, ActiveMQMessage activeMQMessage) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Adding to subscription: ").append(this).append(" message: ").append(activeMQMessage).toString());
        }
        MessagePointer messagePointer = new MessagePointer(messageContainer, activeMQMessage);
        synchronized (this.lock) {
            this.messagePtrs.add(messagePointer);
        }
        this.dispatch.wakeup(this);
        this.lastMessageIdentity = activeMQMessage.getJMSMessageIdentity();
    }

    @Override // org.activemq.service.Subscription
    public void messageConsumed(MessageAck messageAck) throws JMSException {
        int i = 0;
        boolean z = false;
        synchronized (this.lock) {
            QueueListEntry firstEntry = this.messagePtrs.getFirstEntry();
            while (true) {
                if (firstEntry == null) {
                    break;
                }
                MessagePointer messagePointer = (MessagePointer) firstEntry.getElement();
                i++;
                if (!messageAck.isPartOfTransaction() || messagePointer.getMessageIdentity().equals(messageAck.getMessageIdentity())) {
                    if ((messageAck.isExpired() || messageAck.isMessageRead()) && !this.browser) {
                        messagePointer.delete(messageAck);
                    }
                    if (messageAck.isMessageRead() || this.browser) {
                        this.unconsumedMessagesDispatched.decrement();
                        TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(this, messagePointer) { // from class: org.activemq.service.impl.SubscriptionImpl.1
                            private final MessagePointer val$pointer;
                            private final SubscriptionImpl this$0;

                            {
                                this.this$0 = this;
                                this.val$pointer = messagePointer;
                            }

                            @Override // org.activemq.service.TransactionTask
                            public void execute() throws Throwable {
                                this.this$0.unconsumedMessagesDispatched.increment();
                                this.val$pointer.reset();
                                this.val$pointer.setRedelivered(true);
                                this.this$0.dispatch.wakeup(this.this$0);
                            }
                        });
                        TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(this, firstEntry, messageAck, messagePointer) { // from class: org.activemq.service.impl.SubscriptionImpl.2
                            private final QueueListEntry val$theEntry;
                            private final MessageAck val$ack;
                            private final MessagePointer val$pointer;
                            private final SubscriptionImpl this$0;

                            {
                                this.this$0 = this;
                                this.val$theEntry = firstEntry;
                                this.val$ack = messageAck;
                                this.val$pointer = messagePointer;
                            }

                            @Override // org.activemq.service.TransactionTask
                            public void execute() throws Throwable {
                                ActiveMQMessage message;
                                this.this$0.messagePtrs.remove(this.val$theEntry);
                                if ((!this.val$ack.isExpired() && !this.val$ack.isMessageRead()) || this.this$0.browser || !this.val$ack.isExpired() || this.val$pointer.getContainer().isDeadLetterQueue() || (message = this.val$pointer.getContainer().getMessage(this.val$pointer.getMessageIdentity())) == null) {
                                    return;
                                }
                                this.this$0.deadLetterPolicy.sendToDeadLetter(message);
                            }
                        });
                    } else {
                        messagePointer.reset();
                        messagePointer.setRedelivered(true);
                    }
                    if (messagePointer.getMessageIdentity().equals(messageAck.getMessageIdentity())) {
                        z = true;
                        break;
                    }
                }
                firstEntry = this.messagePtrs.getNextEntry(firstEntry);
            }
        }
        if (!z && log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Did not find a matching message for identity: ").append(messageAck.getMessageIdentity()).toString());
        }
        this.dispatch.wakeup(this);
    }

    @Override // org.activemq.service.Subscription
    public ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
        if (this.usePrefetch) {
            return getMessagesWithPrefetch();
        }
        ArrayList arrayList = new ArrayList();
        synchronized (this.lock) {
            QueueListEntry firstEntry = this.messagePtrs.getFirstEntry();
            while (firstEntry != null) {
                MessagePointer messagePointer = (MessagePointer) firstEntry.getElement();
                if (!messagePointer.isDispatched()) {
                    ActiveMQMessage message = messagePointer.getContainer().getMessage(messagePointer.getMessageIdentity());
                    if (message != null) {
                        if (messagePointer.isDispatched() || messagePointer.isRedelivered()) {
                            message.setJMSRedelivered(true);
                            if (this.redeliveryPolicy.isBackOffMode() && message.getDeliveryCount() < this.redeliveryPolicy.getMaximumRetryCount()) {
                                try {
                                    Thread.sleep((long) (this.redeliveryPolicy.getInitialRedeliveryTimeout() * message.getDeliveryCount() * this.redeliveryPolicy.getBackOffIncreaseRate()));
                                } catch (InterruptedException e) {
                                }
                            }
                            message.incrementDeliveryCount();
                        }
                        if (messagePointer.getContainer().isDeadLetterQueue() || (!message.isExpired() && message.getDeliveryCount() < this.redeliveryPolicy.getMaximumRetryCount())) {
                            messagePointer.setDispatched(true);
                            message.setDispatchedFromDLQ(messagePointer.getContainer().isDeadLetterQueue());
                            arrayList.add(message);
                        } else {
                            if (message.isExpired()) {
                                log.warn(new StringBuffer().append("Message: ").append(message).append(" has expired").toString());
                            } else {
                                log.warn(new StringBuffer().append("Message: ").append(message).append(" exceeded retry count: ").append(message.getDeliveryCount()).toString());
                            }
                            this.deadLetterPolicy.sendToDeadLetter(message);
                            QueueListEntry queueListEntry = firstEntry;
                            firstEntry = this.messagePtrs.getPrevEntry(queueListEntry);
                            this.messagePtrs.remove(queueListEntry);
                        }
                    } else {
                        log.info(new StringBuffer().append("Message probably expired: ").append(message).toString());
                        QueueListEntry queueListEntry2 = firstEntry;
                        firstEntry = this.messagePtrs.getPrevEntry(queueListEntry2);
                        this.messagePtrs.remove(queueListEntry2);
                        if (message != null) {
                            this.deadLetterPolicy.sendToDeadLetter(message);
                        }
                    }
                }
                firstEntry = this.messagePtrs.getNextEntry(firstEntry);
            }
        }
        return (ActiveMQMessage[]) arrayList.toArray(new ActiveMQMessage[arrayList.size()]);
    }

    @Override // org.activemq.service.Subscription
    public SubscriberEntry getSubscriptionEntry() {
        if (this.subscriberEntry == null) {
            this.subscriberEntry = createSubscriptionEntry();
        }
        return this.subscriberEntry;
    }

    @Override // org.activemq.service.Subscription
    public boolean isLocalSubscription() {
        if (this.activeClient != null) {
            return (this.activeClient.isClusteredConnection() || this.activeClient.isBrokerConnection()) ? false : true;
        }
        return true;
    }

    protected SubscriberEntry createSubscriptionEntry() {
        SubscriberEntry subscriberEntry = new SubscriberEntry();
        subscriberEntry.setClientID(this.clientId);
        subscriberEntry.setConsumerName(this.subscriberName);
        subscriberEntry.setDestination(this.destination.getPhysicalName());
        subscriberEntry.setSelector(this.selector);
        return subscriberEntry;
    }

    protected ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
        ArrayList arrayList = new ArrayList();
        synchronized (this.lock) {
            QueueListEntry firstEntry = this.messagePtrs.getFirstEntry();
            int i = 0;
            boolean z = false;
            int i2 = this.prefetchLimit - this.unconsumedMessagesDispatched.get();
            while (firstEntry != null && (i < i2 || z)) {
                MessagePointer messagePointer = (MessagePointer) firstEntry.getElement();
                if (!messagePointer.isDispatched()) {
                    ActiveMQMessage message = messagePointer.getContainer().getMessage(messagePointer.getMessageIdentity());
                    if (message == null || message.isExpired()) {
                        log.info(new StringBuffer().append("Message probably expired: ").append(message).toString());
                        QueueListEntry queueListEntry = firstEntry;
                        firstEntry = this.messagePtrs.getPrevEntry(queueListEntry);
                        this.messagePtrs.remove(queueListEntry);
                        if (message != null) {
                            this.deadLetterPolicy.sendToDeadLetter(message);
                        }
                    } else {
                        if (messagePointer.isDispatched() || messagePointer.isRedelivered()) {
                            message.setJMSRedelivered(true);
                        }
                        messagePointer.setDispatched(true);
                        arrayList.add(message);
                        z = message.isMessagePart() && !message.isLastMessagePart();
                        this.unconsumedMessagesDispatched.increment();
                        i++;
                    }
                }
                firstEntry = this.messagePtrs.getNextEntry(firstEntry);
            }
        }
        return (ActiveMQMessage[]) arrayList.toArray(new ActiveMQMessage[arrayList.size()]);
    }

    @Override // org.activemq.service.Subscription
    public boolean isAtPrefetchLimit() throws JMSException {
        return this.usePrefetch && this.messagePtrs.size() - this.unconsumedMessagesDispatched.get() >= this.prefetchLimit;
    }

    @Override // org.activemq.service.Subscription
    public boolean isReadyToDispatch() throws JMSException {
        return this.active.get() && this.messagePtrs.size() > 0;
    }

    @Override // org.activemq.service.Subscription
    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    @Override // org.activemq.service.Subscription
    public String getSelector() {
        return this.selector;
    }

    @Override // org.activemq.service.Subscription
    public boolean isActive() {
        return this.active.get();
    }

    @Override // org.activemq.service.Subscription
    public void setActive(boolean z) throws JMSException {
        synchronized (this.active.getLock()) {
            this.active.set(z);
        }
        if (z) {
            return;
        }
        reset();
    }

    @Override // org.activemq.service.Subscription
    public int getConsumerNumber() {
        return this.consumerNumber;
    }

    @Override // org.activemq.service.Subscription
    public String getConsumerId() {
        return this.consumerId;
    }

    @Override // org.activemq.service.Subscription
    public boolean isDurableTopic() throws JMSException {
        return this.destination.isTopic() && this.subscriberName != null && this.subscriberName.length() > 0;
    }

    @Override // org.activemq.service.Subscription
    public boolean isBrowser() throws JMSException {
        return this.browser;
    }

    @Override // org.activemq.service.Subscription
    public MessageIdentity getLastMessageIdentity() throws JMSException {
        return this.lastMessageIdentity;
    }

    @Override // org.activemq.service.Subscription
    public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
        this.lastMessageIdentity = messageIdentity;
    }

    protected boolean clientIDsEqual(ActiveMQMessage activeMQMessage) {
        String jMSClientID = activeMQMessage.getJMSClientID();
        String str = this.clientId;
        if (jMSClientID == null || str == null) {
            return false;
        }
        return jMSClientID.equals(str);
    }

    protected static final boolean equal(Object obj, Object obj2) {
        return obj == obj2 || !(obj == null || obj2 == null || !obj.equals(obj2));
    }

    protected boolean isAuthorizedForMessage(ActiveMQMessage activeMQMessage) {
        BrokerConnector brokerConnector;
        BrokerContainer brokerContainer;
        Broker broker;
        SecurityAdapter securityAdapter;
        BrokerClient activeClient = getActiveClient();
        if (activeClient == null || (brokerConnector = activeClient.getBrokerConnector()) == null || (brokerContainer = brokerConnector.getBrokerContainer()) == null || (broker = brokerContainer.getBroker()) == null || (securityAdapter = broker.getSecurityAdapter()) == null) {
            return true;
        }
        return securityAdapter.authorizeReceive(activeClient, activeMQMessage);
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$activemq$service$impl$SubscriptionImpl == null) {
            cls = class$("org.activemq.service.impl.SubscriptionImpl");
            class$org$activemq$service$impl$SubscriptionImpl = cls;
        } else {
            cls = class$org$activemq$service$impl$SubscriptionImpl;
        }
        log = LogFactory.getLog(cls);
    }
}
