package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer.class */
public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    protected static final Symbol COPY = Symbol.getSymbol("copy");
    protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
    protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
    private static final int INITIAL_BUFFER_CAPACITY = 131072;
    protected final AmqpSession session;
    protected final Map<JmsInboundMessageDispatch, Delivery> delivered;
    protected boolean presettle;
    private final ByteBuf incomingBuffer;
    private final AtomicLong _incomingSequence;
    private AsyncResult stopRequest;

    public AmqpConsumer(AmqpSession amqpSession, JmsConsumerInfo jmsConsumerInfo) {
        super(jmsConsumerInfo);
        this.delivered = new LinkedHashMap();
        this.incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
        this._incomingSequence = new AtomicLong(0L);
        this.session = amqpSession;
        ((JmsConsumerInfo) this.resource).getConsumerId().setProviderHint(this);
    }

    public void start(AsyncResult asyncResult) {
        getEndpoint().flow(((JmsConsumerInfo) this.resource).getPrefetchSize());
        asyncResult.onSuccess();
    }

    public void stop(AsyncResult asyncResult) {
        Receiver endpoint = getEndpoint();
        if (endpoint.getRemoteCredit() > 0) {
            this.stopRequest = asyncResult;
            endpoint.drain(0);
        } else if (endpoint.getQueued() == 0) {
            asyncResult.onSuccess();
        } else {
            this.stopRequest = asyncResult;
        }
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpResource
    public void processFlowUpdates(AmqpProvider amqpProvider) throws IOException {
        if (this.stopRequest != null) {
            Receiver endpoint = getEndpoint();
            if (endpoint.getRemoteCredit() <= 0 && endpoint.getQueued() == 0) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        }
        super.processFlowUpdates(amqpProvider);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void doOpen() {
        String destinationAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(((JmsConsumerInfo) this.resource).getDestination(), this.session.getConnection());
        Source source = new Source();
        source.setAddress(destinationAddress);
        Target target = new Target();
        configureSource(source);
        String str = "qpid-jms:receiver:" + getConsumerId() + ":" + destinationAddress;
        if (((JmsConsumerInfo) this.resource).getSubscriptionName() != null && !((JmsConsumerInfo) this.resource).getSubscriptionName().isEmpty()) {
            str = ((JmsConsumerInfo) this.resource).getSubscriptionName();
        }
        Receiver receiver = this.session.getProtonSession().receiver(str);
        receiver.setSource(source);
        receiver.setTarget(target);
        if (isPresettle()) {
            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
        } else {
            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        }
        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        setEndpoint(receiver);
        super.doOpen();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void doOpenCompletion() {
        if (getEndpoint().getRemoteSource() != null) {
            super.doOpenCompletion();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public Exception getOpenAbortException() {
        return getEndpoint().getRemoteSource() != null ? super.getOpenAbortException() : new InvalidDestinationException("Link creation was refused");
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpResource
    public void opened() {
        this.session.addResource(this);
        super.opened();
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpResource
    public void closed() {
        this.session.removeResource(this);
        super.closed();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void configureSource(Source source) {
        HashMap hashMap = new HashMap();
        Symbol[] symbolArr = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
        if (((JmsConsumerInfo) this.resource).getSubscriptionName() == null || ((JmsConsumerInfo) this.resource).getSubscriptionName().isEmpty()) {
            source.setDurable(TerminusDurability.NONE);
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        } else {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setDistributionMode(COPY);
        }
        Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(((JmsConsumerInfo) this.resource).getDestination());
        if (typeCapability != null) {
            source.setCapabilities(new Symbol[]{typeCapability});
        }
        source.setOutcomes(symbolArr);
        Modified modified = new Modified();
        modified.setDeliveryFailed(true);
        modified.setUndeliverableHere(false);
        source.setDefaultOutcome(modified);
        if (((JmsConsumerInfo) this.resource).isNoLocal()) {
            hashMap.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
        }
        if (((JmsConsumerInfo) this.resource).getSelector() != null && !((JmsConsumerInfo) this.resource).getSelector().trim().equals("")) {
            hashMap.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(((JmsConsumerInfo) this.resource).getSelector()));
        }
        if (hashMap.isEmpty()) {
            return;
        }
        source.setFilter(hashMap);
    }

    public void acknowledge() {
        LOG.trace("Session Acknowledge for consumer: {}", ((JmsConsumerInfo) this.resource).getConsumerId());
        for (Delivery delivery : this.delivered.values()) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }
        this.delivered.clear();
    }

    public void acknowledge(JmsInboundMessageDispatch jmsInboundMessageDispatch, ProviderConstants.ACK_TYPE ack_type) throws JMSException {
        Delivery delivery;
        if (jmsInboundMessageDispatch.getProviderHint() instanceof Delivery) {
            delivery = (Delivery) jmsInboundMessageDispatch.getProviderHint();
        } else {
            delivery = this.delivered.get(jmsInboundMessageDispatch);
            if (delivery == null) {
                LOG.warn("Received Ack for unknown message: {}", jmsInboundMessageDispatch);
                return;
            }
        }
        if (ack_type.equals(ProviderConstants.ACK_TYPE.DELIVERED)) {
            LOG.debug("Delivered Ack of message: {}", jmsInboundMessageDispatch);
            if (!isPresettle()) {
                this.delivered.put(jmsInboundMessageDispatch, delivery);
            }
            sendFlowIfNeeded();
            return;
        }
        if (!ack_type.equals(ProviderConstants.ACK_TYPE.CONSUMED)) {
            if (ack_type.equals(ProviderConstants.ACK_TYPE.REDELIVERED)) {
                return;
            }
            if (ack_type.equals(ProviderConstants.ACK_TYPE.POISONED)) {
                deliveryFailed(delivery, false);
                return;
            } else if (!ack_type.equals(ProviderConstants.ACK_TYPE.RELEASED)) {
                LOG.warn("Unsupported Ack Type for message: {}", jmsInboundMessageDispatch);
                return;
            } else {
                delivery.disposition(Released.getInstance());
                delivery.settle();
                return;
            }
        }
        if (isPresettle() || this.delivered.remove(jmsInboundMessageDispatch) == null) {
            sendFlowIfNeeded();
        }
        LOG.debug("Consumed Ack of message: {}", jmsInboundMessageDispatch);
        if (delivery.isSettled()) {
            return;
        }
        if (!this.session.isTransacted() || isBrowser()) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
            return;
        }
        Binary amqpTransactionId = this.session.getTransactionContext().getAmqpTransactionId();
        if (amqpTransactionId != null) {
            TransactionalState transactionalState = new TransactionalState();
            transactionalState.setOutcome(Accepted.getInstance());
            transactionalState.setTxnId(amqpTransactionId);
            delivery.disposition(transactionalState);
            delivery.settle();
            this.session.getTransactionContext().registerTxConsumer(this);
        }
    }

    private void sendFlowIfNeeded() {
        if (((JmsConsumerInfo) this.resource).getPrefetchSize() == 0) {
            return;
        }
        int credit = getEndpoint().getCredit();
        if (credit <= ((JmsConsumerInfo) this.resource).getPrefetchSize() * 0.2d) {
            getEndpoint().flow(((JmsConsumerInfo) this.resource).getPrefetchSize() - credit);
        }
    }

    public void recover() throws Exception {
        LOG.debug("Session Recover for consumer: {}", ((JmsConsumerInfo) this.resource).getConsumerId());
        Set<JmsInboundMessageDispatch> keySet = this.delivered.keySet();
        ListIterator listIterator = new ArrayList(keySet).listIterator(keySet.size());
        while (listIterator.hasPrevious()) {
            JmsInboundMessageDispatch jmsInboundMessageDispatch = (JmsInboundMessageDispatch) listIterator.previous();
            jmsInboundMessageDispatch.getMessage().getFacade().setRedeliveryCount(jmsInboundMessageDispatch.getMessage().getFacade().getRedeliveryCount() + 1);
            jmsInboundMessageDispatch.setEnqueueFirst(true);
            deliver(jmsInboundMessageDispatch);
        }
        this.delivered.clear();
    }

    public void pull(long j) {
        if (((JmsConsumerInfo) this.resource).getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
            getEndpoint().flow(1);
        }
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpResource
    public void processDeliveryUpdates(AmqpProvider amqpProvider) throws IOException {
        Delivery current;
        do {
            current = getEndpoint().current();
            if (current != null) {
                if (!current.isReadable() || current.isPartial()) {
                    LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                    current = null;
                } else {
                    LOG.trace("{} has incoming Message(s).", this);
                    try {
                        processDelivery(current);
                        getEndpoint().advance();
                    } catch (Exception e) {
                        throw IOExceptionSupport.create(e);
                    }
                }
            } else if (this.stopRequest != null && getEndpoint().getRemoteCredit() <= 0) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        } while (current != null);
        super.processDeliveryUpdates(amqpProvider);
    }

    private void processDelivery(Delivery delivery) throws Exception {
        try {
            JmsMessage createJmsMessage = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(delivery));
            createJmsMessage.onDispatch();
            JmsInboundMessageDispatch jmsInboundMessageDispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
            jmsInboundMessageDispatch.setMessage(createJmsMessage);
            jmsInboundMessageDispatch.setConsumerId(((JmsConsumerInfo) this.resource).getConsumerId());
            jmsInboundMessageDispatch.setProviderHint(delivery);
            jmsInboundMessageDispatch.setMessageId(createJmsMessage.getFacade().getProviderMessageIdObject());
            delivery.setContext(jmsInboundMessageDispatch);
            deliver(jmsInboundMessageDispatch);
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            deliveryFailed(delivery, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getNextIncomingSequenceNumber() {
        return this._incomingSequence.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void doClose() {
        if (((JmsConsumerInfo) this.resource).isDurable()) {
            getEndpoint().detach();
        } else {
            getEndpoint().close();
        }
    }

    public AmqpConnection getConnection() {
        return this.session.getConnection();
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public JmsConsumerId getConsumerId() {
        return ((JmsConsumerInfo) this.resource).getConsumerId();
    }

    public JmsDestination getDestination() {
        return ((JmsConsumerInfo) this.resource).getDestination();
    }

    public Receiver getProtonReceiver() {
        return getEndpoint();
    }

    public boolean isBrowser() {
        return false;
    }

    public boolean isPresettle() {
        return this.presettle;
    }

    public void setPresettle(boolean z) {
        this.presettle = z;
    }

    public String toString() {
        return "AmqpConsumer { " + ((JmsConsumerInfo) this.resource).getConsumerId() + " }";
    }

    protected void deliveryFailed(Delivery delivery, boolean z) {
        Modified modified = new Modified();
        modified.setUndeliverableHere(true);
        modified.setDeliveryFailed(true);
        delivery.disposition(modified);
        delivery.settle();
        if (z) {
            getEndpoint().flow(1);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void deliver(JmsInboundMessageDispatch jmsInboundMessageDispatch) throws Exception {
        ProviderListener providerListener = this.session.getProvider().getProviderListener();
        if (providerListener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", jmsInboundMessageDispatch);
            return;
        }
        if (jmsInboundMessageDispatch.getMessage() != null) {
            LOG.debug("Dispatching received message: {}", jmsInboundMessageDispatch);
        } else {
            LOG.debug("Dispatching end of browse to: {}", jmsInboundMessageDispatch.getConsumerId());
        }
        providerListener.onInboundMessage(jmsInboundMessageDispatch);
    }

    protected Message decodeIncomingMessage(Delivery delivery) {
        while (true) {
            int recv = getEndpoint().recv(this.incomingBuffer.array(), this.incomingBuffer.writerIndex(), this.incomingBuffer.writableBytes());
            if (recv <= 0) {
                try {
                    Message create = Message.Factory.create();
                    create.decode(this.incomingBuffer.array(), 0, this.incomingBuffer.readableBytes());
                    this.incomingBuffer.clear();
                    return create;
                } catch (Throwable th) {
                    this.incomingBuffer.clear();
                    throw th;
                }
            }
            this.incomingBuffer.writerIndex(this.incomingBuffer.writerIndex() + recv);
            if (!this.incomingBuffer.isWritable()) {
                this.incomingBuffer.capacity((int) (this.incomingBuffer.capacity() * 1.5d));
            }
        }
    }

    public void preCommit() {
    }

    public void preRollback() {
    }

    public void postCommit() throws Exception {
    }

    public void postRollback() throws Exception {
    }
}
