package org.apache.qpid.client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.ReplyTo;
import org.apache.qpidity.transport.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/qpid-client-1.0-incubating-M3-615355.jar:org/apache/qpid/client/BasicMessageConsumer_0_10.class */
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> implements MessageListener {
    private final AtomicLong _messageCounter;
    private final AtomicLong _messagesReceived;
    protected final Logger _logger;
    private MessageFilter _filter;
    private AMQSession_0_10 _0_10session;
    private boolean _preAcquire;
    private boolean _isStarted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/qpid-client-1.0-incubating-M3-615355.jar:org/apache/qpid/client/BasicMessageConsumer_0_10$NullTocken.class */
    public class NullTocken {
        private NullTocken() {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BasicMessageConsumer_0_10(int i, AMQConnection aMQConnection, AMQDestination aMQDestination, String str, boolean z, MessageFactoryRegistry messageFactoryRegistry, AMQSession aMQSession, AMQProtocolHandler aMQProtocolHandler, FieldTable fieldTable, int i2, int i3, boolean z2, int i4, boolean z3, boolean z4) throws JMSException {
        super(i, aMQConnection, aMQDestination, str, z, messageFactoryRegistry, aMQSession, aMQProtocolHandler, fieldTable, i2, i3, z2, i4, z3, z4);
        this._messageCounter = new AtomicLong(0L);
        this._messagesReceived = new AtomicLong(0L);
        this._logger = LoggerFactory.getLogger(getClass());
        this._filter = null;
        this._preAcquire = true;
        this._isStarted = false;
        this._0_10session = (AMQSession_0_10) aMQSession;
        if (str != null && !str.equals("")) {
            try {
                this._filter = new JMSSelectorFilter(str);
                if (aMQDestination instanceof AMQQueue) {
                    this._preAcquire = false;
                }
            } catch (QpidException e) {
                throw new InvalidSelectorException("cannot create consumer because of selector issue");
            }
        }
        this._isStarted = aMQConnection.started();
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void notifyMessage(AbstractJMSMessage abstractJMSMessage, int i) {
        boolean z = false;
        try {
            z = checkPreConditions(abstractJMSMessage);
        } catch (AMQException e) {
            try {
                getSession().getAMQConnection().getExceptionListener().onException(new JMSAMQException("Error when receiving message", e));
            } catch (Exception e2) {
                if (this._messageCounter.get() >= 0) {
                    this._messageCounter.decrementAndGet();
                    this._synchronousQueue.add(new NullTocken());
                }
                this._logger.error("Exception when receiving message", (Throwable) e2);
            }
        }
        if (z) {
            super.notifyMessage(abstractJMSMessage, i);
        }
    }

    @Override // org.apache.qpidity.nclient.util.MessageListener
    public void onMessage(Message message) {
        AMQShortString aMQShortString;
        AMQShortString aMQShortString2;
        if (isMessageListenerSet()) {
            this._messagesReceived.incrementAndGet();
            if (this._messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) {
                this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 0, AMQSession_0_10.MAX_PREFETCH);
                this._messagesReceived.set(0L);
            }
        }
        int channelId = getSession().getChannelId();
        long messageTransferId = message.getMessageTransferId();
        String aMQShortString3 = getConsumerTag().toString();
        boolean z = false;
        Struct[] structArr = {message.getMessageProperties(), message.getDeliveryProperties()};
        if (message.getDeliveryProperties() != null) {
            aMQShortString = new AMQShortString(message.getDeliveryProperties().getExchange());
            aMQShortString2 = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
            z = message.getDeliveryProperties().getRedelivered();
        } else {
            aMQShortString = new AMQShortString("");
            aMQShortString2 = new AMQShortString("");
            structArr[1] = new DeliveryProperties();
        }
        UnprocessedMessage_0_10 unprocessedMessage_0_10 = new UnprocessedMessage_0_10(channelId, messageTransferId, aMQShortString3, aMQShortString, aMQShortString2, z);
        try {
            unprocessedMessage_0_10.receiveBody(message.readData());
        } catch (IOException e) {
            getSession().getAMQConnection().exceptionReceived(e);
        }
        ReplyTo replyTo = message.getMessageProperties().getReplyTo();
        if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals("")) {
            unprocessedMessage_0_10.setReplyToURL(message.getMessageProperties().getReplyTo().getExchangeName() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey());
        }
        unprocessedMessage_0_10.setContentHeader(structArr);
        this._messageCounter.incrementAndGet();
        getSession().messageReceived(unprocessedMessage_0_10);
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void sendCancel() throws JMSAMQException {
        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
        ((AMQSession_0_10) getSession()).getQpidSession().sync();
        getSession().confirmConsumerCancelled(getConsumerTag());
        try {
            ((AMQSession_0_10) getSession()).getCurrentException();
        } catch (AMQException e) {
            throw new JMSAMQException("Problem when stopping consumer", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void postDeliver(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        ((AMQSession_0_10) getSession()).addMessageTag(abstractJMSMessage.getDeliveryTag());
        super.postDeliver(abstractJMSMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void notifyMessage(UnprocessedMessage unprocessedMessage, int i) {
        String str;
        String replyToURL = unprocessedMessage.getReplyToURL();
        if (replyToURL != null && !replyToURL.equals("")) {
            AMQShortString aMQShortString = new AMQShortString(replyToURL.substring(0, replyToURL.indexOf(47)));
            String str2 = "://" + replyToURL;
            if (aMQShortString.equals((CharSequence) ExchangeDefaults.TOPIC_EXCHANGE_NAME)) {
                str = ((Object) ExchangeDefaults.TOPIC_EXCHANGE_CLASS) + str2;
            } else if (aMQShortString.equals((CharSequence) ExchangeDefaults.DIRECT_EXCHANGE_NAME)) {
                str = ((Object) ExchangeDefaults.DIRECT_EXCHANGE_CLASS) + str2;
            } else if (aMQShortString.equals((CharSequence) ExchangeDefaults.HEADERS_EXCHANGE_NAME)) {
                str = ((Object) ExchangeDefaults.HEADERS_EXCHANGE_CLASS) + str2;
            } else if (aMQShortString.equals((CharSequence) ExchangeDefaults.FANOUT_EXCHANGE_NAME)) {
                str = ((Object) ExchangeDefaults.FANOUT_EXCHANGE_CLASS) + str2;
            } else {
                str = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(aMQShortString.toString()).get().getType() + str2;
            }
            ((UnprocessedMessage_0_10) unprocessedMessage).setReplyToURL(str);
        }
        super.notifyMessage(unprocessedMessage, i);
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> unprocessedMessage) throws Exception {
        return this._messageFactory.createMessage(unprocessedMessage.getDeliveryTag(), unprocessedMessage.isRedelivered(), unprocessedMessage.getExchange(), unprocessedMessage.getRoutingKey(), unprocessedMessage.getContentHeader(), unprocessedMessage.getBodies(), unprocessedMessage.getReplyToURL());
    }

    private boolean checkPreConditions(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        boolean z = true;
        try {
            if (getMessageSelector() != null && !getMessageSelector().equals("")) {
                z = this._filter.matches(abstractJMSMessage);
            }
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("messageOk " + z);
                this._logger.debug("_preAcquire " + this._preAcquire);
            }
            if (!z && this._preAcquire) {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("filterMessage - trying to ack message");
                }
                acknowledgeMessage(abstractJMSMessage);
            } else if (!z) {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("Message not OK, releasing");
                }
                releaseMessage(abstractJMSMessage);
            }
            if (!this._preAcquire && z && !isNoConsume()) {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("filterMessage - trying to acquire message");
                }
                z = acquireMessage(abstractJMSMessage);
            }
            if (!z) {
                requestCreditIfCreditMode();
            }
            return z;
        } catch (Exception e) {
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
        }
    }

    private void requestCreditIfCreditMode() {
        try {
            if (getMessageListener() == null) {
                int intValue = this._messageCounter.intValue();
                this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 0, 1L);
                this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 1, -1L);
                this._0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                this._0_10session.getQpidSession().sync();
                this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 1, -1L);
                if (this._messageCounter.intValue() <= intValue) {
                    this._synchronousQueue.add(new NullTocken());
                } else {
                    this._messageCounter.decrementAndGet();
                }
            }
        } catch (Exception e) {
            this._logger.error("Error getting message listener, couldn't request credit after releasing a message that failed the selector test", (Throwable) e);
        }
    }

    private void acknowledgeMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        if (this._preAcquire) {
            return;
        }
        RangeSet rangeSet = new RangeSet();
        rangeSet.add(abstractJMSMessage.getDeliveryTag());
        this._0_10session.getQpidSession().messageAcknowledge(rangeSet);
        this._0_10session.getCurrentException();
    }

    private void releaseMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        if (this._preAcquire) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add(abstractJMSMessage.getDeliveryTag());
            this._0_10session.getQpidSession().messageRelease(rangeSet);
            this._0_10session.getCurrentException();
        }
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    protected void rollbackReceivedMessages() {
    }

    private boolean acquireMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        boolean z = false;
        if (!this._preAcquire) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add(abstractJMSMessage.getDeliveryTag());
            this._0_10session.getQpidSession().messageAcquire(rangeSet, (short) 0);
            this._0_10session.getQpidSession().sync();
            RangeSet accquiredMessages = this._0_10session.getQpidSession().getAccquiredMessages();
            if (accquiredMessages != null && accquiredMessages.size() > 0) {
                z = true;
            }
            this._0_10session.getCurrentException();
        }
        return z;
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer, javax.jms.MessageConsumer
    public void setMessageListener(javax.jms.MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener == null) {
            this._0_10session.getQpidSession().messageStop(getConsumerTag().toString());
            this._0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), (short) 0);
            this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 1, -1L);
            this._0_10session.getQpidSession().sync();
            return;
        }
        if (this._connection.started()) {
            this._0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), (short) 1);
            this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 0, AMQSession_0_10.MAX_PREFETCH);
            this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 1, -1L);
            this._0_10session.getQpidSession().sync();
            this._messagesReceived.set(0L);
        }
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public Object getMessageFromQueue(long j) throws InterruptedException {
        Object poll;
        if (!this._isStarted) {
            return null;
        }
        this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 0, 1L);
        if (j == 0) {
            poll = this._synchronousQueue.take();
        } else {
            poll = j > 0 ? this._synchronousQueue.poll(j, TimeUnit.MILLISECONDS) : this._synchronousQueue.poll();
            if (poll == null) {
                this._logger.debug("Message Didn't arrive in time, checking if one is inflight");
                this._0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                this._0_10session.getQpidSession().sync();
                this._0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), (short) 1, -1L);
                if (this._messageCounter.get() > 0) {
                    poll = this._synchronousQueue.take();
                }
            }
        }
        if (poll instanceof NullTocken) {
            poll = null;
        }
        return poll;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void preApplicationProcessing(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        this._messageCounter.decrementAndGet();
        super.preApplicationProcessing(abstractJMSMessage);
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void start() {
        this._isStarted = true;
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void stop() {
        this._isStarted = false;
    }
}
