/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.andes.client;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.andes.AMQException;
import org.wso2.andes.AMQInternalException;
import org.wso2.andes.client.AMQConnection;
import org.wso2.andes.client.AMQDestination;
import org.wso2.andes.client.AMQQueue;
import org.wso2.andes.client.AMQSession;
import org.wso2.andes.client.AMQSession_0_10;
import org.wso2.andes.client.BasicMessageConsumer;
import org.wso2.andes.client.DelayedObject;
import org.wso2.andes.client.failover.FailoverException;
import org.wso2.andes.client.message.AMQMessageDelegateFactory;
import org.wso2.andes.client.message.AMQMessageDelegate_0_10;
import org.wso2.andes.client.message.AbstractJMSMessage;
import org.wso2.andes.client.message.MessageFactoryRegistry;
import org.wso2.andes.client.message.UnprocessedMessage_0_10;
import org.wso2.andes.client.protocol.AMQProtocolHandler;
import org.wso2.andes.filter.JMSSelectorFilter;
import org.wso2.andes.filter.MessageFilter;
import org.wso2.andes.framing.FieldTable;
import org.wso2.andes.protocol.AMQConstant;
import org.wso2.andes.transport.Acquired;
import org.wso2.andes.transport.MessageCreditUnit;
import org.wso2.andes.transport.Option;
import org.wso2.andes.transport.RangeSet;
import org.wso2.andes.transport.SessionException;

/**
 * Message consumer used by {@link AMQSession_0_10} sessions.
 *
 * <p>On top of the generic {@link BasicMessageConsumer} this class:
 * <ul>
 *   <li>evaluates the JMS message selector on the client side (see {@link #checkPreConditions});</li>
 *   <li>issues message credit to the peer one message at a time when {@code capacity} is zero;</li>
 *   <li>acknowledges, acquires or releases individual transfers by delivery tag.</li>
 * </ul>
 *
 * <p>Acknowledge modes appear as integer literals in this (decompiled) source. {@code 1} and
 * {@code 2} are the values of {@code javax.jms.Session.AUTO_ACKNOWLEDGE} and
 * {@code javax.jms.Session.CLIENT_ACKNOWLEDGE}. {@code 257} and {@code 259} are provider-specific
 * modes — NOTE(review): presumably NO_ACKNOWLEDGE and PER_MESSAGE_ACKNOWLEDGE; confirm against the
 * provider's Session constants.
 */
