package org.apache.qpidity.njms;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.exchange.ExchangeDefaults;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.njms.message.MessageFactory;
import org.apache.qpidity.njms.message.QpidMessage;
import org.apache.qpidity.transport.Option;
import org.apache.qpidity.transport.RangeSet;

/* loaded from: input_file:WEB-INF/lib/qpid-client-1.0-incubating-M3-615355.jar:org/apache/qpidity/njms/MessageConsumerImpl.class */
/**
 * {@link MessageConsumer} implementation that subscribes to a Qpid queue through the
 * session's underlying Qpid {@link Session}.
 *
 * <p>Messages arrive through {@link #onMessage(org.apache.qpidity.api.Message)}. When no JMS
 * listener is registered they are placed on an internal queue and handed out by the
 * {@code receive} methods; otherwise they are passed directly to the registered listener.
 */
public class MessageConsumerImpl extends MessageActor implements MessageConsumer, MessageListener {
    /**
     * Batch size for asynchronous delivery: credit is requested in units of this many messages
     * and renewed once this many messages have been counted for the listener.
     */
    public static final int MAX_MESSAGE_TRANSFERRED = 100;

    /** Selector expression given at construction time, or {@code null} when none was given. */
    private String _messageSelector = null;

    /**
     * Filter built from the selector, or {@code null} when no selector was given.
     * NOTE(review): this filter is created but never consulted anywhere in this class.
     */
    private MessageFilter _filter = null;

    /** Whether the subscription was requested with {@link Option#NO_LOCAL}. */
    protected boolean _noLocal;

    /** Subscription name for named topic subscriptions, or {@code null}. */
    protected String _subscriptionName;

    /** Cleared only for queue consumers that were created with a selector. */
    private boolean _preAcquire = true;

    /** Application listener for asynchronous delivery, or {@code null} for synchronous use. */
    private javax.jms.MessageListener _messageListener;

    /** Monitor held while {@link #start()} clears the stopped flag. */
    private final Object _incomingMessageLock = new Object();

    /** Messages counted for the listener since credit was last renewed. */
    private int _messageAsyncrhonouslyReceived = 0;

    /** Messages received while no listener is set, waiting for a {@code receive} call. */
    private final LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the consumer and subscribes it to the broker.
     *
     * <p>For a {@link Queue} destination the consumer subscribes to the destination's own queue.
     * For any other destination a queue is declared (named {@code "topic-" + subscriptionName}
     * when a subscription name is given, otherwise the destination's queue name), bound to the
     * topic exchange with the destination's routing key, and then subscribed to exclusively.
     *
     * @param session          session that owns this consumer
     * @param destination      destination to consume from
     * @param messageSelector  selector expression, or {@code null} for none
     * @param noLocal          whether to subscribe with {@link Option#NO_LOCAL}
     * @param subscriptionName name of the subscription, or {@code null}
     * @param consumerTag      forwarded unchanged to the {@link MessageActor} constructor;
     *                         presumably the identifier of this consumer - TODO confirm
     * @throws Exception if the selector cannot be turned into a filter, if subscribing fails, or
     *                   if the session reports a pending exception once the subscription is set up
     */
    public MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
            boolean noLocal, String subscriptionName, String consumerTag) throws Exception {
        super(session, destination, consumerTag);
        if (messageSelector != null) {
            this._messageSelector = messageSelector;
            this._filter = new JMSSelectorFilter(messageSelector);
        }
        this._noLocal = noLocal;
        this._subscriptionName = subscriptionName;
        this._isStopped = getSession().isStopped();

        final MessagePartListenerAdapter adapter = new MessagePartListenerAdapter(this);
        final Session qpidSession = getSession().getQpidSession();
        final Option localOption = this._noLocal ? Option.NO_LOCAL : Option.NO_OPTION;

        if (destination instanceof Queue) {
            final String queueName = destination.getQpidQueueName();
            final String actorId = getMessageActorID();
            // With a selector the subscription passes 1 instead of 0 as its fourth argument and
            // pre-acquisition is switched off (presumably "not acquired" mode - TODO confirm).
            final short acquireMode = this._messageSelector != null ? (short) 1 : (short) 0;
            qpidSession.messageSubscribe(queueName, actorId, (short) 0, acquireMode, adapter, null,
                    new Option[] {localOption});
            if (this._messageSelector != null) {
                this._preAcquire = false;
            }
        } else {
            final String queueName;
            if (subscriptionName != null) {
                queueName = "topic-" + subscriptionName;
                qpidSession.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE);
            } else {
                queueName = destination.getQpidQueueName();
                qpidSession.queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
            }
            qpidSession.queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
            qpidSession.messageSubscribe(queueName, getMessageActorID(), (short) 0, (short) 0, adapter, null,
                    new Option[] {localOption, Option.EXCLUSIVE});
        }

        qpidSession.messageFlowMode(getMessageActorID(), (short) 0);
        // Unit 1 with a value of -1; unit 0 (see requestCredit) is the per-message credit.
        qpidSession.messageFlow(getMessageActorID(), (short) 1, -1L);
        requestCredit(1);
        try {
            requestSync();
        } catch (Exception e) {
            // A failed sync is tolerated here; a pending session exception is rethrown below.
            _logger.error("Synchronising the session after subscribing failed", e);
        }
        if (getSession().getCurrentException() != null) {
            throw getSession().getCurrentException();
        }
    }

