package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer.class */
public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private static final int INITIAL_BUFFER_CAPACITY = 131072;
    protected final AmqpSession session;
    protected final Map<JmsInboundMessageDispatch, Delivery> delivered;
    protected boolean presettle;
    protected AsyncResult stopRequest;
    protected AsyncResult pullRequest;
    protected final ByteBuf incomingBuffer;
    protected final AtomicLong incomingSequence;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer$ScheduledStopRequest.class */
    public static final class ScheduledStopRequest implements AsyncResult {
        private final ScheduledFuture<?> sheduledStopTask;
        private final AsyncResult origRequest;

        public ScheduledStopRequest(ScheduledFuture<?> scheduledFuture, AsyncResult asyncResult) {
            this.sheduledStopTask = scheduledFuture;
            this.origRequest = asyncResult;
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onFailure(Throwable th) {
            this.sheduledStopTask.cancel(false);
            this.origRequest.onFailure(th);
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            if (this.sheduledStopTask.cancel(false)) {
                this.origRequest.onSuccess();
            }
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public boolean isComplete() {
            return this.origRequest.isComplete();
        }
    }

    public AmqpConsumer(AmqpSession amqpSession, JmsConsumerInfo jmsConsumerInfo, Receiver receiver) {
        super(jmsConsumerInfo, receiver, amqpSession);
        this.delivered = new LinkedHashMap();
        this.incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
        this.incomingSequence = new AtomicLong(0L);
        this.session = amqpSession;
    }

    public void start(AsyncResult asyncResult) {
        sendFlowIfNeeded();
        asyncResult.onSuccess();
    }

    public void stop(AsyncResult asyncResult) {
        Receiver endpoint = getEndpoint();
        if (endpoint.getRemoteCredit() > 0) {
            this.stopRequest = asyncResult;
            endpoint.drain(0);
        } else if (endpoint.getQueued() == 0) {
            asyncResult.onSuccess();
        } else {
            this.stopRequest = asyncResult;
        }
    }

    private void stopOnSchedule(long j, final AsyncResult asyncResult) {
        LOG.trace("Consumer {} scheduling stop", getConsumerId());
        this.stopRequest = new ScheduledStopRequest(getSession().schedule(new Runnable() { // from class: org.apache.qpid.jms.provider.amqp.AmqpConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                AmqpConsumer.LOG.trace("Consumer {} running scheduled stop", AmqpConsumer.this.getConsumerId());
                if (AmqpConsumer.this.getEndpoint().getRemoteCredit() != 0) {
                    AmqpConsumer.this.stop(asyncResult);
                    AmqpConsumer.this.session.getProvider().pumpToProtonTransport(asyncResult);
                }
            }
        }, j), asyncResult);
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processFlowUpdates(AmqpProvider amqpProvider) throws IOException {
        if (this.stopRequest != null) {
            Receiver endpoint = getEndpoint();
            if (endpoint.getRemoteCredit() <= 0 && endpoint.getQueued() == 0) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        }
        if (this.pullRequest != null) {
            Receiver endpoint2 = getEndpoint();
            if (endpoint2.getRemoteCredit() <= 0 && endpoint2.getQueued() == 0) {
                this.pullRequest.onSuccess();
                this.pullRequest = null;
            }
        }
        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), Integer.valueOf(getEndpoint().getRemoteCredit()));
        super.processFlowUpdates(amqpProvider);
    }

    public void acknowledge(ProviderConstants.ACK_TYPE ack_type) {
        LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ack_type);
        for (Delivery delivery : this.delivered.values()) {
            switch (ack_type) {
                case ACCEPTED:
                    delivery.disposition(Accepted.getInstance());
                    break;
                case RELEASED:
                    delivery.disposition(Released.getInstance());
                    break;
                case REJECTED:
                    delivery.disposition(AmqpSupport.REJECTED);
                    break;
                case MODIFIED_FAILED:
                    delivery.disposition(AmqpSupport.MODIFIED_FAILED);
                    break;
                case MODIFIED_FAILED_UNDELIVERABLE:
                    delivery.disposition(AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
                    break;
                default:
                    throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ack_type);
            }
            delivery.settle();
        }
        this.delivered.clear();
    }

    public void acknowledge(JmsInboundMessageDispatch jmsInboundMessageDispatch, ProviderConstants.ACK_TYPE ack_type) throws JMSException {
        Delivery delivery;
        if (jmsInboundMessageDispatch.getProviderHint() instanceof Delivery) {
            delivery = (Delivery) jmsInboundMessageDispatch.getProviderHint();
        } else {
            delivery = this.delivered.get(jmsInboundMessageDispatch);
            if (delivery == null) {
                LOG.warn("Received Ack for unknown message: {}", jmsInboundMessageDispatch);
                return;
            }
        }
        if (ack_type.equals(ProviderConstants.ACK_TYPE.DELIVERED)) {
            LOG.debug("Delivered Ack of message: {}", jmsInboundMessageDispatch);
            if (!isPresettle()) {
                this.delivered.put(jmsInboundMessageDispatch, delivery);
            }
            delivery.setDefaultDeliveryState(AmqpSupport.MODIFIED_FAILED);
            sendFlowIfNeeded();
            return;
        }
        if (!ack_type.equals(ProviderConstants.ACK_TYPE.ACCEPTED)) {
            if (ack_type.equals(ProviderConstants.ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
                deliveryFailedUndeliverable(delivery);
                return;
            }
            if (ack_type.equals(ProviderConstants.ACK_TYPE.EXPIRED)) {
                deliveryFailedUndeliverable(delivery);
                return;
            } else if (!ack_type.equals(ProviderConstants.ACK_TYPE.RELEASED)) {
                LOG.warn("Unsupported Ack Type for message: {}", jmsInboundMessageDispatch);
                return;
            } else {
                delivery.disposition(Released.getInstance());
                delivery.settle();
                return;
            }
        }
        if (isPresettle() || this.delivered.remove(jmsInboundMessageDispatch) == null) {
            sendFlowIfNeeded();
        }
        LOG.debug("Accepted Ack of message: {}", jmsInboundMessageDispatch);
        if (delivery.isSettled()) {
            return;
        }
        if (!this.session.isTransacted() || getResourceInfo().isBrowser()) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
            return;
        }
        if (this.session.isTransactionFailed()) {
            LOG.trace("Skipping ack of message {} in failed transaction.", jmsInboundMessageDispatch);
            return;
        }
        Binary amqpTransactionId = this.session.getTransactionContext().getAmqpTransactionId();
        if (amqpTransactionId != null) {
            TransactionalState transactionalState = new TransactionalState();
            transactionalState.setOutcome(Accepted.getInstance());
            transactionalState.setTxnId(amqpTransactionId);
            delivery.disposition(transactionalState);
            delivery.settle();
            this.session.getTransactionContext().registerTxConsumer(this);
        }
    }

    private void sendFlowIfNeeded() {
        if (getResourceInfo().getPrefetchSize() == 0 || isStopping()) {
            return;
        }
        int credit = getEndpoint().getCredit();
        if (credit <= getResourceInfo().getPrefetchSize() * 0.3d) {
            int prefetchSize = getResourceInfo().getPrefetchSize() - credit;
            LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), Integer.valueOf(prefetchSize));
            getEndpoint().flow(prefetchSize);
        }
    }

    public void recover() throws Exception {
        LOG.debug("Session Recover for consumer: {}", getResourceInfo().getId());
        Set<JmsInboundMessageDispatch> keySet = this.delivered.keySet();
        ListIterator listIterator = new ArrayList(keySet).listIterator(keySet.size());
        while (listIterator.hasPrevious()) {
            JmsInboundMessageDispatch jmsInboundMessageDispatch = (JmsInboundMessageDispatch) listIterator.previous();
            jmsInboundMessageDispatch.getMessage().getFacade().setRedeliveryCount(jmsInboundMessageDispatch.getMessage().getFacade().getRedeliveryCount() + 1);
            jmsInboundMessageDispatch.setEnqueueFirst(true);
            deliver(jmsInboundMessageDispatch);
        }
        this.delivered.clear();
    }

    public void pull(long j, AsyncResult asyncResult) {
        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), Long.valueOf(j));
        if (j < 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            this.pullRequest = asyncResult;
            return;
        }
        if (j == 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            stop(asyncResult);
            return;
        }
        if (j > 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            stopOnSchedule(j, asyncResult);
        }
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processDeliveryUpdates(AmqpProvider amqpProvider) throws IOException {
        Delivery current;
        do {
            current = getEndpoint().current();
            if (current != null) {
                if (!current.isReadable() || current.isPartial()) {
                    LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                    current = null;
                } else {
                    LOG.trace("{} has incoming Message(s).", this);
                    try {
                        if (processDelivery(current) && this.pullRequest != null) {
                            this.pullRequest.onSuccess();
                            this.pullRequest = null;
                        }
                    } catch (Exception e) {
                        throw IOExceptionSupport.create(e);
                    }
                }
            } else if (getEndpoint().getRemoteCredit() <= 0 && this.stopRequest != null) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        } while (current != null);
        super.processDeliveryUpdates(amqpProvider);
    }

    private boolean processDelivery(Delivery delivery) throws Exception {
        delivery.setDefaultDeliveryState(Released.getInstance());
        try {
            JmsMessage createJmsMessage = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(delivery));
            getEndpoint().advance();
            createJmsMessage.onDispatch();
            JmsInboundMessageDispatch jmsInboundMessageDispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
            jmsInboundMessageDispatch.setMessage(createJmsMessage);
            jmsInboundMessageDispatch.setConsumerId(getResourceInfo().getId());
            jmsInboundMessageDispatch.setProviderHint(delivery);
            jmsInboundMessageDispatch.setMessageId(createJmsMessage.getFacade().getProviderMessageIdObject());
            delivery.setContext(jmsInboundMessageDispatch);
            deliver(jmsInboundMessageDispatch);
            return true;
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            deliveryFailedUndeliverable(delivery);
            return false;
        }
    }

    protected long getNextIncomingSequenceNumber() {
        return this.incomingSequence.incrementAndGet();
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    protected void closeOrDetachEndpoint() {
        if (getResourceInfo().isDurable()) {
            getEndpoint().detach();
        } else {
            getEndpoint().close();
        }
    }

    public AmqpConnection getConnection() {
        return this.session.getConnection();
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public JmsConsumerId getConsumerId() {
        return getResourceInfo().getId();
    }

    public JmsDestination getDestination() {
        return getResourceInfo().getDestination();
    }

    public boolean isPresettle() {
        return this.presettle || getResourceInfo().isBrowser();
    }

    public boolean isStopping() {
        return this.stopRequest != null;
    }

    public void setPresettle(boolean z) {
        this.presettle = z;
    }

    public String toString() {
        return "AmqpConsumer { " + getResourceInfo().getId() + " }";
    }

    protected void deliveryFailedUndeliverable(Delivery delivery) {
        delivery.disposition(AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
        delivery.settle();
        sendFlowIfNeeded();
    }

    protected void deliver(JmsInboundMessageDispatch jmsInboundMessageDispatch) throws Exception {
        ProviderListener providerListener = this.session.getProvider().getProviderListener();
        if (providerListener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", jmsInboundMessageDispatch);
            return;
        }
        if (jmsInboundMessageDispatch.getMessage() != null) {
            LOG.debug("Dispatching received message: {}", jmsInboundMessageDispatch);
        } else {
            LOG.debug("Dispatching end of pull/browse to: {}", jmsInboundMessageDispatch.getConsumerId());
        }
        providerListener.onInboundMessage(jmsInboundMessageDispatch);
    }

    protected Message decodeIncomingMessage(Delivery delivery) {
        while (true) {
            int recv = getEndpoint().recv(this.incomingBuffer.array(), this.incomingBuffer.writerIndex(), this.incomingBuffer.writableBytes());
            if (recv <= 0) {
                try {
                    Message create = Message.Factory.create();
                    create.decode(this.incomingBuffer.array(), 0, this.incomingBuffer.readableBytes());
                    this.incomingBuffer.clear();
                    return create;
                } catch (Throwable th) {
                    this.incomingBuffer.clear();
                    throw th;
                }
            }
            this.incomingBuffer.writerIndex(this.incomingBuffer.writerIndex() + recv);
            if (!this.incomingBuffer.isWritable()) {
                this.incomingBuffer.capacity((int) (this.incomingBuffer.capacity() * 1.5d));
            }
        }
    }

    public void preCommit() {
    }

    public void preRollback() {
    }

    public void postCommit() {
    }

    public void postRollback() {
    }
}