public class BasicMessageConsumer_0_10
extends BasicMessageConsumer<UnprocessedMessage_0_10> {
    /** Logger named after the runtime class of this consumer. */
    protected final Logger _logger = LoggerFactory.getLogger(this.getClass());
    /** Client-side selector filter; stays {@code null} when no (or an empty) selector was supplied. */
    private MessageFilter _filter = null;
    /** The owning session, already cast to its 0-10 type. */
    private AMQSession_0_10 _0_10session;
    /** {@code true} by default; set to {@code false} when a selector is used on an {@link AMQQueue}. */
    private boolean _preAcquire = true;
    /** Value of {@code connection.started()} at construction time; not read inside this class. */
    private boolean _isStarted = false;
    /** {@code true} while {@link #getMessageFromQueue(long)} is running with zero capacity. */
    private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
    /** String form of the consumer tag, used as the destination name in transport calls. */
    private String _consumerTagString;
    /** Message credit window; {@code 0} means this class grants credit one message at a time. */
    private long capacity = 0L;

    /**
     * Creates the consumer, builds the selector filter and works out the credit capacity.
     *
     * @throws InvalidSelectorException if {@code messageSelector} cannot be turned into a filter;
     *         the underlying {@link AMQInternalException} is attached as linked exception and cause
     * @throws JMSException propagated from the superclass constructor
     */
    protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException {
        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
        this._0_10session = (AMQSession_0_10)session;
        if (messageSelector != null && !messageSelector.equals("")) {
            try {
                this._filter = new JMSSelectorFilter(messageSelector);
            }
            catch (AMQInternalException e) {
                // Keep the original failure attached so callers can see why the selector was rejected.
                InvalidSelectorException invalid = new InvalidSelectorException("cannot create consumer because of selector issue");
                invalid.setLinkedException(e);
                invalid.initCause(e);
                throw invalid;
            }
            // With a selector on a queue, messages are not pre-acquired; they are acquired
            // explicitly once the selector has matched (see checkPreConditions).
            if (destination instanceof AMQQueue) {
                this._preAcquire = false;
            }
        }
        this._isStarted = connection.started();
        // The link may be null (the address-type check below already allows for that), so guard
        // before reading the consumer capacity from it.
        if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0) {
            this.capacity = destination.getLink().getConsumerCapacity();
        } else if (this.getSession().prefetch()) {
            this.capacity = this._0_10session.getAMQConnection().getMaxPrefetch();
        }
        // NOTE(review): address type 2 is presumably "topic" — confirm against AMQDestination.
        if (destination.isAddressResolved() && 2 == destination.getAddressType()) {
            boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null;
            if (!namedQueue) {
                // Work on a copy so the caller's destination keeps its queue name.
                this._destination = destination.copyDestination();
                this._destination.setQueueName(null);
            }
        }
    }

    /** Stores the tag in the superclass and caches its string form for transport calls. */
    @Override
    public void setConsumerTag(int consumerTag) {
        super.setConsumerTag(consumerTag);
        this._consumerTagString = String.valueOf(consumerTag);
    }

    /** @return the consumer tag as a string, or {@code null} if no tag has been set yet */
    public String getConsumerTagString() {
        return this._consumerTagString;
    }

    /**
     * Delivers a message to the superclass if it passes {@link #checkPreConditions}.
     * When a listener is set and capacity is zero, one message of credit is granted first.
     * An {@link AMQException} is logged and reported to the connection instead of being thrown.
     */
    @Override
    public void notifyMessage(AbstractJMSMessage jmsMessage) {
        try {
            if (this.checkPreConditions(jmsMessage)) {
                if (this.isMessageListenerSet() && this.capacity == 0L) {
                    this.issueSingleMessageCredit();
                }
                this._logger.debug("messageOk, trying to notify");
                super.notifyMessage(jmsMessage);
            }
        }
        catch (AMQException e) {
            this._logger.error("Receivecd an Exception when receiving message", e);
            this.getSession().getAMQConnection().exceptionReceived(e);
        }
    }

    /**
     * Cancels the subscription on the peer, waits for the session to sync and confirms the
     * cancellation with the session.
     *
     * @throws AMQException if the session holds a current exception after the attempt
     */
    @Override
    void sendCancel() throws AMQException {
        this._0_10session.getQpidSession().messageCancel(this.getConsumerTagString(), new Option[0]);
        try {
            this._0_10session.getQpidSession().sync();
            this.getSession().confirmConsumerCancelled(this.getConsumerTag());
        }
        catch (SessionException se) {
            // Recorded on the session so it is picked up by the check below.
            this._0_10session.setCurrentException(se);
        }
        AMQException amqe = this._0_10session.getCurrentException();
        if (amqe != null) {
            throw amqe;
        }
    }

    /** Hands the raw 0-10 transfer straight to the superclass. */
    @Override
    void notifyMessage(UnprocessedMessage_0_10 messageFrame) {
        super.notifyMessage(messageFrame);
    }

    /**
     * Runs the superclass processing and then registers the delivery tag as unacknowledged with
     * the session, unless the session is transacted or uses acknowledge mode {@code 2}
     * (CLIENT_ACKNOWLEDGE) or {@code 259}.
     *
     * <p>The previous condition required the mode to equal 259 and 2 at the same time, which can
     * never hold, so the mode check was dead and only the transacted flag was honoured.
     * NOTE(review): this relies on the superclass tracking delivery tags for the two excluded
     * modes — confirm in BasicMessageConsumer.
     */
    @Override
    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException {
        super.preApplicationProcessing(jmsMsg);
        if (this._session.getTransacted()) {
            return;
        }
        int ackMode = this._session.getAcknowledgeMode();
        if (ackMode != 259 && ackMode != 2) {
            this._session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
        }
    }

    /**
     * Builds a JMS message from a 0-10 transfer, after updating the exchange-type mapping from
     * the transfer's header.
     *
     * @param delegateFactory not used by this implementation
     */
    @Override
    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception {
        AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)this.getSession()).getQpidSession());
        return this._messageFactory.createMessage(msg.getMessageTransfer());
    }

    /**
     * Decides whether a message may be delivered to the application.
     *
     * <p>The selector (if any) is evaluated first. A non-matching message is acknowledged when
     * messages are pre-acquired, otherwise passed to {@link #releaseMessage}; with zero capacity
     * one message of credit is then granted to replace it. A matching message that was not
     * pre-acquired is acquired now (unless this is a no-consume consumer) and is only delivered
     * if the acquire succeeded.
     *
     * @return {@code true} if the message should be delivered
     * @throws AMQException if the selector fails to evaluate or the acknowledge fails
     */
    private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException {
        boolean messageOk = true;
        try {
            if (this._messageSelector != null && !this._messageSelector.equals("")) {
                messageOk = this._filter.matches(message);
            }
        }
        catch (Exception e) {
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
        }
        if (this._logger.isDebugEnabled()) {
            this._logger.debug("messageOk " + messageOk);
            this._logger.debug("_preAcquire " + this._preAcquire);
        }
        if (!messageOk) {
            if (this._preAcquire) {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("filterMessage - trying to ack message");
                }
                this.acknowledgeMessage(message);
            } else {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("Message not OK, releasing");
                }
                this.releaseMessage(message);
            }
            if (this.capacity == 0L) {
                this.issueSingleMessageCredit();
            }
        }
        if (!this._preAcquire && messageOk && !this.isNoConsume()) {
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("filterMessage - trying to acquire message");
            }
            messageOk = this.acquireMessage(message);
            // Guarded like the other debug statements so the string is only built when needed.
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("filterMessage - message acquire status : " + messageOk);
            }
        }
        return messageOk;
    }

    /**
     * Acknowledges a single message by delivery tag. The second argument passed to the session
     * is {@code true} unless the acknowledge mode is {@code 257}.
     *
     * <p>The only caller invokes this for pre-acquired messages rejected by the selector. The
     * body used to be wrapped in {@code if (!_preAcquire)}, which made it unreachable from that
     * caller, so such messages were never acknowledged. The guard has been removed.
     *
     * @throws AMQException if the session holds a current exception after the acknowledge
     */
    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException {
        RangeSet ranges = new RangeSet();
        ranges.add((int)message.getDeliveryTag());
        this._0_10session.messageAcknowledge(ranges, this._acknowledgeMode != 257);
        AMQException amqe = this._0_10session.getCurrentException();
        if (amqe != null) {
            throw amqe;
        }
    }

    /**
     * Releases a single message by delivery tag and syncs the session, but only when messages
     * are pre-acquired.
     *
     * <p>NOTE(review): the only caller invokes this when {@code _preAcquire} is {@code false},
     * so as written this is a no-op. Behaviour is left unchanged because the right action for a
     * message that was never acquired is not evident from this class — confirm before changing.
     */
    private void releaseMessage(AbstractJMSMessage message) throws AMQException {
        if (this._preAcquire) {
            RangeSet ranges = new RangeSet();
            ranges.add((int)message.getDeliveryTag());
            this._0_10session.getQpidSession().messageRelease(ranges, new Option[0]);
            this._0_10session.sync();
        }
    }

    /**
     * Tries to acquire a single message by delivery tag; does nothing when messages are
     * pre-acquired.
     *
     * @return {@code true} if the peer reported at least one acquired transfer
     */
    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException {
        boolean result = false;
        if (!this._preAcquire) {
            RangeSet ranges = new RangeSet();
            ranges.add((int)message.getDeliveryTag());
            Acquired acq = this._0_10session.getQpidSession().messageAcquire(ranges, new Option[0]).get();
            RangeSet acquired = acq.getTransfers();
            if (acquired != null && acquired.size() > 0) {
                result = true;
            }
        }
        return result;
    }

    /**
     * Installs the listener. With zero capacity, one message of credit is granted so the first
     * message can arrive. Anything already sitting in the synchronous queue is removed and
     * rejected through the session with the flag set to {@code true}.
     */
    @Override
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener != null && this.capacity == 0L) {
            this.issueSingleMessageCredit();
        }
        if (messageListener != null && !this._synchronousQueue.isEmpty()) {
            Iterator<?> messages = this._synchronousQueue.iterator();
            while (messages.hasNext()) {
                AbstractJMSMessage message = (AbstractJMSMessage)((DelayedObject)messages.next()).getObject();
                messages.remove();
                this._session.rejectMessage(message, true);
            }
        }
    }

    /**
     * After failover, re-grants one message of credit if the session is started and a
     * zero-capacity receive is currently in progress.
     */
    @Override
    public void failedOverPost() {
        if (this._0_10session.isStarted() && this._syncReceive.get()) {
            this.issueSingleMessageCredit();
        }
    }

    /**
     * Fetches the next entry from the receive queue.
     *
     * <p>With zero capacity, one message of credit is granted first if the queue is empty. If
     * the superclass call returns nothing and the session is started, the subscription is
     * flushed and synced, credit is re-issued, the dispatch queue is synced and the queue is
     * polled once more with a timeout argument of {@code -1}.
     *
     * <p>The {@code _syncReceive} marker is now cleared in a {@code finally} block. Previously
     * an {@link InterruptedException} or a transport exception left it set, which would make
     * {@link #failedOverPost()} grant credit for a receive that is no longer waiting.
     *
     * @param l timeout value handed to the superclass implementation
     * @return the dequeued object, or {@code null} if none became available
     */
    @Override
    public Object getMessageFromQueue(long l) throws InterruptedException {
        final boolean creditPerReceive = this.capacity == 0L;
        if (creditPerReceive) {
            this._syncReceive.set(true);
        }
        try {
            if (this._0_10session.isStarted() && creditPerReceive && this._synchronousQueue.isEmpty()) {
                this.issueSingleMessageCredit();
            }
            Object o = super.getMessageFromQueue(l);
            if (o == null && this._0_10session.isStarted()) {
                this._0_10session.getQpidSession().messageFlush(this.getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
                this._0_10session.getQpidSession().sync();
                // NOTE(review): a byte credit of -1 presumably means "unlimited" — confirm against the transport.
                this._0_10session.getQpidSession().messageFlow(this.getConsumerTagString(), MessageCreditUnit.BYTE, -1L, Option.UNRELIABLE);
                if (this.capacity > 0L) {
                    this._0_10session.getQpidSession().messageFlow(this.getConsumerTagString(), MessageCreditUnit.MESSAGE, this.capacity, Option.UNRELIABLE);
                }
                this._0_10session.syncDispatchQueue();
                o = super.getMessageFromQueue(-1L);
            }
            return o;
        }
        finally {
            if (creditPerReceive) {
                this._syncReceive.set(false);
            }
        }
    }

    /**
     * Post-delivery acknowledgement handling. Outside of recovery, mode {@code 257} acknowledges
     * the message through the session immediately. Mode {@code 1} (AUTO_ACKNOWLEDGE) flushes
     * pending acknowledgements and syncs when the connection asks for synchronous acks.
     */
    @Override
    void postDeliver(AbstractJMSMessage msg) throws JMSException {
        super.postDeliver(msg);
        if (this._acknowledgeMode == 257 && !this._session.isInRecovery()) {
            this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
        }
        if (this._acknowledgeMode == 1 && !this._session.isInRecovery() && this._session.getAMQConnection().getSyncAck()) {
            ((AMQSession_0_10)this.getSession()).flushAcknowledgments();
            ((AMQSession_0_10)this.getSession()).getQpidSession().sync();
        }
    }

    /** Browsing receive; delegates to {@code receiveNoWait()}. */
    @Override
    Message receiveBrowse() throws JMSException {
        return this.receiveNoWait();
    }

    /**
     * Empties the synchronous queue. Delivery tags of queued messages are collected and released
     * to the peer in one call with {@link Option#SET_REDELIVERED}; entries that are not messages
     * are logged and dropped. Does nothing when the queue is already empty.
     */
    @Override
    public void clearPendingMessages() {
        if (this._synchronousQueue.size() > 0) {
            RangeSet ranges = new RangeSet();
            Iterator<?> iterator = this._synchronousQueue.iterator();
            while (iterator.hasNext()) {
                Object o = ((DelayedObject)iterator.next()).getObject();
                if (o instanceof AbstractJMSMessage) {
                    ranges.add((int)((AbstractJMSMessage)o).getDeliveryTag());
                    iterator.remove();
                    continue;
                }
                this._logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                iterator.remove();
            }
            this._0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
            this.clearReceiveQueue();
        }
    }

    /**
     * Reports whether the subscription is exclusive. For address-syntax destinations the answer
     * is {@code true} for address type {@code 2}, otherwise it is taken from the link's
     * subscription settings. For other destinations the constructor's {@code exclusive} flag is used.
     */
    @Override
    public boolean isExclusive() {
        AMQDestination dest = this.getDestination();
        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) {
            if (dest.getAddressType() == 2) {
                return true;
            }
            return dest.getLink().getSubscription().isExclusive();
        }
        return this._exclusive;
    }

    /**
     * Deletes the destination's queue on the peer when the address-syntax destination's delete
     * option is {@code ALWAYS} or {@code RECEIVER}.
     */
    @Override
    void cleanupQueue() throws AMQException, FailoverException {
        AMQDestination dest = this.getDestination();
        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR && (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || dest.getDelete() == AMQDestination.AddressOption.RECEIVER)) {
            ((AMQSession_0_10)this.getSession()).getQpidSession().queueDelete(this.getDestination().getQueueName(), new Option[0]);
        }
    }

    /** Grants the peer credit for exactly one more message on this subscription. */
    private void issueSingleMessageCredit() {
        this._0_10session.getQpidSession().messageFlow(this.getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
    }
}