    /**
     * Returns the selector expression this consumer was created with.
     *
     * @return the selector, or {@code null} when none was given
     * @throws JMSException if the closed-check inherited from {@link MessageActor} fails
     */
    @Override // javax.jms.MessageConsumer
    public String getMessageSelector() throws JMSException {
        checkNotClosed();
        return this._messageSelector;
    }

    /**
     * Returns the currently registered application listener.
     *
     * @return the listener, or {@code null} when none is set
     * @throws JMSException if the closed-check inherited from {@link MessageActor} fails
     */
    @Override // javax.jms.MessageConsumer
    public javax.jms.MessageListener getMessageListener() throws JMSException {
        checkNotClosed();
        return this._messageListener;
    }

    /**
     * Registers (or, with {@code null}, removes) the application listener. Registering a
     * non-null listener resets the asynchronous counter and requests a new batch of credit.
     *
     * @param messageListener listener to deliver to, or {@code null}
     * @throws JMSException if the closed-check fails or credit cannot be requested
     */
    @Override // javax.jms.MessageConsumer
    public synchronized void setMessageListener(javax.jms.MessageListener messageListener) throws JMSException {
        checkNotClosed();
        try {
            this._messageListener = messageListener;
            if (messageListener != null) {
                resetAsynchMessageReceived();
            }
        } catch (Exception e) {
            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
        }
    }

    /**
     * Resets the asynchronous delivery counter and requests credit for another batch. If the
     * consumer is running and a full batch has been counted, the subscription is stopped first.
     *
     * @throws QpidException declared for callers; propagated from the session calls
     */
    private void resetAsynchMessageReceived() throws QpidException {
        final boolean batchExhausted = this._messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED;
        if (!this._isStopped && batchExhausted) {
            getSession().getQpidSession().messageStop(getMessageActorID());
        }
        this._messageAsyncrhonouslyReceived = 0;
        requestCredit(MAX_MESSAGE_TRANSFERRED);
    }

    /**
     * Receives the next message, blocking until one is available.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before the failure is reported.
     *
     * @return the next message
     * @throws JMSException if the consumer is stopped, a listener is set, or waiting fails
     */
    @Override // javax.jms.MessageConsumer
    public Message receive() throws JMSException {
        final Message immediate = receiveNoWait();
        if (immediate != null) {
            return immediate;
        }
        try {
            requestCredit(1);
            return (Message) this._queue.take();
        } catch (Exception e) {
            restoreInterruptStatus(e);
            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
        }
    }

