/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.AmqpAbstractResource;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpJmsNoLocalType;
import org.apache.qpid.jms.provider.amqp.AmqpJmsSelectorType;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer
extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    /** Logger used by every method of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);

    /** Distribution mode applied to the source when a subscription name is configured. */
    protected static final Symbol COPY = Symbol.getSymbol("copy");

    /** Key under which the no-local filter is stored in the source filter map. */
    protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");

    /** Key under which the selector filter is stored in the source filter map. */
    protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");

    /** Starting size (128 KiB) of the buffer used to accumulate an incoming delivery. */
    private static final int INITIAL_BUFFER_CAPACITY = 128 * 1024;

    // Both Modified outcomes are flagged in this class's static initializer block.
    private static final Modified MODIFIED_FAILED = new Modified();
    private static final Modified MODIFIED_UNDELIVERABLE = new Modified();

    /** Session this consumer was created from. */
    protected final AmqpSession session;

    /** Deliveries acknowledged as DELIVERED but not yet settled, in insertion order. */
    protected final Map<JmsInboundMessageDispatch, Delivery> delivered =
        new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();

    /** Whether the link asks the sender to settle deliveries up front. */
    protected boolean presettle;

    /** Scratch buffer reused across deliveries; cleared after each decode. */
    private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);

    /** Source of sequence numbers handed to inbound dispatch envelopes. */
    private final AtomicLong incomingSequence = new AtomicLong(0L);

    /** Pending stop request, completed once the link has no remote credit left. */
    private AsyncResult stopRequest;

    /** Pending pull request, or null when no pull is outstanding. */
    private PullRequest pullRequest;

    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
        super(info);
        this.session = session;
        ((JmsConsumerInfo)this.resource).getConsumerId().setProviderHint(this);
    }

    /**
     * Grants the configured prefetch size as link credit and reports success immediately.
     *
     * @param request completed via {@code onSuccess()} once the credit has been issued
     */
    public void start(AsyncResult request) {
        Receiver link = (Receiver) getEndpoint();
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;

        link.flow(consumerInfo.getPrefetchSize());
        request.onSuccess();
    }

    /**
     * Stops the consumer. If the link still has remote credit, the request is parked and the
     * link is drained. If there is no credit but deliveries are still queued, the request is
     * parked until they are handled. Otherwise the request succeeds right away.
     *
     * @param request the request to complete once the consumer is stopped
     */
    public void stop(AsyncResult request) {
        Receiver link = (Receiver) getEndpoint();

        if (link.getRemoteCredit() > 0) {
            // Outstanding credit: remember the request and ask the peer to drain.
            this.stopRequest = request;
            link.drain(0);
            return;
        }

        if (link.getQueued() != 0) {
            // No credit, but queued deliveries remain; completion is deferred.
            this.stopRequest = request;
            return;
        }

        request.onSuccess();
    }

    /**
     * Reacts to a flow update on the link. A parked stop request is completed when the link has
     * no remote credit and nothing queued. A parked pull request is resolved when the link has
     * no remote credit: as a failure if nothing is queued, as a success otherwise.
     *
     * @param provider passed through to the superclass implementation
     * @throws IOException as declared by the superclass implementation
     */
    @Override
    public void processFlowUpdates(AmqpProvider provider) throws IOException {
        if (this.stopRequest != null) {
            Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        }

        if (this.pullRequest != null) {
            Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0) {
                if (link.getQueued() > 0) {
                    this.pullRequest.onSuccess();
                } else {
                    this.pullRequest.onFailure(null);
                }
                this.pullRequest = null;
            }
        }

        Receiver current = (Receiver) getEndpoint();
        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), current.getRemoteCredit());

        super.processFlowUpdates(provider);
    }

    /**
     * Builds the receiver link for this consumer: resolves the destination address, configures
     * the source, picks the link name (the subscription name when one is set, otherwise a
     * generated name), applies the settle modes and hands the receiver to the superclass.
     */
    @Override
    protected void doOpen() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        JmsDestination destination = consumerInfo.getDestination();
        String address = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, this.session.getConnection());

        Source source = new Source();
        source.setAddress(address);
        Target target = new Target();
        configureSource(source);

        String subscriptionName = consumerInfo.getSubscriptionName();
        String receiverName;
        if (subscriptionName != null && !subscriptionName.isEmpty()) {
            // A named subscription uses its own name as the link name.
            receiverName = subscriptionName;
        } else {
            receiverName = "qpid-jms:receiver:" + getConsumerId() + ":" + address;
        }

        Receiver receiver = this.session.getProtonSession().receiver(receiverName);
        receiver.setSource(source);
        receiver.setTarget(target);
        receiver.setSenderSettleMode(isPresettle() ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED);
        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        setEndpoint(receiver);
        super.doOpen();
    }

    /**
     * Delegates to the superclass only when the peer supplied a remote source. With a null
     * remote source this method does nothing.
     */
    @Override
    protected void doOpenCompletion() {
        Receiver link = (Receiver) getEndpoint();
        if (link.getRemoteSource() == null) {
            return;
        }
        super.doOpenCompletion();
    }

    /**
     * Returns the exception describing why the open was aborted.
     *
     * @return an {@link InvalidDestinationException} when the link has no remote source,
     *         otherwise whatever the superclass reports
     */
    @Override
    protected Exception getOpenAbortException() {
        Receiver link = (Receiver) getEndpoint();
        if (link.getRemoteSource() == null) {
            return new InvalidDestinationException("Link creation was refused");
        }
        return super.getOpenAbortException();
    }

    /**
     * Registers this consumer with its session, then runs the superclass handling.
     */
    @Override
    public void opened() {
        session.addResource(this);
        super.opened();
    }

    /**
     * Unregisters this consumer from its session, then runs the superclass handling.
     */
    @Override
    public void closed() {
        session.removeResource(this);
        super.closed();
    }

    /**
     * Populates the link source from the consumer description: durability and expiry policy
     * (depending on whether a subscription name is set), the destination type capability,
     * the supported outcomes, the default outcome, and the no-local / selector filters.
     *
     * @param source the source to configure
     */
    protected void configureSource(Source source) {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;

        String subscriptionName = consumerInfo.getSubscriptionName();
        boolean namedSubscription = subscriptionName != null && !subscriptionName.isEmpty();
        if (namedSubscription) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setDistributionMode(COPY);
        } else {
            source.setDurable(TerminusDurability.NONE);
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        }

        Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(consumerInfo.getDestination());
        if (typeCapability != null) {
            source.setCapabilities(new Symbol[] { typeCapability });
        }

        source.setOutcomes(new Symbol[] {
            Accepted.DESCRIPTOR_SYMBOL,
            Rejected.DESCRIPTOR_SYMBOL,
            Released.DESCRIPTOR_SYMBOL,
            Modified.DESCRIPTOR_SYMBOL
        });
        source.setDefaultOutcome(MODIFIED_FAILED);

        Map<Symbol, Object> filters = new HashMap<Symbol, Object>();
        if (consumerInfo.isNoLocal()) {
            filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
        }
        String selector = consumerInfo.getSelector();
        if (selector != null && !selector.trim().isEmpty()) {
            // The selector is passed on untrimmed; trimming is only used for the blank check.
            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(selector));
        }

        // A filter map is only attached when at least one filter was added.
        if (!filters.isEmpty()) {
            source.setFilter(filters);
        }
    }

    /**
     * Session-level acknowledge: marks every tracked delivery as accepted, settles it, and
     * then empties the tracking map.
     */
    public void acknowledge() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        LOG.trace("Session Acknowledge for consumer: {}", consumerInfo.getConsumerId());

        for (Map.Entry<JmsInboundMessageDispatch, Delivery> tracked : this.delivered.entrySet()) {
            Delivery pending = tracked.getValue();
            pending.disposition(Accepted.getInstance());
            pending.settle();
        }

        this.delivered.clear();
    }

    /**
     * Applies a single-message acknowledgement.
     *
     * <p>The delivery is taken from the envelope's provider hint when that hint is a
     * {@link Delivery}, otherwise from the tracking map. An unknown message is logged and
     * ignored. Handling then depends on the ack type:
     * <ul>
     *   <li>DELIVERED: track the delivery (unless presettled), set its default state and
     *       top up credit if needed.</li>
     *   <li>CONSUMED: top up credit when presettled or when the message was not tracked, then
     *       accept and settle an unsettled delivery. In a transacted, non-browser session the
     *       acceptance is wrapped in a transactional state and only applied when a
     *       transaction id is available.</li>
     *   <li>POISONED / EXPIRED: treated as a failed delivery.</li>
     *   <li>RELEASED: released and settled.</li>
     *   <li>anything else: logged as unsupported.</li>
     * </ul>
     *
     * @param envelope the dispatched message being acknowledged
     * @param ackType the kind of acknowledgement to apply
     * @throws JMSException declared by the signature; not thrown directly by this body
     */
    public void acknowledge(JmsInboundMessageDispatch envelope, ProviderConstants.ACK_TYPE ackType) throws JMSException {
        final Delivery delivery;
        Object hint = envelope.getProviderHint();
        if (hint instanceof Delivery) {
            delivery = (Delivery) hint;
        } else {
            delivery = this.delivered.get(envelope);
            if (delivery == null) {
                LOG.warn("Received Ack for unknown message: {}", envelope);
                return;
            }
        }

        if (ackType.equals(ProviderConstants.ACK_TYPE.DELIVERED)) {
            LOG.debug("Delivered Ack of message: {}", envelope);
            if (!isPresettle()) {
                this.delivered.put(envelope, delivery);
            }
            setDefaultDeliveryState(delivery, MODIFIED_FAILED);
            sendFlowIfNeeded();
            return;
        }

        if (ackType.equals(ProviderConstants.ACK_TYPE.CONSUMED)) {
            // Credit is only topped up here when no DELIVERED ack tracked this message
            // (or the link is presettled).
            if (isPresettle() || this.delivered.remove(envelope) == null) {
                sendFlowIfNeeded();
            }
            LOG.debug("Consumed Ack of message: {}", envelope);

            if (delivery.isSettled()) {
                return;
            }

            boolean transactional = this.session.isTransacted() && !isBrowser();
            if (!transactional) {
                delivery.disposition(Accepted.getInstance());
                delivery.settle();
                return;
            }

            Binary txnId = this.session.getTransactionContext().getAmqpTransactionId();
            if (txnId == null) {
                // Without a transaction id the delivery is left untouched.
                return;
            }

            TransactionalState txState = new TransactionalState();
            txState.setOutcome(Accepted.getInstance());
            txState.setTxnId(txnId);
            delivery.disposition(txState);
            delivery.settle();
            this.session.getTransactionContext().registerTxConsumer(this);
            return;
        }

        if (ackType.equals(ProviderConstants.ACK_TYPE.POISONED) || ackType.equals(ProviderConstants.ACK_TYPE.EXPIRED)) {
            deliveryFailed(delivery);
            return;
        }

        if (ackType.equals(ProviderConstants.ACK_TYPE.RELEASED)) {
            delivery.disposition(Released.getInstance());
            delivery.settle();
            return;
        }

        LOG.warn("Unsupported Ack Type for message: {}", envelope);
    }

    /**
     * Tops link credit back up to the prefetch size once the current credit has fallen to
     * 20% of the prefetch size or less. Does nothing when the prefetch size is zero.
     */
    private void sendFlowIfNeeded() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        if (consumerInfo.getPrefetchSize() == 0) {
            return;
        }

        Receiver link = (Receiver) getEndpoint();
        int credit = link.getCredit();
        double refillThreshold = consumerInfo.getPrefetchSize() * 0.2;
        if (credit <= refillThreshold) {
            link.flow(consumerInfo.getPrefetchSize() - credit);
        }
    }

    /**
     * Re-dispatches every tracked message, newest first. Each message has its redelivery
     * count incremented and is flagged to be enqueued first before being delivered again.
     * The tracking map is cleared afterwards.
     *
     * @throws Exception if re-dispatching a message fails; the tracking map is then left as is
     */
    public void recover() throws Exception {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        LOG.debug("Session Recover for consumer: {}", consumerInfo.getConsumerId());

        // Work on a snapshot of the keys and walk it from the last entry to the first.
        ArrayList<JmsInboundMessageDispatch> snapshot = new ArrayList<JmsInboundMessageDispatch>(this.delivered.keySet());
        for (int index = snapshot.size() - 1; index >= 0; index--) {
            JmsInboundMessageDispatch envelope = snapshot.get(index);
            envelope.getMessage().getFacade().setRedeliveryCount(envelope.getMessage().getFacade().getRedeliveryCount() + 1);
            envelope.setEnqueueFirst(true);
            deliver(envelope);
        }

        this.delivered.clear();
    }

    /**
     * Requests a single message when the consumer has a zero prefetch size and the link has
     * no credit; otherwise does nothing beyond tracing.
     *
     * <ul>
     *   <li>negative timeout: grant one credit and do not track a pull request;</li>
     *   <li>zero timeout: track a pull request and drain with one credit;</li>
     *   <li>positive timeout: schedule a task that drains the link if it still has remote
     *       credit, grant one credit, and track a timed pull request.</li>
     * </ul>
     *
     * @param timeout the timeout value handed to the session scheduler for positive values
     */
    public void pull(long timeout) {
        LOG.trace("Pull called on consumer {} with timeout = {}", getConsumerId(), timeout);

        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        if (consumerInfo.getPrefetchSize() != 0) {
            return;
        }
        Receiver link = (Receiver) getEndpoint();
        if (link.getCredit() != 0) {
            return;
        }

        if (timeout < 0L) {
            link.flow(1);
            return;
        }

        if (timeout == 0L) {
            this.pullRequest = new PullRequest();
            link.drain(1);
            return;
        }

        // Positive timeout: the scheduled task looks up the endpoint when it runs.
        ScheduledFuture<?> drainTask = getSession().schedule(new Runnable() {

            @Override
            public void run() {
                Receiver receiver = (Receiver) AmqpConsumer.this.getEndpoint();
                if (receiver.getRemoteCredit() != 0) {
                    ((Receiver) AmqpConsumer.this.getEndpoint()).drain(0);
                    AmqpConsumer.this.session.getProvider().pumpToProtonTransport();
                }
            }
        }, timeout);
        link.flow(1);
        this.pullRequest = new TimedPullRequest(drainTask);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Processes deliveries that are ready on the link.
     *
     * <p>Each pass looks at the link's current delivery. A complete, readable delivery
     * resolves any pending pull request as a success and is processed. A partial or unreadable
     * delivery ends the loop immediately. After a processed delivery, or when no delivery is
     * current, a link without remote credit completes a parked stop request and resolves a
     * pending pull request as a failure. The loop ends once no delivery was current.
     *
     * @param provider passed through to the superclass implementation
     * @throws IOException wrapping any exception raised while processing a delivery
     */
    @Override
    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
        boolean keepGoing = true;
        while (keepGoing) {
            Delivery incoming = ((Receiver) getEndpoint()).current();

            if (incoming != null) {
                boolean complete = incoming.isReadable() && !incoming.isPartial();
                if (!complete) {
                    LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                    // Deferring skips the credit checks below and leaves the loop.
                    break;
                }

                LOG.trace("{} has incoming Message(s).", this);
                if (this.pullRequest != null) {
                    this.pullRequest.onSuccess();
                    this.pullRequest = null;
                }
                try {
                    processDelivery(incoming);
                } catch (Exception e) {
                    throw IOExceptionSupport.create(e);
                }
            }

            if (((Receiver) getEndpoint()).getRemoteCredit() <= 0) {
                if (this.stopRequest != null) {
                    this.stopRequest.onSuccess();
                    this.stopRequest = null;
                }
                if (this.pullRequest != null) {
                    this.pullRequest.onFailure(null);
                    this.pullRequest = null;
                }
            }

            keepGoing = incoming != null;
        }

        super.processDeliveryUpdates(provider);
    }

    /**
     * Turns one complete incoming delivery into a dispatch envelope and delivers it.
     *
     * <p>The delivery's default state is set to released, the payload is decoded and converted
     * to a JMS message. If the conversion fails, the failure is logged together with the
     * exception and the delivery is marked as failed; nothing is dispatched. Otherwise the
     * link is advanced and an envelope carrying the message, consumer id, message id and the
     * delivery (as provider hint) is dispatched.
     *
     * @param incoming the delivery to process
     * @throws Exception if decoding or dispatching the message fails
     */
    private void processDelivery(Delivery incoming) throws Exception {
        this.setDefaultDeliveryState(incoming, Released.getInstance());
        Message amqpMessage = this.decodeIncomingMessage(incoming);

        JmsMessage message;
        try {
            message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage);
        }
        catch (Exception e) {
            // Pass the exception itself so the stack trace is kept; the message alone may be null.
            LOG.warn("Error on transform: {}", e.getMessage(), e);
            this.deliveryFailed(incoming);
            return;
        }

        ((Receiver)this.getEndpoint()).advance();
        message.onDispatch();

        JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(this.getNextIncomingSequenceNumber());
        envelope.setMessage(message);
        envelope.setConsumerId(((JmsConsumerInfo)this.resource).getConsumerId());
        envelope.setProviderHint(incoming);
        envelope.setMessageId(message.getFacade().getProviderMessageIdObject());

        // The delivery and the envelope reference each other (provider hint / context).
        incoming.setContext(envelope);
        this.deliver(envelope);
    }

    /**
     * Best-effort call of {@code setDefaultDeliveryState(DeliveryState)} on the delivery via
     * reflection. Any failure (missing method, access or invocation error) is only traced.
     * NOTE(review): reflection is presumably used because the method is not available on every
     * supported Proton version; confirm before replacing with a direct call.
     *
     * @param incoming the delivery to update
     * @param state the default state to apply
     */
    private void setDefaultDeliveryState(Delivery incoming, DeliveryState state) {
        try {
            Class<?> deliveryClass = incoming.getClass();
            Method setter = deliveryClass.getMethod("setDefaultDeliveryState", DeliveryState.class);
            setter.invoke(incoming, state);
        } catch (Exception e) {
            LOG.trace("Exception while setting defaultDeliveryState", e);
        }
    }

    /**
     * Increments the incoming sequence counter and returns the new value.
     *
     * @return the next sequence number; the first call returns 1
     */
    protected long getNextIncomingSequenceNumber() {
        long next = incomingSequence.incrementAndGet();
        return next;
    }

    /**
     * Shuts the link down: a durable consumer detaches its link, any other consumer closes it.
     */
    @Override
    protected void doClose() {
        Receiver link = (Receiver) getEndpoint();
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;

        if (!consumerInfo.isDurable()) {
            link.close();
            return;
        }
        link.detach();
    }

    /**
     * @return the connection of the session that owns this consumer
     */
    public AmqpConnection getConnection() {
        AmqpSession owner = this.session;
        return owner.getConnection();
    }

    /**
     * @return the session this consumer was created with
     */
    public AmqpSession getSession() {
        return session;
    }

    /**
     * @return the consumer id held by the underlying consumer description
     */
    public JmsConsumerId getConsumerId() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        return consumerInfo.getConsumerId();
    }

    /**
     * @return the destination held by the underlying consumer description
     */
    public JmsDestination getDestination() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        return consumerInfo.getDestination();
    }

    /**
     * @return the endpoint of this resource, typed as a Proton {@link Receiver}
     */
    public Receiver getProtonReceiver() {
        Receiver link = (Receiver) getEndpoint();
        return link;
    }

    /**
     * Tells whether this consumer is a browser. The CONSUMED acknowledgement path consults
     * this before using a transactional disposition.
     *
     * @return always {@code false} in this class
     */
    public boolean isBrowser() {
        final boolean browser = false;
        return browser;
    }

    /**
     * @return the current value of the presettle flag
     */
    public boolean isPresettle() {
        return presettle;
    }

    /**
     * Sets the presettle flag, which selects the sender settle mode when the link is opened.
     *
     * @param presettle the new flag value
     */
    public void setPresettle(boolean presettle) {
        boolean requested = presettle;
        this.presettle = requested;
    }

    /**
     * @return a short description of the form {@code AmqpConsumer { <consumer id> }}
     */
    @Override
    public String toString() {
        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) this.resource;
        return "AmqpConsumer { " + consumerInfo.getConsumerId() + " }";
    }

    /**
     * Marks the delivery with the "failed and undeliverable here" modified outcome, settles
     * it, and tops up link credit if needed.
     *
     * @param incoming the delivery that could not be handled
     */
    protected void deliveryFailed(Delivery incoming) {
        DeliveryState undeliverable = MODIFIED_UNDELIVERABLE;
        incoming.disposition(undeliverable);
        incoming.settle();
        sendFlowIfNeeded();
    }

    /**
     * Hands an envelope to the provider listener. An envelope without a message is logged as
     * the end of a pull/browse. When no listener is set the envelope is dropped with an error
     * log entry.
     *
     * @param envelope the envelope to dispatch
     * @throws Exception if the listener fails while handling the envelope
     */
    protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
        ProviderListener listener = this.session.getProvider().getProviderListener();
        if (listener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
            return;
        }

        boolean endOfPull = envelope.getMessage() == null;
        if (endOfPull) {
            LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
        } else {
            LOG.debug("Dispatching received message: {}", envelope);
        }

        listener.onInboundMessage(envelope);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the bytes available on the link into the reusable buffer and decodes them into a
     * Proton message. The buffer grows by a factor of 1.5 each time it fills up, and is
     * cleared again before this method returns, whether or not decoding succeeded.
     *
     * @param incoming the delivery being decoded; the bytes are read from the link endpoint
     * @return the decoded Proton message
     */
    protected Message decodeIncomingMessage(Delivery incoming) {
        final ByteBuf buffer = this.incomingBuffer;

        while (true) {
            // array() is re-read on every pass because growing the buffer may replace it.
            int received = ((Receiver) getEndpoint()).recv(buffer.array(), buffer.writerIndex(), buffer.writableBytes());
            if (received <= 0) {
                break;
            }
            buffer.writerIndex(buffer.writerIndex() + received);
            if (!buffer.isWritable()) {
                buffer.capacity((int) (buffer.capacity() * 1.5));
            }
        }

        try {
            Message decoded = Message.Factory.create();
            decoded.decode(buffer.array(), 0, buffer.readableBytes());
            return decoded;
        } finally {
            buffer.clear();
        }
    }

    /**
     * Hook with an empty body in this class.
     * NOTE(review): presumably called before a transaction commit and meant to be overridden
     * by subclasses; confirm against the callers.
     */
    public void preCommit() {
    }

    /**
     * Hook with an empty body in this class.
     * NOTE(review): presumably called before a transaction rollback and meant to be overridden
     * by subclasses; confirm against the callers.
     */
    public void preRollback() {
    }

    /**
     * Hook with an empty body in this class.
     * NOTE(review): presumably called after a transaction commit and meant to be overridden
     * by subclasses; confirm against the callers.
     *
     * @throws Exception declared for overriding implementations; never thrown here
     */
    public void postCommit() throws Exception {
    }

    /**
     * Hook with an empty body in this class.
     * NOTE(review): presumably called after a transaction rollback and meant to be overridden
     * by subclasses; confirm against the callers.
     *
     * @throws Exception declared for overriding implementations; never thrown here
     */
    public void postRollback() throws Exception {
    }

    // Configure the two shared Modified outcomes once, at class-load time.
    static {
        // Used as the default outcome/state: delivery failed.
        MODIFIED_FAILED.setDeliveryFailed(Boolean.TRUE);

        // Used for failed deliveries: delivery failed and must not come back to this link.
        MODIFIED_UNDELIVERABLE.setDeliveryFailed(Boolean.TRUE);
        MODIFIED_UNDELIVERABLE.setUndeliverableHere(Boolean.TRUE);
    }

    /**
     * Pull request paired with the scheduled task that drains the link when the pull times
     * out. The task is cancelled as soon as the request is resolved, whether it succeeds or
     * fails, so a stale timer cannot drain credit granted by a later pull.
     */
    private class TimedPullRequest
    extends PullRequest {
        /** Scheduled drain task created by the timed pull. */
        private final ScheduledFuture<?> completionTask;

        /**
         * @param completionTask the scheduled task to cancel once this request is resolved
         */
        public TimedPullRequest(ScheduledFuture<?> completionTask) {
            this.completionTask = completionTask;
        }

        /**
         * Cancels the pending timeout task (without interrupting it if it is already running)
         * and then dispatches the end-of-pull notification through the parent class.
         */
        @Override
        public void onFailure(Throwable result) {
            this.completionTask.cancel(false);
            super.onFailure(result);
        }

        /**
         * Cancels the pending timeout task; the parent class needs no further action.
         */
        @Override
        public void onSuccess() {
            this.completionTask.cancel(false);
            super.onSuccess();
        }
    }

    /**
     * Tracks an outstanding pull. A failure is reported to the listener as an envelope that
     * carries the consumer id but no message; a success needs no action.
     */
    private class PullRequest
    implements AsyncResult {
        private PullRequest() {
        }

        /**
         * Dispatches a message-less envelope for this consumer. If dispatching fails, the
         * error is reported to the session instead of being thrown.
         *
         * @param result ignored by this implementation
         */
        @Override
        public void onFailure(Throwable result) {
            AmqpConsumer consumer = AmqpConsumer.this;

            JmsInboundMessageDispatch endOfPull = new JmsInboundMessageDispatch(consumer.getNextIncomingSequenceNumber());
            endOfPull.setConsumerId(consumer.getConsumerId());

            try {
                consumer.deliver(endOfPull);
            } catch (Exception e) {
                consumer.getSession().reportError(IOExceptionSupport.create(e));
            }
        }

        /** Nothing to do: the message itself is dispatched by the delivery processing. */
        @Override
        public void onSuccess() {
        }

        /**
         * @return always {@code false}
         */
        @Override
        public boolean isComplete() {
            final boolean complete = false;
            return complete;
        }
    }
}

