/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.AmqpAbstractResource;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer
extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);

    /** Initial size, in bytes, of the reusable buffer used while decoding incoming deliveries. */
    private static final int INITIAL_BUFFER_CAPACITY = 131072;

    /** Session that owns this consumer. */
    protected final AmqpSession session;

    /**
     * Deliveries that have been acknowledged as DELIVERED but not yet settled, kept in
     * insertion order so that {@link #recover()} can replay them in reverse.
     */
    protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();

    /** Explicit presettle flag; see {@link #isPresettle()} which also accounts for browsers. */
    protected boolean presettle;

    /** Pending stop request, non-null while the consumer is stopping. */
    protected AsyncResult stopRequest;

    /** Pending pull request, completed once a message is processed or the link runs dry. */
    protected AsyncResult pullRequest;

    /** Scratch buffer that incoming delivery bytes are read into before being decoded; cleared after each decode. */
    protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);

    /** Counter used to number the inbound dispatch envelopes created by this consumer. */
    protected final AtomicLong incomingSequence = new AtomicLong(0L);

    /**
     * Creates a consumer bound to the given session and receiver link.
     *
     * @param session  the session that owns this consumer; also passed to the superclass.
     * @param info     the consumer meta-data handed to the superclass as the resource info.
     * @param receiver the receiver link handed to the superclass as the endpoint.
     */
    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
        super(info, receiver, session);

        this.session = session;
    }

    /**
     * Starts the consumer: tops up link credit if required and then signals
     * the request as successful right away.
     *
     * @param request the request to complete once the credit check has been performed.
     */
    public void start(AsyncResult request) {
        sendFlowIfNeeded();
        request.onSuccess();
    }

    /**
     * Stops the consumer.
     * <p>
     * If the link has no remote credit and nothing queued, the request is completed
     * immediately. Otherwise the request is parked in {@link #stopRequest} so it can be
     * completed by later flow/delivery processing, and when remote credit is still
     * outstanding the link is additionally asked to drain.
     *
     * @param request the request to signal once the consumer has stopped.
     */
    public void stop(AsyncResult request) {
        final Receiver link = (Receiver) getEndpoint();
        final boolean creditOutstanding = link.getRemoteCredit() > 0;

        // No credit left and nothing waiting locally: we are already stopped.
        if (!creditOutstanding && link.getQueued() == 0) {
            request.onSuccess();
            return;
        }

        stopRequest = request;
        if (creditOutstanding) {
            link.drain(0);
        }
    }

    /**
     * Schedules a deferred stop of this consumer on the session and records a
     * {@link ScheduledStopRequest} as the pending stop request.
     * <p>
     * When the scheduled task runs and the link still has non-zero remote credit, it
     * performs a regular {@link #stop(AsyncResult)} and pumps the provider's transport.
     *
     * @param timeout the delay passed to the session scheduler.
     * @param request the original request to complete when the stop finishes.
     */
    private void stopOnSchedule(long timeout, final AsyncResult request) {
        LOG.trace("Consumer {} scheduling stop", getConsumerId());

        final Runnable deferredStop = new Runnable() {

            @Override
            public void run() {
                LOG.trace("Consumer {} running scheduled stop", AmqpConsumer.this.getConsumerId());
                final Receiver link = (Receiver) AmqpConsumer.this.getEndpoint();
                if (link.getRemoteCredit() == 0) {
                    return;
                }
                AmqpConsumer.this.stop(request);
                AmqpConsumer.this.session.getProvider().pumpToProtonTransport(request);
            }
        };

        ScheduledFuture<?> pendingStop = getSession().schedule(deferredStop, timeout);
        stopRequest = new ScheduledStopRequest(pendingStop, request);
    }

    /**
     * Handles a flow update on the link.
     * <p>
     * A pending stop request, and then a pending pull request, is completed and cleared
     * when the link reports no remote credit and no queued deliveries. The link state is
     * re-read for each of the two checks. Finally the update is logged and forwarded to
     * the superclass.
     *
     * @param provider the provider passed through to the superclass implementation.
     * @throws IOException if the superclass processing fails.
     */
    @Override
    public void processFlowUpdates(AmqpProvider provider) throws IOException {
        if (stopRequest != null) {
            final Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                stopRequest.onSuccess();
                stopRequest = null;
            }
        }

        if (pullRequest != null) {
            final Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                pullRequest.onSuccess();
                pullRequest = null;
            }
        }

        final int remoteCredit = ((Receiver) getEndpoint()).getRemoteCredit();
        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), remoteCredit);

        super.processFlowUpdates(provider);
    }

    /**
     * Session-level acknowledge: applies the disposition matching {@code ackType} to every
     * tracked delivery, settles it, and then empties the tracking map.
     *
     * @param ackType the acknowledgement type to apply to all tracked deliveries.
     * @throws IllegalArgumentException if there is at least one tracked delivery and
     *         {@code ackType} is not one of the supported types.
     */
    public void acknowledge(ProviderConstants.ACK_TYPE ackType) {
        final JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        LOG.trace("Session Acknowledge for consumer {} with ack type {}", info.getId(), ackType);

        for (Delivery tracked : delivered.values()) {
            // Resolved per delivery so an unsupported type only fails when there is something to ack.
            tracked.disposition(dispositionFor(ackType));
            tracked.settle();
        }

        delivered.clear();
    }

    /** Maps a session acknowledgement type onto the delivery state to apply. */
    private static DeliveryState dispositionFor(ProviderConstants.ACK_TYPE ackType) {
        switch (ackType) {
            case ACCEPTED:
                return (DeliveryState) Accepted.getInstance();
            case RELEASED:
                return (DeliveryState) Released.getInstance();
            case REJECTED:
                return (DeliveryState) AmqpSupport.REJECTED;
            case MODIFIED_FAILED:
                return (DeliveryState) AmqpSupport.MODIFIED_FAILED;
            case MODIFIED_FAILED_UNDELIVERABLE:
                return (DeliveryState) AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
            default:
                throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
        }
    }

    /**
     * Acknowledges a single message.
     * <p>
     * The delivery is taken from the envelope's provider hint when that is a
     * {@link Delivery}; otherwise it is looked up in the tracked deliveries. If it cannot
     * be found a warning is logged and nothing else happens.
     *
     * @param envelope the dispatch envelope of the message being acknowledged.
     * @param ackType  the kind of acknowledgement to apply.
     * @throws JMSException declared for callers; propagated from the transactional path if raised there.
     */
    public void acknowledge(JmsInboundMessageDispatch envelope, ProviderConstants.ACK_TYPE ackType) throws JMSException {
        final Object hint = envelope.getProviderHint();
        Delivery delivery;
        if (hint instanceof Delivery) {
            delivery = (Delivery) hint;
        } else {
            delivery = delivered.get(envelope);
            if (delivery == null) {
                LOG.warn("Received Ack for unknown message: {}", envelope);
                return;
            }
        }

        switch (ackType) {
            case DELIVERED:
                LOG.debug("Delivered Ack of message: {}", envelope);
                if (!isPresettle()) {
                    delivered.put(envelope, delivery);
                }
                delivery.setDefaultDeliveryState((DeliveryState) AmqpSupport.MODIFIED_FAILED);
                sendFlowIfNeeded();
                break;
            case ACCEPTED:
                acknowledgeAccepted(envelope, delivery);
                break;
            case MODIFIED_FAILED_UNDELIVERABLE:
            case EXPIRED:
                // Both outcomes are reported to the peer as failed + undeliverable here.
                deliveryFailedUndeliverable(delivery);
                break;
            case RELEASED:
                delivery.disposition((DeliveryState) Released.getInstance());
                delivery.settle();
                break;
            default:
                LOG.warn("Unsupported Ack Type for message: {}", envelope);
                break;
        }
    }

    /** Applies an ACCEPTED acknowledgement, enlisting it in the session transaction when applicable. */
    private void acknowledgeAccepted(JmsInboundMessageDispatch envelope, Delivery delivery) throws JMSException {
        // Credit is topped up when presettled, or when the envelope was not being tracked.
        if (isPresettle() || delivered.remove(envelope) == null) {
            sendFlowIfNeeded();
        }
        LOG.debug("Accepted Ack of message: {}", envelope);

        if (delivery.isSettled()) {
            return;
        }

        final boolean transactional = session.isTransacted() && !((JmsConsumerInfo) getResourceInfo()).isBrowser();
        if (!transactional) {
            delivery.disposition((DeliveryState) Accepted.getInstance());
            delivery.settle();
            return;
        }

        if (session.isTransactionFailed()) {
            LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
            return;
        }

        final Binary txnId = session.getTransactionContext().getAmqpTransactionId();
        if (txnId == null) {
            return;
        }

        TransactionalState txState = new TransactionalState();
        txState.setOutcome((Outcome) Accepted.getInstance());
        txState.setTxnId(txnId);
        delivery.disposition((DeliveryState) txState);
        delivery.settle();
        session.getTransactionContext().registerTxConsumer(this);
    }

    /**
     * Tops the link credit back up to the configured prefetch size once the current
     * credit has dropped to 30% of prefetch or below.
     * <p>
     * Does nothing when the prefetch size is zero or the consumer is stopping.
     */
    private void sendFlowIfNeeded() {
        final int prefetch = ((JmsConsumerInfo) getResourceInfo()).getPrefetchSize();
        if (prefetch == 0 || isStopping()) {
            return;
        }

        final Receiver link = (Receiver) getEndpoint();
        final int currentCredit = link.getCredit();
        if ((double) currentCredit > (double) prefetch * 0.3) {
            // Still above the refill threshold.
            return;
        }

        final int newCredit = prefetch - currentCredit;
        LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), newCredit);
        link.flow(newCredit);
    }

    /**
     * Session recover: re-dispatches every tracked (delivered but unsettled) message.
     * <p>
     * Envelopes are replayed newest-first, each with its redelivery count incremented and
     * flagged to be enqueued first, after which the tracking map is emptied.
     *
     * @throws Exception if re-dispatching an envelope fails.
     */
    public void recover() throws Exception {
        final JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        LOG.debug("Session Recover for consumer: {}", info.getId());

        // Work on a snapshot of the keys; the map itself is only cleared at the end.
        final ArrayList<JmsInboundMessageDispatch> pending = new ArrayList<JmsInboundMessageDispatch>(delivered.keySet());
        for (int i = pending.size() - 1; i >= 0; i--) {
            final JmsInboundMessageDispatch envelope = pending.get(i);
            final int redeliveries = envelope.getMessage().getFacade().getRedeliveryCount();
            envelope.getMessage().getFacade().setRedeliveryCount(redeliveries + 1);
            envelope.setEnqueueFirst(true);
            deliver(envelope);
        }

        delivered.clear();
    }

    /**
     * Requests a message pull on this consumer.
     * <p>
     * One unit of credit is granted first if the link currently has none. The timeout
     * then selects how the request completes:
     * <ul>
     *   <li>negative: the request is stored as the pending pull request;</li>
     *   <li>zero: the consumer is stopped immediately using the request;</li>
     *   <li>positive: a stop is scheduled after {@code timeout}.</li>
     * </ul>
     *
     * @param timeout the pull timeout; its sign selects the behaviour described above.
     * @param request the request to complete when the pull finishes.
     */
    public void pull(long timeout, AsyncResult request) {
        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);

        final Receiver link = (Receiver) getEndpoint();
        if (link.getCredit() == 0) {
            LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
            link.flow(1);
        }

        if (timeout < 0L) {
            pullRequest = request;
        } else if (timeout == 0L) {
            stop(request);
        } else {
            stopOnSchedule(timeout, request);
        }
    }

    /*
     * Unable to fully structure code
     */
    @Override
    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
        incoming = null;
        do {
            if ((incoming = ((Receiver)this.getEndpoint()).current()) == null) ** GOTO lbl16
            if (incoming.isReadable() && !incoming.isPartial()) {
                AmqpConsumer.LOG.trace("{} has incoming Message(s).", (Object)this);
                try {
                    if (!this.processDelivery(incoming) || this.pullRequest == null) ** GOTO lbl19
                    this.pullRequest.onSuccess();
                    this.pullRequest = null;
                }
                catch (Exception e) {
                    throw IOExceptionSupport.create(e);
                }
            } else {
                AmqpConsumer.LOG.trace("{} has a partial incoming Message(s), deferring.", (Object)this);
                incoming = null;
                continue;
lbl16:
                // 1 sources

                if (((Receiver)this.getEndpoint()).getRemoteCredit() > 0 || this.stopRequest == null) continue;
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
lbl19:
            // 5 sources

        } while (incoming != null);
        super.processDeliveryUpdates(provider);
    }

    /**
     * Decodes one delivery, converts it into a JMS message and dispatches it.
     * <p>
     * The delivery's default state is set to Released before decoding. If the conversion
     * to a JMS message throws, a warning is logged, the delivery is settled as
     * failed/undeliverable and {@code false} is returned. Otherwise the link is advanced,
     * an envelope carrying the message, consumer id, delivery (as provider hint) and
     * message id is built and dispatched.
     *
     * @param incoming the delivery to process.
     * @return {@code true} if a message was dispatched, {@code false} if conversion failed.
     * @throws Exception if decoding or dispatching fails.
     */
    private boolean processDelivery(Delivery incoming) throws Exception {
        incoming.setDefaultDeliveryState((DeliveryState) Released.getInstance());
        final Message amqpMessage = decodeIncomingMessage(incoming);

        JmsMessage jmsMessage;
        try {
            jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage);
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            deliveryFailedUndeliverable(incoming);
            return false;
        }

        ((Receiver) getEndpoint()).advance();
        jmsMessage.onDispatch();

        final JmsInboundMessageDispatch dispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
        dispatch.setMessage(jmsMessage);
        dispatch.setConsumerId(((JmsConsumerInfo) getResourceInfo()).getId());
        dispatch.setProviderHint(incoming);
        dispatch.setMessageId(jmsMessage.getFacade().getProviderMessageIdObject());

        // Link the delivery back to its envelope.
        incoming.setContext(dispatch);

        deliver(dispatch);
        return true;
    }

    /**
     * Advances the incoming sequence counter and returns the new value.
     *
     * @return the next sequence number; the first value handed out is 1.
     */
    protected long getNextIncomingSequenceNumber() {
        final long next = incomingSequence.incrementAndGet();
        return next;
    }

    /**
     * Shuts the receiver link down: a durable consumer is detached, any other
     * consumer is closed.
     */
    @Override
    protected void closeOrDetachEndpoint() {
        final boolean durable = ((JmsConsumerInfo) getResourceInfo()).isDurable();
        final Receiver link = (Receiver) getEndpoint();

        if (!durable) {
            link.close();
            return;
        }
        link.detach();
    }

    /**
     * @return the connection reported by the owning session.
     */
    public AmqpConnection getConnection() {
        final AmqpSession owner = session;
        return owner.getConnection();
    }

    /**
     * @return the session this consumer was created with.
     */
    public AmqpSession getSession() {
        return session;
    }

    /**
     * @return the consumer id taken from this consumer's resource info.
     */
    public JmsConsumerId getConsumerId() {
        final JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return info.getId();
    }

    /**
     * @return the destination taken from this consumer's resource info.
     */
    public JmsDestination getDestination() {
        final JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return info.getDestination();
    }

    /**
     * @return {@code true} if the presettle flag is set or the consumer info
     *         describes a browser.
     */
    public boolean isPresettle() {
        if (presettle) {
            return true;
        }
        return ((JmsConsumerInfo) getResourceInfo()).isBrowser();
    }

    /**
     * @return {@code true} while a stop request is pending on this consumer.
     */
    public boolean isStopping() {
        final boolean stopPending = stopRequest != null;
        return stopPending;
    }

    /**
     * Sets the explicit presettle flag.
     *
     * @param presettle the new value of the flag.
     */
    public void setPresettle(boolean presettle) {
        this.presettle = presettle;
    }

    /**
     * @return a short description of the form {@code AmqpConsumer { <consumer id> }}.
     */
    @Override
    public String toString() {
        final JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return new StringBuilder("AmqpConsumer { ").append(info.getId()).append(" }").toString();
    }

    /**
     * Settles the delivery with the failed/undeliverable modified state and then
     * tops up link credit if required.
     *
     * @param incoming the delivery to settle.
     */
    protected void deliveryFailedUndeliverable(Delivery incoming) {
        final DeliveryState failedState = (DeliveryState) AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
        incoming.disposition(failedState);
        incoming.settle();

        sendFlowIfNeeded();
    }

    /**
     * Hands an envelope to the provider listener.
     * <p>
     * If no listener is registered on the provider, an error is logged and the envelope
     * is not passed on.
     *
     * @param envelope the envelope to dispatch; a {@code null} message is logged as the
     *        end of a pull/browse.
     * @throws Exception if the listener fails while handling the envelope.
     */
    protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
        final ProviderListener listener = session.getProvider().getProviderListener();
        if (listener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
            return;
        }

        if (envelope.getMessage() == null) {
            LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
        } else {
            LOG.debug("Dispatching received message: {}", envelope);
        }

        listener.onInboundMessage(envelope);
    }

    /**
     * Reads all currently available bytes from the receiver link into the shared
     * incoming buffer and decodes them into a Proton message.
     * <p>
     * The buffer grows by a factor of 1.5 whenever it fills up, and is cleared once
     * decoding has been attempted, whether or not it succeeded.
     *
     * @param incoming the delivery being decoded; the bytes themselves are read from the link.
     * @return the decoded Proton message.
     */
    protected Message decodeIncomingMessage(Delivery incoming) {
        final Receiver link = (Receiver) getEndpoint();
        final ByteBuf buffer = incomingBuffer;

        while (true) {
            // array() is re-fetched each pass because growing the capacity may swap the backing array.
            final int read = link.recv(buffer.array(), buffer.writerIndex(), buffer.writableBytes());
            if (read <= 0) {
                break;
            }
            buffer.writerIndex(buffer.writerIndex() + read);
            if (!buffer.isWritable()) {
                buffer.capacity((int) ((double) buffer.capacity() * 1.5));
            }
        }

        try {
            final Message decoded = Message.Factory.create();
            decoded.decode(buffer.array(), 0, buffer.readableBytes());
            return decoded;
        } finally {
            buffer.clear();
        }
    }

    /**
     * No-op in this implementation.
     * NOTE(review): presumably a hook invoked before a transaction commit for consumers
     * registered with the transaction context — confirm against the caller.
     */
    public void preCommit() {
    }

    /**
     * No-op in this implementation.
     * NOTE(review): presumably a hook invoked before a transaction rollback — confirm
     * against the caller.
     */
    public void preRollback() {
    }

    /**
     * No-op in this implementation.
     * NOTE(review): presumably a hook invoked after a transaction commit — confirm
     * against the caller.
     */
    public void postCommit() {
    }

    /**
     * No-op in this implementation.
     * NOTE(review): presumably a hook invoked after a transaction rollback — confirm
     * against the caller.
     */
    public void postRollback() {
    }

    /**
     * Wraps a stop request together with the scheduled task that will eventually
     * perform the stop, so that completing the request also cancels the task.
     */
    protected static final class ScheduledStopRequest
    implements AsyncResult {
        private final ScheduledFuture<?> scheduledStopTask;
        private final AsyncResult origRequest;

        /**
         * @param completionTask the scheduled task that performs the deferred stop.
         * @param origRequest    the request to notify about the outcome.
         */
        public ScheduledStopRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
            this.scheduledStopTask = completionTask;
            this.origRequest = origRequest;
        }

        /**
         * Cancels the scheduled task (without interrupting it) and forwards the failure
         * to the original request.
         */
        @Override
        public void onFailure(Throwable t) {
            scheduledStopTask.cancel(false);
            origRequest.onFailure(t);
        }

        /**
         * Signals success on the original request only if the scheduled task could be
         * cancelled. When cancellation fails the original request is not signalled here.
         * NOTE(review): presumably the scheduled task is then responsible for completing
         * it — confirm.
         */
        @Override
        public void onSuccess() {
            if (scheduledStopTask.cancel(false)) {
                origRequest.onSuccess();
            }
        }

        /**
         * @return whether the original request reports itself as complete.
         */
        @Override
        public boolean isComplete() {
            final boolean done = origRequest.isComplete();
            return done;
        }
    }
}