    /**
     * Receives the next message that arrives within the given timeout.
     *
     * <p>As required by the JMS contract, a timeout of zero never expires: the call then behaves
     * like {@link #receive()} and blocks until a message is available.
     *
     * @param timeout maximum time to wait in milliseconds; {@code 0} waits indefinitely
     * @return the next message, or {@code null} if the timeout expired first
     * @throws JMSException if the consumer is stopped, a listener is set, the timeout is
     *                      negative, or waiting fails
     */
    @Override // javax.jms.MessageConsumer
    public Message receive(long timeout) throws JMSException {
        checkClosed();
        checkIfListenerSet();
        if (timeout < 0) {
            throw new JMSException("Invalid timeout value: " + timeout);
        }
        if (timeout == 0) {
            // Delegate so that the indefinite wait is implemented in exactly one place.
            return receive();
        }
        try {
            final Message queued = (Message) this._queue.poll();
            if (queued != null) {
                return queued;
            }
            requestCredit(1);
            requestFlush();
            return (Message) this._queue.poll(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            restoreInterruptStatus(e);
            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
        }
    }

    /**
     * Receives the next message if one is available without waiting on the local queue. When
     * the local queue is empty, one credit is requested, the subscription is flushed and the
     * session is synchronised before the queue is polled a second time.
     *
     * @return the next message, or {@code null} if none is available
     * @throws JMSException if the consumer is stopped, a listener is set, or a session call fails
     */
    @Override // javax.jms.MessageConsumer
    public Message receiveNoWait() throws JMSException {
        checkClosed();
        checkIfListenerSet();
        try {
            final Message queued = (Message) this._queue.poll();
            if (queued != null) {
                return queued;
            }
            requestCredit(1);
            requestFlush();
            requestSync();
            return (Message) this._queue.poll();
        } catch (Exception e) {
            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
        }
    }

    /** Re-asserts the calling thread's interrupt status when {@code e} reports an interruption. */
    private static void restoreInterruptStatus(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Requests {@code i} units of flow unit 0 for this consumer's subscription. */
    private void requestCredit(int i) {
        getSession().getQpidSession().messageFlow(getMessageActorID(), (short) 0, i);
    }

    /** Issues a message flush for this consumer's subscription. */
    private void requestFlush() {
        getSession().getQpidSession().messageFlush(getMessageActorID());
    }

    /** Synchronises the underlying Qpid session. */
    private void requestSync() {
        getSession().getQpidSession().sync();
    }

    /**
     * Rejects synchronous receives while the consumer is stopped.
     * NOTE(review): the condition tests the stopped flag although the message says "closed";
     * confirm which of the two is intended.
     *
     * @throws JMSException if the stopped flag is set
     */
    private void checkClosed() throws JMSException {
        if (this._isStopped) {
            throw new JMSException("Session is closed");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the subscription on the Qpid session and marks this consumer as stopped.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.qpidity.njms.MessageActor
    public void stop() throws Exception {
        getSession().getQpidSession().messageStop(getMessageActorID());
        this._isStopped = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Clears the stopped flag while holding the incoming-message lock.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.qpidity.njms.MessageActor
    public void start() throws Exception {
        synchronized (this._incomingMessageLock) {
            this._isStopped = false;
        }
    }

    /**
     * Entry point for messages delivered by the Qpid client. A message that passes
     * {@link #checkPreConditions(QpidMessage)} is pre-processed and then either queued for a
     * synchronous receive or passed to the registered listener.
     *
     * @param message the message delivered by the Qpid client
     * @throws RuntimeException wrapping (and chaining) any failure raised while handling it
     */
    @Override // org.apache.qpidity.nclient.util.MessageListener
    public void onMessage(org.apache.qpidity.api.Message message) {
        try {
            final QpidMessage received = MessageFactory.getQpidMessage(message);
            if (!checkPreConditions(received)) {
                return;
            }
            preApplicationProcessing(received);
            if (this._messageListener == null) {
                this._queue.offer(received);
            } else {
                notifyMessageListener(received);
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure stays diagnosable.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Passes a message to the registered listener. The message is counted first, and credit is
     * renewed when a full batch has been counted. A {@link RuntimeException} thrown by the
     * listener itself is logged and otherwise ignored.
     *
     * @param qpidMessage the message to deliver
     * @throws RuntimeException wrapping (and chaining) any failure outside the listener call
     */
    public void notifyMessageListener(QpidMessage qpidMessage) throws RuntimeException {
        try {
            this._messageAsyncrhonouslyReceived++;
            if (this._messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) {
                resetAsynchMessageReceived();
            }
            try {
                this._messageListener.onMessage((Message) qpidMessage);
            } catch (RuntimeException listenerFailure) {
                // Deliberately not propagated, but no longer dropped without a trace.
                _logger.error("Exception thrown by the application message listener was ignored", listenerFailure);
            }
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Rejects synchronous receives while a listener is registered.
     *
     * @throws IllegalStateException if a listener is set
     */
    private void checkIfListenerSet() throws IllegalStateException {
        if (this._messageListener != null) {
            throw new IllegalStateException("A listener has already been set.");
        }
    }

    /**
     * Lets the session pre-process the message, acknowledges it through the session when the
     * session is transacted, and finally invokes the message's after-receive hook.
     *
     * @param qpidMessage the message about to be handed to the application
     * @throws Exception propagated from the session or the message
     */
    private void preApplicationProcessing(QpidMessage qpidMessage) throws Exception {
        final SessionImpl owner = getSession();
        owner.preProcessMessage(qpidMessage);
        if (owner.getTransacted()) {
            owner.acknowledgeMessage(qpidMessage);
        }
        qpidMessage.afterMessageReceive();
    }

    /**
     * Decides whether a message may be delivered to the application.
     *
     * <p>NOTE(review): as written, a message is accepted only when no selector is set; with a
     * selector every message is rejected and the filter is never evaluated. In addition the
     * acknowledge and release calls below are guarded so that they do nothing on the paths that
     * reach them here. This looks unfinished - confirm the intended behaviour before relying on
     * selectors.
     *
     * @param qpidMessage the message to check
     * @return {@code true} if the message should be delivered
     * @throws QpidException propagated from the acquire, acknowledge or release helpers
     */
    private boolean checkPreConditions(QpidMessage qpidMessage) throws QpidException {
        boolean messageOk = this._messageSelector == null;
        final boolean debug = _logger.isDebugEnabled();
        if (debug) {
            _logger.debug("messageOk " + messageOk);
            _logger.debug("_preAcquire " + this._preAcquire);
        }
        if (!messageOk) {
            if (this._preAcquire) {
                if (debug) {
                    _logger.debug("filterMessage - trying to ack message");
                }
                acknowledgeMessage(qpidMessage);
            } else {
                if (debug) {
                    _logger.debug("Message not OK, releasing");
                }
                releaseMessage(qpidMessage);
            }
        } else if (!this._preAcquire) {
            if (debug) {
                _logger.debug("filterMessage - trying to acquire message");
            }
            messageOk = acquireMessage(qpidMessage);
        }
        return messageOk;
    }

    /**
     * Releases the message on the Qpid session, but only while pre-acquisition is enabled.
     *
     * @param qpidMessage the message to release
     * @throws QpidException if the session reports an error afterwards
     */
    private void releaseMessage(QpidMessage qpidMessage) throws QpidException {
        if (!this._preAcquire) {
            return;
        }
        final RangeSet transfers = new RangeSet();
        transfers.add(qpidMessage.getMessageTransferId());
        getSession().getQpidSession().messageRelease(transfers);
        getSession().testQpidException();
    }

    /**
     * Tries to acquire the message on the Qpid session when pre-acquisition is disabled.
     *
     * @param qpidMessage the message to acquire
     * @return {@code true} if, after synchronising, the session reports at least one acquired
     *         message; {@code false} otherwise or when pre-acquisition is enabled
     * @throws QpidException if the session reports an error afterwards
     */
    private boolean acquireMessage(QpidMessage qpidMessage) throws QpidException {
        if (this._preAcquire) {
            return false;
        }
        final RangeSet transfers = new RangeSet();
        transfers.add(qpidMessage.getMessageTransferId());
        getSession().getQpidSession().messageAcquire(transfers, (short) 0);
        getSession().getQpidSession().sync();
        final boolean acquired = getSession().getQpidSession().getAccquiredMessages().size() > 0;
        getSession().testQpidException();
        return acquired;
    }

    /**
     * Acknowledges the message on the Qpid session, but only while pre-acquisition is disabled.
     *
     * @param qpidMessage the message to acknowledge
     * @throws QpidException if the session reports an error afterwards
     */
    private void acknowledgeMessage(QpidMessage qpidMessage) throws QpidException {
        if (this._preAcquire) {
            return;
        }
        final RangeSet transfers = new RangeSet();
        transfers.add(qpidMessage.getMessageTransferId());
        getSession().getQpidSession().messageAcknowledge(transfers);
        getSession().testQpidException();
    }
}
