/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import org.apache.qpid.jms.JmsTemporaryDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessageFactory;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsDefaultResourceVisitor;
import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.meta.JmsResourceVistor;
import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.NoOpAsyncResult;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderClosedException;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
import org.apache.qpid.jms.provider.amqp.AmqpFixedProducer;
import org.apache.qpid.jms.provider.amqp.AmqpProducer;
import org.apache.qpid.jms.provider.amqp.AmqpResource;
import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
import org.apache.qpid.jms.provider.amqp.AmqpSaslAuthenticator;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
import org.apache.qpid.jms.transports.SSLTransport;
import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpProvider
implements Provider,
TransportListener,
AmqpResourceParent {
    // General purpose logger for this class.
    private static final Logger LOG = LoggerFactory.getLogger(AmqpProvider.class);
    // Dedicated loggers named "<package of AmqpConnection>.BYTES" and ".FRAMES";
    // used by onData() for hex dumps and by updateTracer() for frame tracing.
    private static final Logger TRACE_BYTES = LoggerFactory.getLogger((String)(AmqpConnection.class.getPackage().getName() + ".BYTES"));
    private static final Logger TRACE_FRAMES = LoggerFactory.getLogger((String)(AmqpConnection.class.getPackage().getName() + ".FRAMES"));
    // 0x100000 == 1,048,576. Same value as the initial maxFrameSize below.
    private static final int DEFAULT_MAX_FRAME_SIZE = 0x100000;
    // Same value as the initial channelMax below.
    private static final int DEFAULT_CHANNEL_MAX = Short.MAX_VALUE;
    // Incremented each time a serializer thread is created; the value is embedded in the thread name.
    private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
    // Request instance handed to pumpToProtonTransport(AsyncResult) by the no-argument overload.
    private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
    // Must be non-null before start() is called, otherwise start() throws IllegalStateException.
    private ProviderListener listener;
    // NOTE(review): never assigned in the code reviewed here; presumably set once the
    // connection resource has been built -- confirm against the rest of the class.
    private AmqpConnection connection;
    // Created in create() when saslLayer is enabled; reset to null once authenticate() returns true.
    private AmqpSaslAuthenticator authenticator;
    // The network transport, created in connect() through TransportFactory.
    private org.apache.qpid.jms.transports.Transport transport;
    private String transportType = "tcp";
    // When null the remote URI host is used as the SASL remote hostname; when empty no hostname is sent.
    private String vhost;
    private boolean traceFrames;
    private boolean traceBytes;
    private boolean saslLayer = true;
    private String[] saslMechanisms;
    private boolean presettleConsumers;
    private boolean presettleProducers;
    // Both timeouts are overwritten from JmsConnectionInfo when the connection resource is created.
    private long connectTimeout = 15000L;
    // Milliseconds; a negative value makes close() wait without a time limit.
    private long closeTimeout = 15000L;
    private int channelMax = Short.MAX_VALUE;
    // Passed to the proton transport via setIdleTimeout() during connection creation.
    private int idleTimeout = 60000;
    // (sic) spelling of the field name is kept because code outside this view may reference it.
    private long sessionOutoingWindow = -1L;
    // Only applied to the proton transport when greater than zero.
    private int maxFrameSize = 0x100000;
    private final URI remoteURI;
    // Flipped exactly once by close(); checked by the transport callbacks before reporting failures.
    private final AtomicBoolean closed = new AtomicBoolean();
    // Single threaded executor on which the provider operations in this class are queued.
    private final ScheduledExecutorService serializer;
    private final Transport protonTransport = Transport.Factory.create();
    private final Collector protonCollector = new CollectorImpl();
    private final Connection protonConnection = Connection.Factory.create();
    // The wrapped open request stored by create() while the connection is being opened.
    private AsyncResult connectionOpenRequest;
    // Cancelled and cleared by close() when present.
    private ScheduledFuture<?> nextIdleTimeoutCheck;

    /**
     * Creates a provider bound to the given remote URI.
     *
     * <p>A single-threaded scheduled executor is created to serialize provider work. Its
     * thread is a daemon thread named
     * {@code <SimpleClassName>:(<sequence>):[<remoteURI>]}, where the sequence number is
     * taken from a counter shared by all providers. Frame tracing is then (re)evaluated
     * through {@code updateTracer()}.
     *
     * @param remoteURI the URI of the remote peer this provider will connect to
     */
    public AmqpProvider(URI remoteURI) {
        this.remoteURI = remoteURI;

        final ThreadFactory serializerThreads = new ThreadFactory() {

            @Override
            public Thread newThread(Runnable runner) {
                final Thread worker = new Thread(runner);
                worker.setDaemon(true);

                // The sequence number is consumed when the thread is actually created.
                final StringBuilder label = new StringBuilder();
                label.append(AmqpProvider.this.getClass().getSimpleName());
                label.append(":(").append(PROVIDER_SEQUENCE.incrementAndGet()).append("):[");
                label.append(AmqpProvider.this.getRemoteURI()).append("]");
                worker.setName(label.toString());
                return worker;
            }
        };

        this.serializer = Executors.newSingleThreadScheduledExecutor(serializerThreads);
        this.updateTracer();
    }

    /**
     * Creates the network transport for the configured transport type and remote URI,
     * registers this provider as its listener and initiates the connection.
     *
     * @throws IOException if the provider is already closed, if the transport cannot be
     *         created (any creation failure is converted through
     *         {@code IOExceptionSupport.create}), or if connecting fails
     */
    @Override
    public void connect() throws IOException {
        this.checkClosed();

        final org.apache.qpid.jms.transports.Transport created;
        try {
            created = TransportFactory.create(this.getTransportType(), this.getRemoteURI());
        } catch (Exception creationFailure) {
            throw IOExceptionSupport.create(creationFailure);
        }

        // Publish the transport before wiring it up, as close() relies on the field.
        this.transport = created;
        this.transport.setTransportListener(this);
        this.transport.connect();
    }

    /**
     * Verifies that the provider can be started: it must not be closed and a
     * {@link ProviderListener} must already be registered.
     *
     * @throws IOException if the closed check fails
     * @throws IllegalStateException if no listener has been registered
     */
    @Override
    public void start() throws IOException, IllegalStateException {
        this.checkClosed();

        final boolean listenerMissing = (this.listener == null);
        if (listenerMissing) {
            throw new IllegalStateException("No ProviderListener registered.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the provider down. Only the first invocation does any work; subsequent calls
     * return immediately because the {@code closed} flag has already been set.
     *
     * <p>A close task is queued on the serializer thread which closes the AMQP connection
     * (when one exists) and cancels any scheduled idle-timeout check. The calling thread
     * then waits for that request, bounded by {@code closeTimeout} milliseconds unless the
     * timeout is negative, and finally closes the network transport and shuts the
     * serializer down regardless of the outcome. Failures are logged, never thrown.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }

        // A failed close is reported to the waiter as a success: there is nothing a caller
        // could do about it and the transport is torn down below either way.
        final ProviderFuture request = new ProviderFuture() {

            @Override
            public void onFailure(Throwable result) {
                this.onSuccess();
            }
        };

        this.serializer.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    // Without a connected transport nothing can be sent, so release the waiter now.
                    // NOTE(review): there is no early return here, so the connection close
                    // below still runs in this case; behaviour kept as-is.
                    if (AmqpProvider.this.transport == null || !AmqpProvider.this.transport.isConnected()) {
                        request.onSuccess();
                    }

                    if (AmqpProvider.this.connection != null) {
                        AmqpProvider.this.connection.close(request);
                        AmqpProvider.this.pumpToProtonTransport(request);
                    } else {
                        request.onSuccess();
                    }
                } catch (Exception e) {
                    LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
                } finally {
                    if (AmqpProvider.this.nextIdleTimeoutCheck != null) {
                        LOG.trace("Cancelling scheduled IdleTimeoutCheck");
                        AmqpProvider.this.nextIdleTimeoutCheck.cancel(false);
                        AmqpProvider.this.nextIdleTimeoutCheck = null;
                    }
                }
            }
        });

        try {
            if (this.closeTimeout < 0L) {
                // Negative timeout: wait for the close request without a time limit.
                request.sync();
            } else {
                request.sync(this.closeTimeout, TimeUnit.MILLISECONDS);
            }
        } catch (IOException e) {
            // The "{}" placeholder is required; without it SLF4J drops the argument and
            // the reason for the failure never reaches the log.
            LOG.warn("Error caught while closing Provider: {}", e.getMessage());
        } finally {
            if (this.transport != null) {
                try {
                    this.transport.close();
                } catch (Exception e) {
                    LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
                }
            }
            this.serializer.shutdown();
        }
    }

    /**
     * Asynchronously creates the provider-side counterpart of the given resource.
     *
     * <p>The closed state is checked on the calling thread, then a task is queued on the
     * serializer. The task re-checks the closed state, dispatches on the resource type
     * through a visitor, and finally pumps the proton transport. Any exception raised
     * inside the task is reported through {@code request.onFailure}.
     *
     * @param resource the resource description to create
     * @param request the request to complete once the resource has been created or failed
     * @throws IOException if the closed check on the calling thread fails
     * @throws JMSException declared by the signature; nothing in this method body throws
     *         it directly on the calling thread
     */
    @Override
    public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
        this.checkClosed();
        this.serializer.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    // Re-check: the provider may have been closed while this task was queued.
                    AmqpProvider.this.checkClosed();
                    resource.visit(new JmsResourceVistor(){

                        @Override
                        public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
                            AmqpProvider.this.connection.createSession(sessionInfo, request);
                        }

                        @Override
                        public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
                            // Producers are created by the session identified by the parent id.
                            AmqpSession session = AmqpProvider.this.connection.getSession(producerInfo.getParentId());
                            session.createProducer(producerInfo, request);
                        }

                        @Override
                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
                            // Consumers are created by the session identified by the parent id.
                            AmqpSession session = AmqpProvider.this.connection.getSession(consumerInfo.getParentId());
                            session.createConsumer(consumerInfo, request);
                        }

                        @Override
                        public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
                            // Adopt the timeouts configured on the connection.
                            AmqpProvider.this.closeTimeout = connectionInfo.getCloseTimeout();
                            AmqpProvider.this.connectTimeout = connectionInfo.getConnectTimeout();
                            // Configure the proton transport, then bind it to the proton
                            // connection and attach the event collector.
                            AmqpProvider.this.protonTransport.setEmitFlowEventOnSend(false);
                            // A non-positive max frame size leaves the transport setting untouched.
                            if (AmqpProvider.this.getMaxFrameSize() > 0) {
                                AmqpProvider.this.protonTransport.setMaxFrameSize(AmqpProvider.this.getMaxFrameSize());
                            }
                            AmqpProvider.this.protonTransport.setChannelMax(AmqpProvider.this.getChannelMax());
                            AmqpProvider.this.protonTransport.setIdleTimeout(AmqpProvider.this.idleTimeout);
                            AmqpProvider.this.protonTransport.bind(AmqpProvider.this.protonConnection);
                            AmqpProvider.this.protonConnection.collect(AmqpProvider.this.protonCollector);
                            if (AmqpProvider.this.saslLayer) {
                                Sasl sasl = AmqpProvider.this.protonTransport.sasl();
                                sasl.client();
                                // Hostname selection: an unset vhost falls back to the URI host,
                                // while an explicitly empty vhost results in no hostname at all.
                                String hostname = AmqpProvider.this.getVhost();
                                if (hostname == null) {
                                    hostname = AmqpProvider.this.remoteURI.getHost();
                                } else if (hostname.isEmpty()) {
                                    hostname = null;
                                }
                                sasl.setRemoteHostname(hostname);
                                AmqpProvider.this.authenticator = new AmqpSaslAuthenticator(sasl, connectionInfo, AmqpProvider.this.getLocalPrincipal(), AmqpProvider.this.saslMechanisms);
                            }
                            AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
                            // Wrap the caller's request so that a successful open first fires
                            // the connection-established notification; failure and completion
                            // state are delegated unchanged.
                            AsyncResult wrappedOpenRequest = new AsyncResult(){

                                @Override
                                public void onSuccess() {
                                    AmqpProvider.this.fireConnectionEstablished();
                                    request.onSuccess();
                                }

                                @Override
                                public void onFailure(Throwable result) {
                                    request.onFailure(result);
                                }

                                @Override
                                public boolean isComplete() {
                                    return request.isComplete();
                                }
                            };
                            AmqpProvider.this.connectionOpenRequest = wrappedOpenRequest;
                            builder.buildResource(wrappedOpenRequest);
                        }

                        @Override
                        public void processDestination(JmsTemporaryDestination destination) throws Exception {
                            // Only temporary destinations need a remote resource; anything
                            // else completes immediately.
                            if (destination.isTemporary()) {
                                AmqpProvider.this.connection.createTemporaryDestination(destination, request);
                            } else {
                                request.onSuccess();
                            }
                        }

                        @Override
                        public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
                            AmqpSession session = AmqpProvider.this.connection.getSession(transactionInfo.getSessionId());
                            session.begin(transactionInfo.getId(), request);
                        }
                    });
                    // Push whatever the visitor produced out through the proton transport.
                    AmqpProvider.this.pumpToProtonTransport(request);
                }
                catch (Exception error) {
                    request.onFailure(error);
                }
            }
        });
    }

    /**
     * Asynchronously starts the given resource. Only consumer resources are acted upon
     * here: the matching consumer is looked up through its parent session and started.
     * The proton transport is pumped afterwards and any exception raised while doing so
     * is reported through {@code request.onFailure}.
     *
     * @param resource the resource to start
     * @param request the request handed to the consumer and to the transport pump
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void start(final JmsResource resource, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable startTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    resource.visit(new JmsDefaultResourceVisitor() {

                        @Override
                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
                            final AmqpSession owner = AmqpProvider.this.connection.getSession(consumerInfo.getParentId());
                            final AmqpConsumer target = owner.getConsumer(consumerInfo);
                            target.start(request);
                        }
                    });
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(startTask);
    }

    /**
     * Asynchronously stops the given resource. Only consumer resources are acted upon
     * here: the matching consumer is looked up through its parent session and stopped.
     * The proton transport is pumped afterwards and any exception raised while doing so
     * is reported through {@code request.onFailure}.
     *
     * @param resource the resource to stop
     * @param request the request handed to the consumer and to the transport pump
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void stop(final JmsResource resource, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable stopTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    resource.visit(new JmsDefaultResourceVisitor() {

                        @Override
                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
                            final AmqpSession owner = AmqpProvider.this.connection.getSession(consumerInfo.getParentId());
                            final AmqpConsumer target = owner.getConsumer(consumerInfo);
                            target.stop(request);
                        }
                    });
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(stopTask);
    }

    /**
     * Asynchronously destroys the provider-side counterpart of the given resource.
     *
     * <p>Sessions, producers, consumers, the connection itself and temporary destinations
     * are handled; each is closed with the supplied request. A temporary destination that
     * cannot be found is logged and treated as already removed. The proton transport is
     * pumped afterwards and any exception is reported through {@code request.onFailure}.
     *
     * @param resource the resource to destroy
     * @param request the request to complete when the resource has been closed
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void destroy(final JmsResource resource, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable destroyTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    resource.visit(new JmsDefaultResourceVisitor() {

                        @Override
                        public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
                            final AmqpSession target = AmqpProvider.this.connection.getSession(sessionInfo.getId());
                            target.close(request);
                        }

                        @Override
                        public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
                            final AmqpSession owner = AmqpProvider.this.connection.getSession(producerInfo.getParentId());
                            final AmqpProducer target = owner.getProducer(producerInfo);
                            target.close(request);
                        }

                        @Override
                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
                            final AmqpSession owner = AmqpProvider.this.connection.getSession(consumerInfo.getParentId());
                            final AmqpConsumer target = owner.getConsumer(consumerInfo);
                            target.close(request);
                        }

                        @Override
                        public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
                            AmqpProvider.this.connection.close(request);
                        }

                        @Override
                        public void processDestination(JmsTemporaryDestination destination) throws Exception {
                            final AmqpTemporaryDestination known = AmqpProvider.this.connection.getTemporaryDestination(destination);
                            if (known == null) {
                                // Nothing to remove: report success rather than failing the caller.
                                LOG.debug("Could not find temporary destination {} to delete.", (Object) destination);
                                request.onSuccess();
                                return;
                            }
                            known.close(request);
                        }
                    });
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(destroyTask);
    }

    /**
     * Asynchronously sends the given outbound message.
     *
     * <p>The producer is taken from the producer id's provider hint when that hint is an
     * {@code AmqpFixedProducer}; otherwise it is looked up through the parent session.
     * After the send the proton transport is pumped. The request is completed here only
     * when the producer reports that it could send and the envelope is flagged as an
     * asynchronous send. Any exception is reported through {@code request.onFailure}.
     *
     * @param envelope the outbound message dispatch to send
     * @param request the request associated with the send
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable sendTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();

                    final JmsProducerId producerId = envelope.getProducerId();
                    final AmqpProducer target;
                    if (producerId.getProviderHint() instanceof AmqpFixedProducer) {
                        target = (AmqpFixedProducer) producerId.getProviderHint();
                    } else {
                        final AmqpSession owner = connection.getSession(producerId.getParentId());
                        target = owner.getProducer(producerId);
                    }

                    final boolean sent = target.send(envelope, request);
                    pumpToProtonTransport(request);

                    if (!sent) {
                        return;
                    }
                    if (envelope.isSendAsync()) {
                        request.onSuccess();
                    }
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(sendTask);
    }

    /**
     * Asynchronously applies the given acknowledgement type to a whole session, pumps the
     * proton transport and then completes the request. Any exception is reported through
     * {@code request.onFailure}.
     *
     * @param sessionId identifies the session to acknowledge
     * @param ackType the kind of acknowledgement to apply
     * @param request the request to complete once the acknowledgement has been pumped
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void acknowledge(final JmsSessionId sessionId, final ProviderConstants.ACK_TYPE ackType, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable ackTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    final AmqpSession target = connection.getSession(sessionId);
                    target.acknowledge(ackType);
                    pumpToProtonTransport(request);
                    request.onSuccess();
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(ackTask);
    }

    /**
     * Asynchronously acknowledges a single inbound message.
     *
     * <p>The consumer is taken from the consumer id's provider hint when that hint is an
     * {@code AmqpConsumer}; otherwise it is looked up through the parent session. When the
     * consumer's session uses asynchronous acknowledgement the request is completed before
     * the transport is pumped, otherwise it is completed afterwards. Any exception is
     * reported through {@code request.onFailure}.
     *
     * @param envelope the inbound message dispatch being acknowledged
     * @param ackType the kind of acknowledgement to apply
     * @param request the request to complete
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void acknowledge(final JmsInboundMessageDispatch envelope, final ProviderConstants.ACK_TYPE ackType, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable ackTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();

                    final JmsConsumerId consumerId = envelope.getConsumerId();
                    final AmqpConsumer target;
                    if (consumerId.getProviderHint() instanceof AmqpConsumer) {
                        target = (AmqpConsumer) consumerId.getProviderHint();
                    } else {
                        final AmqpSession owner = connection.getSession(consumerId.getParentId());
                        target = owner.getConsumer(consumerId);
                    }

                    target.acknowledge(envelope, ackType);

                    // Async acks release the caller first; sync acks only after the pump.
                    final boolean completeFirst = target.getSession().isAsyncAck();
                    if (completeFirst) {
                        request.onSuccess();
                    }
                    pumpToProtonTransport(request);
                    if (!completeFirst) {
                        request.onSuccess();
                    }
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(ackTask);
    }

    /**
     * Asynchronously commits the given transaction on its owning session and pumps the
     * proton transport. The request is handed to the session; any exception raised here is
     * reported through {@code request.onFailure}.
     *
     * @param transactionInfo describes the transaction and the session it belongs to
     * @param request the request associated with the commit
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void commit(final JmsTransactionInfo transactionInfo, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable commitTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    final AmqpSession owner = connection.getSession(transactionInfo.getSessionId());
                    owner.commit(transactionInfo, request);
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(commitTask);
    }

    /**
     * Asynchronously rolls back the given transaction on its owning session and pumps the
     * proton transport. The request is handed to the session; any exception raised here is
     * reported through {@code request.onFailure}.
     *
     * @param transactionInfo describes the transaction and the session it belongs to
     * @param request the request associated with the rollback
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void rollback(final JmsTransactionInfo transactionInfo, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable rollbackTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    final AmqpSession owner = connection.getSession(transactionInfo.getSessionId());
                    owner.rollback(transactionInfo, request);
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(rollbackTask);
    }

    /**
     * Asynchronously triggers recovery on the given session, pumps the proton transport
     * and then completes the request. Any exception is reported through
     * {@code request.onFailure}.
     *
     * @param sessionId identifies the session to recover
     * @param request the request to complete once recovery has been pumped
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable recoverTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    final AmqpSession target = connection.getSession(sessionId);
                    target.recover();
                    pumpToProtonTransport(request);
                    request.onSuccess();
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(recoverTask);
    }

    /**
     * Asynchronously asks the connection to remove the named subscription and pumps the
     * proton transport. The request is handed to the connection; any exception raised
     * here is reported through {@code request.onFailure}.
     *
     * @param subscription the name of the subscription to remove
     * @param request the request associated with the unsubscribe
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void unsubscribe(final String subscription, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable unsubscribeTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();
                    connection.unsubscribe(subscription, request);
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(unsubscribeTask);
    }

    /**
     * Asynchronously issues a pull on the identified consumer and pumps the proton
     * transport.
     *
     * <p>The consumer is taken from the consumer id's provider hint when that hint is an
     * {@code AmqpConsumer}; otherwise it is looked up through the parent session. Any
     * exception is reported through {@code request.onFailure}.
     *
     * @param consumerId identifies the consumer to pull on
     * @param timeout the timeout value passed through to the consumer's pull
     * @param request the request associated with the pull
     * @throws IOException if the closed check on the calling thread fails
     */
    @Override
    public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException {
        this.checkClosed();

        final Runnable pullTask = new Runnable() {

            @Override
            public void run() {
                try {
                    checkClosed();

                    final AmqpConsumer target;
                    if (consumerId.getProviderHint() instanceof AmqpConsumer) {
                        target = (AmqpConsumer) consumerId.getProviderHint();
                    } else {
                        final AmqpSession owner = connection.getSession(consumerId.getParentId());
                        target = owner.getConsumer(consumerId);
                    }

                    target.pull(timeout, request);
                    pumpToProtonTransport(request);
                } catch (Exception failure) {
                    request.onFailure(failure);
                }
            }
        };
        this.serializer.execute(pullTask);
    }

    /**
     * Installs a protocol tracer on the proton transport when frame tracing is enabled.
     * The tracer writes the body of every received and sent frame to the frames logger at
     * trace level. When frame tracing is disabled this method does nothing.
     */
    private void updateTracer() {
        if (!this.isTraceFrames()) {
            return;
        }

        final ProtocolTracer frameTracer = new ProtocolTracer() {

            public void receivedFrame(TransportFrame transportFrame) {
                TRACE_FRAMES.trace("RECV: {}", (Object) transportFrame.getBody());
            }

            public void sentFrame(TransportFrame transportFrame) {
                TRACE_FRAMES.trace("SENT: {}", (Object) transportFrame.getBody());
            }
        };

        // The tracer hook is only exposed on the implementation class.
        final TransportImpl transportImpl = (TransportImpl) this.protonTransport;
        transportImpl.setProtocolTracer(frameTracer);
    }

    /**
     * Accepts bytes read from the network and feeds them to the proton transport on the
     * serializer thread.
     *
     * <p>The buffer is retained here and released by the queued task once its content has
     * been copied into the proton input buffer. The release is guaranteed: it happens in
     * a {@code finally} block if input processing throws, and on the calling thread if
     * the serializer refuses the task, so the buffer cannot leak in either case. After
     * the input has been consumed, pending proton events are processed and the transport
     * is pumped.
     *
     * @param input the bytes received from the remote peer
     * @throws java.util.concurrent.RejectedExecutionException if the serializer no longer
     *         accepts tasks; the retained reference is released before rethrowing
     */
    @Override
    public void onData(final ByteBuf input) {
        // Keep the buffer alive until the serializer thread gets around to reading it.
        ReferenceCountUtil.retain(input);

        try {
            this.serializer.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        if (AmqpProvider.this.isTraceBytes()) {
                            TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
                        }

                        ByteBuffer source = input.nioBuffer();

                        // Copy in chunks no larger than the space proton currently offers.
                        // NOTE(review): if the proton input buffer ever reports zero
                        // remaining space this loop makes no progress -- confirm whether
                        // that state is reachable.
                        do {
                            ByteBuffer buffer = AmqpProvider.this.protonTransport.getInputBuffer();
                            int limit = Math.min(buffer.remaining(), source.remaining());
                            ByteBuffer duplicate = source.duplicate();
                            duplicate.limit(source.position() + limit);
                            buffer.put(duplicate);
                            AmqpProvider.this.protonTransport.processInput();
                            source.position(source.position() + limit);
                        } while (source.hasRemaining());
                    } finally {
                        // Balance the retain above even when processing fails part way.
                        ReferenceCountUtil.release(input);
                    }

                    // React to the state changes caused by the new data, then flush any
                    // resulting output back to the peer.
                    AmqpProvider.this.processUpdates();
                    AmqpProvider.this.pumpToProtonTransport();
                }
            });
        } catch (java.util.concurrent.RejectedExecutionException rejected) {
            // The task will never run, so nobody else will drop the extra reference.
            ReferenceCountUtil.release(input);
            throw rejected;
        }
    }

    /**
     * Handles a failure reported by the network transport.
     *
     * <p>Nothing happens when the serializer has already been shut down. Otherwise a task
     * is queued that logs the failure and, unless the provider is closed, closes the head
     * of the proton transport, reports the error through {@code fireProviderException}
     * and marks the connection (if any) as closed.
     *
     * @param error the failure reported by the transport
     */
    @Override
    public void onTransportError(final Throwable error) {
        if (this.serializer.isShutdown()) {
            return;
        }

        this.serializer.execute(new Runnable() {

            @Override
            public void run() {
                LOG.info("Transport failed: {}", (Object) error.getMessage());
                if (closed.get()) {
                    // A deliberate close is in progress; the failure is of no interest.
                    return;
                }

                protonTransport.close_head();
                fireProviderException(error);
                if (connection != null) {
                    connection.resourceClosed();
                }
            }
        });
    }

    /**
     * Handles the network transport being closed by the remote side.
     *
     * <p>Nothing happens when the serializer has already been shut down. Otherwise a task
     * is queued that logs the event and, unless the provider is closed, closes the head
     * of the proton transport, reports an {@link IOException} through
     * {@code fireProviderException} and marks the connection (if any) as closed.
     */
    @Override
    public void onTransportClosed() {
        if (this.serializer.isShutdown()) {
            return;
        }

        this.serializer.execute(new Runnable() {

            @Override
            public void run() {
                LOG.debug("Transport connection remotely closed");
                if (closed.get()) {
                    // A deliberate close is in progress; nothing to report.
                    return;
                }

                protonTransport.close_head();
                fireProviderException(new IOException("Transport connection remotely closed."));
                if (connection != null) {
                    connection.resourceClosed();
                }
            }
        });
    }

    /**
     * Drains the proton event collector and routes each event to the
     * {@code AmqpEventSink} stored as the context of the connection, session or link the
     * event refers to. Event types without a case below are simply discarded. Once the
     * collector is empty, SASL authentication is given a chance to progress.
     *
     * <p>Any exception is logged and reported through {@code fireProviderException}; an
     * event that caused the exception is not popped from the collector.
     */
    private void processUpdates() {
        try {
            Event current;
            while ((current = this.protonCollector.peek()) != null) {
                final Event.Type kind = current.getType();
                // TRANSPORT events are too frequent to be worth tracing.
                if (!kind.equals(Event.Type.TRANSPORT)) {
                    LOG.trace("New Proton Event: {}", (Object) kind);
                }

                switch (kind) {
                    case CONNECTION_REMOTE_CLOSE:
                        ((AmqpEventSink) current.getConnection().getContext()).processRemoteClose(this);
                        break;
                    case CONNECTION_REMOTE_OPEN:
                        ((AmqpEventSink) current.getConnection().getContext()).processRemoteOpen(this);
                        break;
                    case SESSION_REMOTE_CLOSE:
                        ((AmqpEventSink) current.getSession().getContext()).processRemoteClose(this);
                        break;
                    case SESSION_REMOTE_OPEN:
                        ((AmqpEventSink) current.getSession().getContext()).processRemoteOpen(this);
                        break;
                    case LINK_REMOTE_CLOSE:
                        ((AmqpEventSink) current.getLink().getContext()).processRemoteClose(this);
                        break;
                    case LINK_REMOTE_DETACH:
                        ((AmqpEventSink) current.getLink().getContext()).processRemoteDetach(this);
                        break;
                    case LINK_REMOTE_OPEN:
                        ((AmqpEventSink) current.getLink().getContext()).processRemoteOpen(this);
                        break;
                    case LINK_FLOW:
                        ((AmqpEventSink) current.getLink().getContext()).processFlowUpdates(this);
                        break;
                    case DELIVERY:
                        ((AmqpEventSink) current.getLink().getContext()).processDeliveryUpdates(this);
                        break;
                }

                // Only remove the event once it has been handled without an exception.
                this.protonCollector.pop();
            }

            this.processSaslAuthentication();
        } catch (Exception failure) {
            LOG.warn("Caught Exception during update processing: {}", (Object) failure.getMessage(), (Object) failure);
            this.fireProviderException(failure);
        }
    }

    /*
     * WARNING (CFR decompiler note) - a try block that caught its own exception was removed
     * during decompilation, so behaviour may differ from the original source.
     */
    /**
     * Runs one step of SASL authentication when an authenticator is still installed.
     * <p>
     * Once {@code authenticate()} returns {@code true} the authenticator reference is
     * cleared, so subsequent calls do nothing. A {@link JMSSecurityException} closes the
     * head of the Proton connection's transport and is then always reported through
     * {@link #fireProviderException(Throwable)}, even if closing the head itself throws.
     */
    private void processSaslAuthentication() {
        if (this.authenticator != null) {
            try {
                final boolean finished = this.authenticator.authenticate();
                if (finished) {
                    this.authenticator = null;
                }
            }
            catch (JMSSecurityException securityError) {
                try {
                    this.protonConnection.getTransport().close_head();
                }
                finally {
                    // Reported from finally so the failure surfaces no matter what.
                    this.fireProviderException(securityError);
                }
            }
        }
    }

    /**
     * Convenience overload that delegates to {@link #pumpToProtonTransport(AsyncResult)}
     * with {@code NOOP_REQUEST} as the request.
     *
     * @return whatever the delegated call returns
     */
    protected boolean pumpToProtonTransport() {
        return this.pumpToProtonTransport(NOOP_REQUEST);
    }

    /**
     * Copies every pending output buffer of the Proton transport into a freshly
     * allocated send buffer and hands it to the underlying transport, repeating until
     * Proton reports no remaining output.
     *
     * @param request the request that is failed if sending raises an {@link IOException}
     * @return {@code true} when all pending output was handed over; {@code false} when an
     *         {@link IOException} occurred, in which case the exception has been reported
     *         through {@link #fireProviderException(Throwable)} and to {@code request}
     */
    protected boolean pumpToProtonTransport(AsyncResult request) {
        try {
            ByteBuffer pending = this.protonTransport.getOutputBuffer();
            while (pending != null && pending.hasRemaining()) {
                ByteBuf frame = this.transport.allocateSendBuffer(pending.remaining());
                frame.writeBytes(pending);
                if (this.isTraceBytes()) {
                    TRACE_BYTES.info("Sending: {}", (Object)ByteBufUtil.hexDump((ByteBuf)frame));
                }
                this.transport.send(frame);
                // Tell Proton the bytes were taken before asking for the next buffer.
                this.protonTransport.outputConsumed();
                pending = this.protonTransport.getOutputBuffer();
            }
            return true;
        }
        catch (IOException sendFailure) {
            this.fireProviderException(sendFailure);
            request.onFailure(sendFailure);
            return false;
        }
    }

    /**
     * Marks the connection attempt as finished and notifies the listener.
     * <p>
     * The pending open request is cleared, the Proton transport is ticked with the
     * current monotonic time, and when the tick returns a positive deadline an
     * {@link IdleTimeoutCheck} is scheduled on the serializer for that deadline. Finally
     * the registered listener, if any, is told the connection was established.
     */
    void fireConnectionEstablished() {
        this.connectionOpenRequest = null;

        final long startMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        final long firstDeadline = this.protonTransport.tick(startMillis);
        if (firstDeadline > 0L) {
            final long initialDelay = firstDeadline - startMillis;
            LOG.trace("IdleTimeoutCheck being initiated, initial delay: {}", (Object)initialDelay);
            this.nextIdleTimeoutCheck = this.serializer.schedule(new IdleTimeoutCheck(), initialDelay, TimeUnit.MILLISECONDS);
        }

        // Snapshot the listener so the null check and the call see the same reference.
        final ProviderListener current = this.listener;
        if (current == null) {
            return;
        }
        current.onConnectionEstablished(this.remoteURI);
    }

    /**
     * Forwards an exception to the registered listener's {@code onProviderException}
     * callback; does nothing when no listener is set.
     *
     * @param ex the exception to report
     */
    void fireNonFatalProviderException(Exception ex) {
        final ProviderListener current = this.listener;
        if (current == null) {
            return;
        }
        current.onProviderException(ex);
    }

    /**
     * Reports a provider failure.
     * <p>
     * If a connection-open request is still pending it is failed with {@code ex} and
     * cleared. The registered listener, if any, then receives the error converted by
     * {@code IOExceptionSupport.create} through {@code onConnectionFailure}.
     *
     * @param ex the failure to report
     */
    void fireProviderException(Throwable ex) {
        if (this.connectionOpenRequest != null) {
            this.connectionOpenRequest.onFailure(ex);
            this.connectionOpenRequest = null;
        }

        final ProviderListener current = this.listener;
        if (current == null) {
            return;
        }
        current.onConnectionFailure(IOExceptionSupport.create(ex));
    }

    /**
     * Forwards a remote-close notification for a resource to the registered listener's
     * {@code onResourceRemotelyClosed} callback; does nothing when no listener is set.
     *
     * @param resource the resource passed on to the listener
     * @param ex       the exception passed on to the listener
     */
    void fireResourceRemotelyClosed(JmsResource resource, Exception ex) {
        final ProviderListener current = this.listener;
        if (current == null) {
            return;
        }
        current.onResourceRemotelyClosed(resource, ex);
    }

    /**
     * Records the given resource as this provider's connection when it is an
     * {@link AmqpConnection}; any other resource type is ignored.
     *
     * @param resource the child resource being added
     */
    @Override
    public void addChildResource(AmqpResource resource) {
        if (!(resource instanceof AmqpConnection)) {
            return;
        }
        this.connection = (AmqpConnection)resource;
    }

    /**
     * No-op: this implementation performs no action when a child resource is removed.
     *
     * @param resource the child resource being removed (unused)
     */
    @Override
    public void removeChildResource(AmqpResource resource) {
    }

    /**
     * Returns the message factory of the current connection.
     *
     * @return the connection's AMQP message factory
     * @throws RuntimeException if no connection has been registered yet
     */
    @Override
    public JmsMessageFactory getMessageFactory() {
        if (this.connection != null) {
            return this.connection.getAmqpMessageFactory();
        }
        throw new RuntimeException("Message Factory is not accessible when not connected.");
    }

    /**
     * Stores the frame-tracing flag and then calls {@code updateTracer()} so the change
     * is applied.
     *
     * @param trace the new value of the frame-tracing flag
     */
    public void setTraceFrames(boolean trace) {
        this.traceFrames = trace;
        this.updateTracer();
    }

    /**
     * @return the current value of the frame-tracing flag
     */
    public boolean isTraceFrames() {
        return this.traceFrames;
    }

    /**
     * Stores the byte-tracing flag.
     *
     * @param trace the new value of the byte-tracing flag
     */
    public void setTraceBytes(boolean trace) {
        this.traceBytes = trace;
    }

    /**
     * @return the current value of the byte-tracing flag
     */
    public boolean isTraceBytes() {
        return this.traceBytes;
    }

    /**
     * @return the current value of the {@code saslLayer} flag
     */
    public boolean isSaslLayer() {
        return this.saslLayer;
    }

    /**
     * Stores the {@code saslLayer} flag.
     *
     * @param saslLayer the new value of the flag
     */
    public void setSaslLayer(boolean saslLayer) {
        this.saslLayer = saslLayer;
    }

    /**
     * Returns the stored SASL mechanisms array itself; no copy is made, so changes made
     * by the caller are visible to this provider.
     *
     * @return the stored array reference, which may be {@code null} if none was set
     */
    public String[] getSaslMechanisms() {
        return this.saslMechanisms;
    }

    /**
     * Stores the given SASL mechanisms array by reference; no copy is made.
     *
     * @param saslMechanisms the array to store
     */
    public void setSaslMechanisms(String[] saslMechanisms) {
        this.saslMechanisms = saslMechanisms;
    }

    /**
     * @return the stored {@code vhost} value
     */
    public String getVhost() {
        return this.vhost;
    }

    /**
     * Stores the {@code vhost} value.
     *
     * @param vhost the value to store
     */
    public void setVhost(String vhost) {
        this.vhost = vhost;
    }

    /**
     * @return the stored {@code idleTimeout} value
     */
    public int getIdleTimeout() {
        return this.idleTimeout;
    }

    /**
     * Stores the {@code idleTimeout} value.
     *
     * @param idleTimeout the value to store
     */
    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * @return the stored {@code maxFrameSize} value
     */
    public int getMaxFrameSize() {
        return this.maxFrameSize;
    }

    /**
     * Stores the {@code maxFrameSize} value.
     *
     * @param maxFrameSize the value to store
     */
    public void setMaxFrameSize(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    /**
     * @return the stored session outgoing window value
     */
    public long getSessionOutgoingWindow() {
        return this.sessionOutoingWindow;
    }

    /**
     * Stores the session outgoing window value.
     *
     * @param sessionOutoingWindow the value to store
     */
    public void setSessionOutgoingWindow(long sessionOutoingWindow) {
        this.sessionOutoingWindow = sessionOutoingWindow;
    }

    /**
     * @return the stored {@code closeTimeout} value
     */
    public long getCloseTimeout() {
        return this.closeTimeout;
    }

    /**
     * @return the stored {@code connectTimeout} value
     */
    public long getConnectTimeout() {
        return this.connectTimeout;
    }

    /**
     * Looks up the request timeout from the current connection's resource info.
     *
     * @return the request timeout reported by the connection's {@link JmsConnectionInfo},
     *         or {@code -1} when no connection has been registered
     */
    public long getRequestTimeout() {
        if (this.connection == null) {
            return -1L;
        }
        JmsConnectionInfo info = (JmsConnectionInfo)this.connection.getResourceInfo();
        return info.getRequestTimeout();
    }

    /**
     * Looks up the send timeout from the current connection's resource info.
     *
     * @return the send timeout reported by the connection's {@link JmsConnectionInfo},
     *         or {@code -1} when no connection has been registered
     */
    public long getSendTimeout() {
        if (this.connection == null) {
            return -1L;
        }
        JmsConnectionInfo info = (JmsConnectionInfo)this.connection.getResourceInfo();
        return info.getSendTimeout();
    }

    /**
     * Applies the same presettle value to both consumers and producers by calling
     * {@link #setPresettleConsumers(boolean)} and {@link #setPresettleProducers(boolean)}.
     *
     * @param presettle the value passed to both setters
     */
    public void setPresettle(boolean presettle) {
        this.setPresettleConsumers(presettle);
        this.setPresettleProducers(presettle);
    }

    /**
     * @return the current value of the {@code presettleConsumers} flag
     */
    public boolean isPresettleConsumers() {
        return this.presettleConsumers;
    }

    /**
     * Stores the {@code presettleConsumers} flag.
     *
     * @param presettle the new value of the flag
     */
    public void setPresettleConsumers(boolean presettle) {
        this.presettleConsumers = presettle;
    }

    /**
     * @return the current value of the {@code presettleProducers} flag
     */
    public boolean isPresettleProducers() {
        return this.presettleProducers;
    }

    /**
     * Stores the {@code presettleProducers} flag.
     *
     * @param presettle the new value of the flag
     */
    public void setPresettleProducers(boolean presettle) {
        this.presettleProducers = presettle;
    }

    /**
     * Builds a short description of the form {@code "AmqpProvider: <host>:<port>"} from
     * the remote URI.
     *
     * @return the description string
     */
    public String toString() {
        StringBuilder text = new StringBuilder("AmqpProvider: ");
        text.append(this.getRemoteURI().getHost());
        text.append(":");
        text.append(this.getRemoteURI().getPort());
        return text.toString();
    }

    /**
     * @return the stored {@code channelMax} value
     */
    public int getChannelMax() {
        return this.channelMax;
    }

    /**
     * Stores the {@code channelMax} value.
     *
     * @param channelMax the value to store
     */
    public void setChannelMax(int channelMax) {
        this.channelMax = channelMax;
    }

    /**
     * @return the stored {@code transportType} value
     */
    String getTransportType() {
        return this.transportType;
    }

    /**
     * Stores the {@code transportType} value.
     *
     * @param transportType the value to store
     */
    void setTransportType(String transportType) {
        this.transportType = transportType;
    }

    /**
     * Registers the listener that the {@code fire...} methods of this provider notify.
     *
     * @param listener the listener to store; the {@code fire...} methods skip
     *                 notification while it is {@code null}
     */
    @Override
    public void setProviderListener(ProviderListener listener) {
        this.listener = listener;
    }

    /**
     * @return the currently registered listener, which may be {@code null}
     */
    @Override
    public ProviderListener getProviderListener() {
        return this.listener;
    }

    /**
     * @return the stored remote URI
     */
    @Override
    public URI getRemoteURI() {
        return this.remoteURI;
    }

    /**
     * @return the Proton transport object held by this provider
     */
    public Transport getProtonTransport() {
        return this.protonTransport;
    }

    /**
     * @return the Proton connection object held by this provider
     */
    public Connection getProtonConnection() {
        return this.protonConnection;
    }

    /**
     * @return the {@code serializer} executor this provider schedules its tasks on
     */
    ScheduledExecutorService getScheduler() {
        return this.serializer;
    }

    /**
     * @return this provider instance
     */
    @Override
    public AmqpProvider getProvider() {
        return this;
    }

    /**
     * Schedules a task on the serializer that fails the given request with the supplied
     * error once the request timeout elapses and then pumps pending Proton output to the
     * transport.
     * <p>
     * The timeout is read exactly once, so the value that is checked is the value used
     * as the delay. Reading it a second time could observe {@code -1} after the check
     * had passed, and a negative delay makes the executor run the task immediately.
     *
     * @param request the request to fail when the timeout fires
     * @param error   the error handed to {@code request.onFailure}
     * @return the future of the scheduled timeout task, or {@code null} when
     *         {@link #getRequestTimeout()} is {@code -1} and nothing was scheduled
     */
    public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) {
        final long timeout = this.getRequestTimeout();
        if (timeout == -1L) {
            return null;
        }
        return this.serializer.schedule(new Runnable(){

            @Override
            public void run() {
                request.onFailure(error);
                AmqpProvider.this.pumpToProtonTransport();
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the local principal of the underlying transport when that transport is an
     * {@code SSLTransport}.
     *
     * @return the principal reported by the SSL transport, or {@code null} when the
     *         transport is not an {@code SSLTransport}
     */
    Principal getLocalPrincipal() {
        if (!(this.transport instanceof SSLTransport)) {
            return null;
        }
        SSLTransport secure = (SSLTransport)this.transport;
        return secure.getLocalPrincipal();
    }

    /**
     * Guards operations that require an open provider.
     *
     * @throws ProviderClosedException if the {@code closed} flag is set
     */
    private void checkClosed() throws ProviderClosedException {
        final boolean alreadyClosed = this.closed.get();
        if (!alreadyClosed) {
            return;
        }
        throw new ProviderClosedException("This Provider is already closed");
    }

    /**
     * Self-rescheduling task that ticks the Proton transport and flushes its output.
     * <p>
     * Each run ticks the transport only while the connection's local state is
     * {@code ACTIVE}. If the tick leaves the transport closed, a provider exception is
     * fired (provided the flush succeeded). Otherwise, when the tick returns a positive
     * deadline, the task schedules itself again for that deadline. Whenever a run does
     * not reschedule, the provider's {@code nextIdleTimeoutCheck} reference is cleared.
     */
    private final class IdleTimeoutCheck
    implements Runnable {
        private IdleTimeoutCheck() {
        }

        @Override
        public void run() {
            if (!this.tickAndReschedule()) {
                AmqpProvider.this.nextIdleTimeoutCheck = null;
                LOG.trace("IdleTimeoutCheck exiting");
            }
        }

        /**
         * Performs a single idle-timeout check.
         *
         * @return {@code true} only when this task scheduled itself to run again
         */
        private boolean tickAndReschedule() {
            if (AmqpProvider.this.connection.getLocalState() != EndpointState.ACTIVE) {
                LOG.trace("IdleTimeoutCheck skipping check, connection is not active.");
                return false;
            }

            final long nowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
            final long nextDeadline = AmqpProvider.this.protonTransport.tick(nowMillis);
            // Flush whatever the tick produced before inspecting the transport state.
            final boolean flushed = AmqpProvider.this.pumpToProtonTransport();

            if (AmqpProvider.this.protonTransport.isClosed()) {
                LOG.info("IdleTimeoutCheck closed the transport due to the peer exceeding our requested idle-timeout.");
                // A failed flush has already reported its own exception.
                if (flushed) {
                    AmqpProvider.this.fireProviderException(new IOException("Transport closed due to the peer exceeding our requested idle-timeout"));
                }
                return false;
            }

            if (nextDeadline <= 0L) {
                return false;
            }

            final long delay = nextDeadline - nowMillis;
            LOG.trace("IdleTimeoutCheck rescheduling with delay: {}", (Object)delay);
            AmqpProvider.this.nextIdleTimeoutCheck = AmqpProvider.this.serializer.schedule(this, delay, TimeUnit.MILLISECONDS);
            return true;
        }
    }
}

