package org.apache.qpid.jms.provider.amqp;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.class */
public class AmqpFixedProducer extends AmqpProducer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private final AmqpTransferTagGenerator tagGenerator;
    private final Set<Delivery> sent;
    private final LinkedList<InFlightSend> blocked;
    private byte[] encodeBuffer;
    private boolean presettle;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpFixedProducer$InFlightSend.class */
    public class InFlightSend implements AsyncResult {
        public final JmsOutboundMessageDispatch envelope;
        public final AsyncResult request;
        public ScheduledFuture<?> requestTimeout;

        public InFlightSend(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, AsyncResult asyncResult) {
            this.envelope = jmsOutboundMessageDispatch;
            this.request = asyncResult;
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onFailure(Throwable th) {
            if (this.requestTimeout != null) {
                this.requestTimeout.cancel(false);
                this.requestTimeout = null;
            }
            AmqpFixedProducer.this.blocked.remove(this);
            this.request.onFailure(th);
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            if (this.requestTimeout != null) {
                this.requestTimeout.cancel(false);
                this.requestTimeout = null;
            }
            AmqpFixedProducer.this.blocked.remove(this);
            this.request.onSuccess();
        }

        @Override // org.apache.qpid.jms.provider.AsyncResult
        public boolean isComplete() {
            return this.request.isComplete();
        }
    }

    public AmqpFixedProducer(AmqpSession amqpSession, JmsProducerInfo jmsProducerInfo) {
        super(amqpSession, jmsProducerInfo);
        this.tagGenerator = new AmqpTransferTagGenerator(true);
        this.sent = new LinkedHashSet();
        this.blocked = new LinkedList<>();
        this.encodeBuffer = new byte[8192];
        this.presettle = false;
    }

    public AmqpFixedProducer(AmqpSession amqpSession, JmsProducerInfo jmsProducerInfo, Sender sender) {
        super(amqpSession, jmsProducerInfo, sender);
        this.tagGenerator = new AmqpTransferTagGenerator(true);
        this.sent = new LinkedHashSet();
        this.blocked = new LinkedList<>();
        this.encodeBuffer = new byte[8192];
        this.presettle = false;
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void close(AsyncResult asyncResult) {
        if (this.blocked.isEmpty()) {
            super.close(asyncResult);
        } else {
            this.closeRequest = asyncResult;
        }
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public boolean send(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, AsyncResult asyncResult) throws IOException, JMSException {
        if (getEndpoint().getCredit() > 0) {
            doSend(jmsOutboundMessageDispatch, asyncResult);
            return true;
        }
        LOG.trace("Holding Message send until credit is available.");
        jmsOutboundMessageDispatch.setSendAsync(false);
        InFlightSend inFlightSend = new InFlightSend(jmsOutboundMessageDispatch, asyncResult);
        if (getSendTimeout() > -1) {
            inFlightSend.requestTimeout = getParent().getProvider().scheduleRequestTimeout(inFlightSend, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", jmsOutboundMessageDispatch.getMessage()));
        }
        this.blocked.addLast(inFlightSend);
        return false;
    }

    private void doSend(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, AsyncResult asyncResult) throws IOException, JMSException {
        Delivery delivery;
        if (this.session.isTransacted() && this.session.isTransactionFailed()) {
            asyncResult.onSuccess();
            return;
        }
        LOG.trace("Producer sending message: {}", jmsOutboundMessageDispatch);
        JmsMessageFacade facade = jmsOutboundMessageDispatch.getMessage().getFacade();
        boolean z = jmsOutboundMessageDispatch.isPresettle() || isPresettle();
        if (z) {
            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
        } else {
            byte[] nextTag = this.tagGenerator.getNextTag();
            delivery = getEndpoint().delivery(nextTag, 0, nextTag.length);
        }
        delivery.setContext(asyncResult);
        if (this.session.isTransacted()) {
            Binary amqpTransactionId = this.session.getTransactionContext().getAmqpTransactionId();
            TransactionalState transactionalState = new TransactionalState();
            transactionalState.setTxnId(amqpTransactionId);
            delivery.disposition(transactionalState);
        }
        encodeAndSend(((AmqpJmsMessageFacade) facade).getAmqpMessage(), delivery);
        if (z) {
            delivery.settle();
        } else {
            this.sent.add(delivery);
            getEndpoint().advance();
        }
        if (jmsOutboundMessageDispatch.isSendAsync() || z) {
            asyncResult.onSuccess();
        } else if (getSendTimeout() != -1) {
            InFlightSend inFlightSend = new InFlightSend(jmsOutboundMessageDispatch, asyncResult);
            inFlightSend.requestTimeout = getParent().getProvider().scheduleRequestTimeout(inFlightSend, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", jmsOutboundMessageDispatch.getMessage()));
            delivery.setContext(inFlightSend);
        }
    }

    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
        int encode;
        while (true) {
            try {
                encode = message.encode(this.encodeBuffer, 0, this.encodeBuffer.length);
                break;
            } catch (BufferOverflowException e) {
                this.encodeBuffer = new byte[this.encodeBuffer.length * 2];
            }
        }
        int i = 0;
        while (true) {
            int send = getEndpoint().send(this.encodeBuffer, i, encode - i);
            if (send > 0) {
                i += send;
                if (encode - i == 0) {
                    return;
                }
            } else {
                LOG.warn("{} failed to send any data from current Message.", this);
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x0047, code lost:
    
        doSend(r0.envelope, r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0053, code lost:
    
        r7 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x0058, code lost:
    
        throw org.apache.qpid.jms.util.IOExceptionSupport.create(r7);
     */
    /* JADX WARN: Code restructure failed: missing block: B:4:0x0016, code lost:
    
        if (getEndpoint().getCredit() > 0) goto L6;
     */
    /* JADX WARN: Code restructure failed: missing block: B:6:0x0025, code lost:
    
        if (getEndpoint().getCredit() <= 0) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:8:0x002f, code lost:
    
        if (r4.blocked.isEmpty() != false) goto L31;
     */
    /* JADX WARN: Code restructure failed: missing block: B:9:0x0032, code lost:
    
        org.apache.qpid.jms.provider.amqp.AmqpFixedProducer.LOG.trace("Dispatching previously held send");
        r0 = r4.blocked.pop();
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void processFlowUpdates(org.apache.qpid.jms.provider.amqp.AmqpProvider r5) throws java.io.IOException {
        /*
            r4 = this;
            r0 = r4
            java.util.LinkedList<org.apache.qpid.jms.provider.amqp.AmqpFixedProducer$InFlightSend> r0 = r0.blocked
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L5c
            r0 = r4
            org.apache.qpid.proton.engine.Endpoint r0 = r0.getEndpoint()
            org.apache.qpid.proton.engine.Sender r0 = (org.apache.qpid.proton.engine.Sender) r0
            int r0 = r0.getCredit()
            if (r0 <= 0) goto L5c
        L19:
            r0 = r4
            org.apache.qpid.proton.engine.Endpoint r0 = r0.getEndpoint()
            org.apache.qpid.proton.engine.Sender r0 = (org.apache.qpid.proton.engine.Sender) r0
            int r0 = r0.getCredit()
            if (r0 <= 0) goto L5c
            r0 = r4
            java.util.LinkedList<org.apache.qpid.jms.provider.amqp.AmqpFixedProducer$InFlightSend> r0 = r0.blocked
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L5c
            org.slf4j.Logger r0 = org.apache.qpid.jms.provider.amqp.AmqpFixedProducer.LOG
            java.lang.String r1 = "Dispatching previously held send"
            r0.trace(r1)
            r0 = r4
            java.util.LinkedList<org.apache.qpid.jms.provider.amqp.AmqpFixedProducer$InFlightSend> r0 = r0.blocked
            java.lang.Object r0 = r0.pop()
            org.apache.qpid.jms.provider.amqp.AmqpFixedProducer$InFlightSend r0 = (org.apache.qpid.jms.provider.amqp.AmqpFixedProducer.InFlightSend) r0
            r6 = r0
            r0 = r4
            r1 = r6
            org.apache.qpid.jms.message.JmsOutboundMessageDispatch r1 = r1.envelope     // Catch: javax.jms.JMSException -> L53
            r2 = r6
            r0.doSend(r1, r2)     // Catch: javax.jms.JMSException -> L53
            goto L59
        L53:
            r7 = move-exception
            r0 = r7
            java.io.IOException r0 = org.apache.qpid.jms.util.IOExceptionSupport.create(r0)
            throw r0
        L59:
            goto L19
        L5c:
            r0 = r4
            org.apache.qpid.proton.engine.Endpoint r0 = r0.getEndpoint()
            org.apache.qpid.proton.engine.Sender r0 = (org.apache.qpid.proton.engine.Sender) r0
            boolean r0 = r0.getDrain()
            if (r0 == 0) goto L78
            r0 = r4
            org.apache.qpid.proton.engine.Endpoint r0 = r0.getEndpoint()
            org.apache.qpid.proton.engine.Sender r0 = (org.apache.qpid.proton.engine.Sender) r0
            int r0 = r0.drained()
        L78:
            r0 = r4
            java.util.LinkedList<org.apache.qpid.jms.provider.amqp.AmqpFixedProducer$InFlightSend> r0 = r0.blocked
            boolean r0 = r0.isEmpty()
            if (r0 == 0) goto L98
            r0 = r4
            boolean r0 = r0.isAwaitingClose()
            if (r0 == 0) goto L98
            r0 = r4
            boolean r0 = r0.isClosed()
            if (r0 != 0) goto L98
            r0 = r4
            r1 = r4
            org.apache.qpid.jms.provider.AsyncResult r1 = r1.closeRequest
            super.close(r1)
        L98:
            r0 = r4
            r1 = r5
            super.processFlowUpdates(r1)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.qpid.jms.provider.amqp.AmqpFixedProducer.processFlowUpdates(org.apache.qpid.jms.provider.amqp.AmqpProvider):void");
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processDeliveryUpdates(AmqpProvider amqpProvider) throws IOException {
        Outcome outcome;
        ArrayList arrayList = new ArrayList();
        for (Delivery delivery : this.sent) {
            Outcome remoteState = delivery.getRemoteState();
            if (remoteState != null) {
                if (remoteState instanceof TransactionalState) {
                    LOG.trace("State of delivery is Transactional, retrieving outcome: {}", remoteState);
                    outcome = ((TransactionalState) remoteState).getOutcome();
                } else if (remoteState instanceof Outcome) {
                    outcome = remoteState;
                } else {
                    LOG.warn("Message send updated with unsupported state: {}", remoteState);
                    outcome = null;
                }
                AsyncResult asyncResult = (AsyncResult) delivery.getContext();
                Exception exc = null;
                if (outcome instanceof Accepted) {
                    LOG.trace("Outcome of delivery was accepted: {}", delivery);
                    if (asyncResult != null && !asyncResult.isComplete()) {
                        asyncResult.onSuccess();
                    }
                } else if (outcome instanceof Rejected) {
                    LOG.trace("Outcome of delivery was rejected: {}", delivery);
                    ErrorCondition error = ((Rejected) outcome).getError();
                    if (error == null) {
                        error = getEndpoint().getRemoteCondition();
                    }
                    exc = AmqpSupport.convertToException(getEndpoint(), error);
                } else if (outcome instanceof Released) {
                    LOG.trace("Outcome of delivery was released: {}", delivery);
                    exc = new JMSException("Delivery failed: released by receiver");
                } else if (outcome instanceof Modified) {
                    LOG.trace("Outcome of delivery was modified: {}", delivery);
                    exc = new JMSException("Delivery failed: failure at remote");
                }
                if (exc != null) {
                    if (asyncResult == null || asyncResult.isComplete()) {
                        this.connection.getProvider().fireNonFatalProviderException(exc);
                    } else {
                        asyncResult.onFailure(exc);
                    }
                }
                this.tagGenerator.returnTag(delivery.getTag());
                arrayList.add(delivery);
                delivery.settle();
            }
        }
        this.sent.removeAll(arrayList);
        super.processDeliveryUpdates(amqpProvider);
    }

    public AmqpSession getSession() {
        return this.session;
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public boolean isAnonymous() {
        return getResourceInfo().getDestination() == null;
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public void setPresettle(boolean z) {
        this.presettle = z;
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public boolean isPresettle() {
        return this.presettle;
    }

    public long getSendTimeout() {
        return getParent().getProvider().getSendTimeout();
    }

    public String toString() {
        return "AmqpFixedProducer { " + getProducerId() + " }";
    }

    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void remotelyClosed(AmqpProvider amqpProvider) {
        super.remotelyClosed(amqpProvider);
        Exception convertToException = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition(), this.sent.isEmpty() ? null : new JMSException("Producer closed remotely before message transfer result was notified"));
        for (Delivery delivery : this.sent) {
            try {
                AsyncResult asyncResult = (AsyncResult) delivery.getContext();
                if (asyncResult != null && !asyncResult.isComplete()) {
                    asyncResult.onFailure(convertToException);
                }
                delivery.settle();
                this.tagGenerator.returnTag(delivery.getTag());
            } catch (Exception e) {
                LOG.debug("Caught exception when failing pending send during remote producer closure: {}", delivery, e);
            }
        }
        this.sent.clear();
    }
}
