package org.apache.qpid.jms.provider.amqp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.WrappedAsyncResult;
import org.apache.qpid.jms.provider.amqp.message.AmqpCodec;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer.class */
public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    protected final AmqpSession session;
    protected AsyncResult stopRequest;
    protected AsyncResult pullRequest;
    protected long incomingSequence;
    protected long deliveredCount;
    protected boolean deferredClose;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer$DeferredCloseRequest.class */
    public final class DeferredCloseRequest implements AsyncResult {
        private DeferredCloseRequest() {
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onFailure(Throwable th) {
            AmqpConsumer.LOG.trace("Failed deferred close of consumer: {} - {}", AmqpConsumer.this.getConsumerId(), th.getMessage());
            AmqpConsumer.this.getParent().getProvider().fireNonFatalProviderException(JmsExceptionSupport.create(th));
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            AmqpConsumer.LOG.trace("Completed deferred close of consumer: {}", AmqpConsumer.this.getConsumerId());
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public boolean isComplete() {
            return AmqpConsumer.this.isClosed();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer$ScheduledRequest.class */
    public static final class ScheduledRequest implements AsyncResult {
        private final ScheduledFuture<?> sheduledTask;
        private final AsyncResult origRequest;

        public ScheduledRequest(ScheduledFuture<?> scheduledFuture, AsyncResult asyncResult) {
            this.sheduledTask = scheduledFuture;
            this.origRequest = asyncResult;
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onFailure(Throwable th) {
            this.sheduledTask.cancel(false);
            this.origRequest.onFailure(th);
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            if (this.sheduledTask.cancel(false)) {
                this.origRequest.onSuccess();
            }
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public boolean isComplete() {
            return this.origRequest.isComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpConsumer$StopAndReleaseRequest.class */
    public final class StopAndReleaseRequest extends WrappedAsyncResult {
        public StopAndReleaseRequest(AsyncResult asyncResult) {
            super(asyncResult);
        }

        @Override // org.apache.qpid.jms.provider.WrappedAsyncResult, org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            AmqpConsumer.this.releasePrefetch();
            super.onSuccess();
        }
    }

    public AmqpConsumer(AmqpSession amqpSession, JmsConsumerInfo jmsConsumerInfo, Receiver receiver) {
        super(jmsConsumerInfo, receiver, amqpSession);
        this.session = amqpSession;
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void close(AsyncResult asyncResult) {
        if (!shouldDeferClose()) {
            super.close(asyncResult);
        } else {
            this.deferredClose = true;
            stop(new StopAndReleaseRequest(asyncResult));
        }
    }

    public void start(AsyncResult asyncResult) {
        JmsConsumerInfo resourceInfo = getResourceInfo();
        if (resourceInfo.isListener() && resourceInfo.getPrefetchSize() == 0) {
            sendFlowForNoPrefetchListener();
        } else {
            sendFlowIfNeeded();
        }
        asyncResult.onSuccess();
    }

    public void stop(AsyncResult asyncResult) {
        Receiver endpoint = getEndpoint();
        if (endpoint.getRemoteCredit() <= 0) {
            if (endpoint.getQueued() == 0) {
                asyncResult.onSuccess();
                return;
            } else {
                this.stopRequest = asyncResult;
                return;
            }
        }
        this.stopRequest = asyncResult;
        endpoint.drain(0);
        if (getDrainTimeout() > 0) {
            this.stopRequest = new ScheduledRequest(getSession().schedule(new Runnable() { // from class: org.apache.qpid.jms.provider.amqp.AmqpConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    AmqpConsumer.LOG.trace("Consumer {} drain request timed out", AmqpConsumer.this.getConsumerId());
                    JMSException jmsOperationTimedOutException = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
                    if (AmqpConsumer.this.session.isTransacted() && AmqpConsumer.this.session.getTransactionContext().isInTransaction(AmqpConsumer.this.getConsumerId())) {
                        AmqpConsumer.this.stopRequest.onFailure(jmsOperationTimedOutException);
                        AmqpConsumer.this.stopRequest = null;
                    } else {
                        AmqpConsumer.this.closeResource(AmqpConsumer.this.session.getProvider(), jmsOperationTimedOutException, false);
                        AmqpConsumer.this.session.getProvider().pumpToProtonTransport();
                    }
                }
            }, getDrainTimeout()), this.stopRequest);
        }
    }

    private void stopOnSchedule(long j, final AsyncResult asyncResult) {
        LOG.trace("Consumer {} scheduling stop", getConsumerId());
        this.stopRequest = new ScheduledRequest(getSession().schedule(new Runnable() { // from class: org.apache.qpid.jms.provider.amqp.AmqpConsumer.2
            @Override // java.lang.Runnable
            public void run() {
                AmqpConsumer.LOG.trace("Consumer {} running scheduled stop", AmqpConsumer.this.getConsumerId());
                if (AmqpConsumer.this.getEndpoint().getRemoteCredit() != 0) {
                    AmqpConsumer.this.stop(asyncResult);
                    AmqpConsumer.this.session.getProvider().pumpToProtonTransport(asyncResult);
                }
            }
        }, j), asyncResult);
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processFlowUpdates(AmqpProvider amqpProvider) throws IOException {
        if (this.stopRequest != null) {
            Receiver endpoint = getEndpoint();
            if (endpoint.getRemoteCredit() <= 0 && endpoint.getQueued() == 0) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        }
        if (this.pullRequest != null) {
            Receiver endpoint2 = getEndpoint();
            if (endpoint2.getRemoteCredit() <= 0 && endpoint2.getQueued() == 0) {
                this.pullRequest.onSuccess();
                this.pullRequest = null;
            }
        }
        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), Integer.valueOf(getEndpoint().getRemoteCredit()));
        super.processFlowUpdates(amqpProvider);
    }

    public void acknowledge(ProviderConstants.ACK_TYPE ack_type) {
        LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ack_type);
        Delivery head = getEndpoint().head();
        while (head != null) {
            Delivery delivery = head;
            head = head.next();
            if (!(delivery.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during recover processing", this);
            } else if (((JmsInboundMessageDispatch) delivery.getContext()).isDelivered()) {
                switch (ack_type) {
                    case ACCEPTED:
                        delivery.disposition(Accepted.getInstance());
                        break;
                    case RELEASED:
                        delivery.disposition(Released.getInstance());
                        break;
                    case REJECTED:
                        delivery.disposition(AmqpSupport.REJECTED);
                        break;
                    case MODIFIED_FAILED:
                        delivery.disposition(AmqpSupport.MODIFIED_FAILED);
                        break;
                    case MODIFIED_FAILED_UNDELIVERABLE:
                        delivery.disposition(AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
                        break;
                    default:
                        throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ack_type);
                }
                delivery.settle();
                this.deliveredCount--;
            } else {
                continue;
            }
        }
        tryCompleteDeferredClose();
    }

    public void acknowledge(JmsInboundMessageDispatch jmsInboundMessageDispatch, ProviderConstants.ACK_TYPE ack_type) {
        if (!(jmsInboundMessageDispatch.getProviderHint() instanceof Delivery)) {
            LOG.warn("Received Ack for unknown message: {}", jmsInboundMessageDispatch);
            return;
        }
        Delivery delivery = (Delivery) jmsInboundMessageDispatch.getProviderHint();
        if (ack_type.equals(ProviderConstants.ACK_TYPE.DELIVERED)) {
            LOG.debug("Delivered Ack of message: {}", jmsInboundMessageDispatch);
            this.deliveredCount++;
            jmsInboundMessageDispatch.setDelivered(true);
            delivery.setDefaultDeliveryState(AmqpSupport.MODIFIED_FAILED);
            sendFlowIfNeeded();
            return;
        }
        if (ack_type.equals(ProviderConstants.ACK_TYPE.ACCEPTED)) {
            if (!jmsInboundMessageDispatch.isDelivered()) {
                sendFlowIfNeeded();
            }
            LOG.debug("Accepted Ack of message: {}", jmsInboundMessageDispatch);
            if (delivery.remotelySettled()) {
                delivery.settle();
            } else if (!this.session.isTransacted() || getResourceInfo().isBrowser()) {
                delivery.disposition(Accepted.getInstance());
                delivery.settle();
            } else if (this.session.isTransactionFailed()) {
                LOG.trace("Skipping ack of message {} in failed transaction.", jmsInboundMessageDispatch);
                return;
            } else if (this.session.getTransactionContext().getAmqpTransactionId() != null) {
                delivery.disposition(this.session.getTransactionContext().getTxnAcceptState());
                delivery.settle();
                this.session.getTransactionContext().registerTxConsumer(this);
            }
        } else if (ack_type.equals(ProviderConstants.ACK_TYPE.MODIFIED_FAILED)) {
            settleDelivery(delivery, AmqpSupport.MODIFIED_FAILED);
        } else if (ack_type.equals(ProviderConstants.ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
            settleDelivery(delivery, AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
        } else if (ack_type.equals(ProviderConstants.ACK_TYPE.REJECTED)) {
            settleDelivery(delivery, AmqpSupport.REJECTED);
        } else if (!ack_type.equals(ProviderConstants.ACK_TYPE.RELEASED)) {
            LOG.warn("Unsupported Ack Type for message: {}", jmsInboundMessageDispatch);
            return;
        } else {
            delivery.disposition(Released.getInstance());
            delivery.settle();
        }
        if (jmsInboundMessageDispatch.isDelivered()) {
            this.deliveredCount--;
        }
        tryCompleteDeferredClose();
    }

    private void sendFlowIfNeeded() {
        int prefetchSize = getResourceInfo().getPrefetchSize();
        if (prefetchSize == 0 || isStopping()) {
            return;
        }
        int credit = getEndpoint().getCredit();
        if (credit <= prefetchSize * 0.5d) {
            int prefetchedMessageCount = getResourceInfo().getPrefetchedMessageCount();
            if (credit + prefetchedMessageCount <= prefetchSize * 0.7d) {
                int i = (prefetchSize - credit) - prefetchedMessageCount;
                LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), Integer.valueOf(i));
                getEndpoint().flow(i);
            }
        }
    }

    private void sendFlowForNoPrefetchListener() {
        int credit = getEndpoint().getCredit();
        if (credit < 1) {
            int i = 1 - credit;
            LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), Integer.valueOf(i));
            getEndpoint().flow(i);
        }
    }

    public void recover() throws Exception {
        LOG.debug("Session Recover for consumer: {}", getResourceInfo().getId());
        ArrayList arrayList = new ArrayList();
        Delivery head = getEndpoint().head();
        while (head != null) {
            Delivery delivery = head;
            head = head.next();
            if (delivery.getContext() instanceof JmsInboundMessageDispatch) {
                JmsInboundMessageDispatch jmsInboundMessageDispatch = (JmsInboundMessageDispatch) delivery.getContext();
                if (jmsInboundMessageDispatch.isDelivered()) {
                    jmsInboundMessageDispatch.getMessage().getFacade().setRedeliveryCount(jmsInboundMessageDispatch.getMessage().getFacade().getRedeliveryCount() + 1);
                    jmsInboundMessageDispatch.setEnqueueFirst(true);
                    jmsInboundMessageDispatch.setDelivered(false);
                    arrayList.add(jmsInboundMessageDispatch);
                }
            } else {
                LOG.debug("{} Found incomplete delivery with no context during recover processing", this);
            }
        }
        ListIterator listIterator = arrayList.listIterator(arrayList.size());
        while (listIterator.hasPrevious()) {
            deliver((JmsInboundMessageDispatch) listIterator.previous());
        }
    }

    public void pull(long j, AsyncResult asyncResult) {
        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), Long.valueOf(j));
        if (j < 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            this.pullRequest = asyncResult;
            return;
        }
        if (j == 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            stop(asyncResult);
            return;
        }
        if (j > 0) {
            if (getEndpoint().getCredit() == 0) {
                LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
                getEndpoint().flow(1);
            }
            stopOnSchedule(j, asyncResult);
        }
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processDeliveryUpdates(AmqpProvider amqpProvider, Delivery delivery) throws IOException {
        if (delivery.isReadable() && !delivery.isPartial()) {
            LOG.trace("{} has incoming Message(s).", this);
            try {
                if (processDelivery(delivery) && this.pullRequest != null) {
                    this.pullRequest.onSuccess();
                    this.pullRequest = null;
                }
            } catch (Exception e) {
                throw IOExceptionSupport.create(e);
            }
        }
        if (getEndpoint().current() == null && getEndpoint().getRemoteCredit() <= 0 && this.stopRequest != null) {
            this.stopRequest.onSuccess();
            this.stopRequest = null;
        }
        super.processDeliveryUpdates(amqpProvider, delivery);
    }

    private boolean processDelivery(Delivery delivery) throws Exception {
        delivery.setDefaultDeliveryState(Released.getInstance());
        try {
            JmsMessage asJmsMessage = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
            try {
                asJmsMessage.onDispatch();
                JmsInboundMessageDispatch jmsInboundMessageDispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
                jmsInboundMessageDispatch.setMessage(asJmsMessage);
                jmsInboundMessageDispatch.setConsumerId(getResourceInfo().getId());
                jmsInboundMessageDispatch.setProviderHint(delivery);
                jmsInboundMessageDispatch.setMessageId(asJmsMessage.getFacade().getProviderMessageIdObject());
                delivery.setContext(jmsInboundMessageDispatch);
                deliver(jmsInboundMessageDispatch);
                getEndpoint().advance();
                return true;
            } catch (Throwable th) {
                getEndpoint().advance();
                throw th;
            }
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            settleDelivery(delivery, AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
            return false;
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0007: MOVE_MULTI, method: org.apache.qpid.jms.provider.amqp.AmqpConsumer.getNextIncomingSequenceNumber():long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    protected long getNextIncomingSequenceNumber() {
        /*
            r6 = this;
            r0 = r6
            r1 = r0
            long r1 = r1.incomingSequence
            r2 = 1
            long r1 = r1 + r2
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.incomingSequence = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.qpid.jms.provider.amqp.AmqpConsumer.getNextIncomingSequenceNumber():long");
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    protected void closeOrDetachEndpoint() {
        if (getResourceInfo().isDurable()) {
            getEndpoint().detach();
        } else {
            getEndpoint().close();
        }
    }

    public AmqpConnection getConnection() {
        return this.session.getConnection();
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public JmsConsumerId getConsumerId() {
        return getResourceInfo().getId();
    }

    public JmsDestination getDestination() {
        return getResourceInfo().getDestination();
    }

    public boolean isStopping() {
        return this.stopRequest != null;
    }

    public int getDrainTimeout() {
        return this.session.getProvider().getDrainTimeout();
    }

    public String toString() {
        return "AmqpConsumer { " + getResourceInfo().getId() + " }";
    }

    protected void settleDelivery(Delivery delivery, DeliveryState deliveryState) {
        delivery.disposition(deliveryState);
        delivery.settle();
        sendFlowIfNeeded();
    }

    protected void deliver(JmsInboundMessageDispatch jmsInboundMessageDispatch) throws Exception {
        if (this.deferredClose) {
            return;
        }
        ProviderListener providerListener = this.session.getProvider().getProviderListener();
        if (providerListener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", jmsInboundMessageDispatch);
        } else {
            LOG.debug("Dispatching received message: {}", jmsInboundMessageDispatch);
            providerListener.onInboundMessage(jmsInboundMessageDispatch);
        }
    }

    public void preCommit() {
    }

    public void preRollback() {
    }

    public void postCommit() {
        tryCompleteDeferredClose();
    }

    public void postRollback() {
        releasePrefetch();
        tryCompleteDeferredClose();
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void handleResourceClosure(AmqpProvider amqpProvider, Throwable th) {
        this.session.getConnection().getSubTracker().consumerRemoved(getResourceInfo());
        if (this.stopRequest != null) {
            if (th == null) {
                this.stopRequest.onSuccess();
            } else {
                this.stopRequest.onFailure(th);
            }
            this.stopRequest = null;
        }
        if (this.pullRequest != null) {
            if (th == null) {
                this.pullRequest.onSuccess();
            } else {
                this.pullRequest.onFailure(th);
            }
            this.pullRequest = null;
        }
    }

    private boolean shouldDeferClose() {
        return (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(getConsumerId())) || this.deliveredCount > 0;
    }

    private void tryCompleteDeferredClose() {
        if (this.deferredClose && this.deliveredCount == 0) {
            super.close(new DeferredCloseRequest());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void releasePrefetch() {
        Delivery head = getEndpoint().head();
        while (head != null) {
            Delivery delivery = head;
            head = head.next();
            if (!(delivery.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during release processing", this);
            } else if (!((JmsInboundMessageDispatch) delivery.getContext()).isDelivered()) {
                delivery.disposition(Released.getInstance());
                delivery.settle();
            }
        }
    }
}
