package org.apache.activemq.apollo.mqtt;

import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.apollo.broker.AbstractRetainedDeliveryConsumer;
import org.apache.activemq.apollo.broker.AbstractSessionSinkFilter;
import org.apache.activemq.apollo.broker.BindAddress;
import org.apache.activemq.apollo.broker.BindableDeliveryProducer;
import org.apache.activemq.apollo.broker.BrokerConnection;
import org.apache.activemq.apollo.broker.ConnectAddress;
import org.apache.activemq.apollo.broker.Consumed$;
import org.apache.activemq.apollo.broker.CreditWindowFilter;
import org.apache.activemq.apollo.broker.Delivered$;
import org.apache.activemq.apollo.broker.Delivery;
import org.apache.activemq.apollo.broker.Delivery$;
import org.apache.activemq.apollo.broker.DeliveryProducer;
import org.apache.activemq.apollo.broker.DeliveryProducerRoute;
import org.apache.activemq.apollo.broker.DeliveryResult;
import org.apache.activemq.apollo.broker.DeliverySession;
import org.apache.activemq.apollo.broker.DestinationAddress;
import org.apache.activemq.apollo.broker.DestinationParser;
import org.apache.activemq.apollo.broker.MutableSink;
import org.apache.activemq.apollo.broker.RetainRemove$;
import org.apache.activemq.apollo.broker.RetainSet$;
import org.apache.activemq.apollo.broker.Session;
import org.apache.activemq.apollo.broker.SessionSink;
import org.apache.activemq.apollo.broker.SessionSinkMux;
import org.apache.activemq.apollo.broker.SimpleAddress;
import org.apache.activemq.apollo.broker.Sink;
import org.apache.activemq.apollo.broker.SubscriptionAddress;
import org.apache.activemq.apollo.broker.Undelivered$;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.protocol.RawMessage;
import org.apache.activemq.apollo.broker.protocol.RawMessageCodec$;
import org.apache.activemq.apollo.broker.security.SecurityContext;
import org.apache.activemq.apollo.broker.store.StoreUOW;
import org.apache.activemq.apollo.filter.FilterException;
import org.apache.activemq.apollo.mqtt.MqttSessionManager;
import org.apache.activemq.apollo.util.Fn1;
import org.apache.activemq.apollo.util.LRUCache;
import org.apache.activemq.apollo.util.LongCounter;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn1;
import org.apache.activemq.apollo.util.UnitFn2;
import org.apache.activemq.apollo.util.path.Path$;
import org.apache.activemq.apollo.util.path.PathMap;
import org.apache.activemq.apollo.util.path.PathParser;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.CustomDispatchSource;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.EventAggregator;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.MessageSupport;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession.class */
public class MqttSession {
    // Per-host state this session belongs to; host() returns its `host` field.
    public final MqttSessionManager.HostState host_state;
    // Client identifier; also used to name this session's dispatch queue.
    public final UTF8Buffer client_id;
    // Session state object; the visible code reads its subscriptions, durable_sub,
    // strategy and received_message_ids members.
    public final MqttSessionManager.SessionState session_state;
    // Dispatch queue created in the constructor; attach()/detach() and the consumer
    // callbacks assert that they execute on it.
    public final DispatchQueue queue;
    // Currently attached protocol handler; set to null when no handler is attached.
    public MqttProtocolHandler handler;
    // Copied from the handler in attach().
    public SecurityContext security_context;
    // Copied from the handler just before the CONNACK is sent in attach().
    public CONNECT connect_message;
    // NOTE(review): presumably the backing field of mqtt_consumer(); the accessor is
    // not visible in this part of the file.
    MqttConsumer _mqtt_consumer;
    static final /* synthetic */ boolean $assertionsDisabled;
    // Set to true by disconnect(); connect() then re-attaches through MqttSessionManager
    // instead of reusing this session.
    public boolean manager_disconnected = false;
    // Mirrors CONNECT.cleanSession() of the attached handler (see attach()).
    public boolean clean_session = false;
    // Defaults to the protocol-wide parser and is replaced by the handler's parser in attach().
    public DestinationParser destination_parser = MqttProtocol.destination_parser;
    // When true, non-raw messages are published using their body converted to a Buffer
    // rather than their encoded form (see MqttConsumer).
    boolean publish_body = false;
    // Outstanding PUBLISH requests keyed by MQTT message id.
    final HashMap<Short, Request> in_flight_publishes = new HashMap<>();
    /**
     * LRU cache of producer routes, constructed with the argument {@code 10}
     * (presumably the maximum entry count - confirm against {@code LRUCache}).
     * When an entry is evicted, its route is disconnected from the router on the
     * host's dispatch queue.
     */
    LRUCache<UTF8Buffer, MqttProducerRoute> producerRoutes = new LRUCache<UTF8Buffer, MqttProducerRoute>(10) {
        protected void onCacheEviction(final Map.Entry<UTF8Buffer, MqttProducerRoute> entry) {
            Task disconnectEvicted = new Task() {
                public void run() {
                    // The entry is read here, when the task actually runs.
                    MqttProducerRoute evicted = entry.getValue();
                    ConnectAddress[] targets = {evicted.address};
                    MqttSession.this.host().router().disconnect(targets, (BindableDeliveryProducer) evicted);
                }
            };
            MqttSession.this.host().dispatch_queue().execute(disconnectEvicted);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.apollo.mqtt.MqttSession$18, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$18.class */
    /**
     * Task that publishes the will message carried by the session's CONNECT frame.
     *
     * run() connects the captured producer route to the captured destination and then
     * hands off to {@link AnonymousClass1} on the session queue, which builds and offers
     * the delivery. The captured completion task is run either immediately (no targets)
     * or from the delivery's ack callback.
     *
     * NOTE(review): the code that creates and schedules this task is not visible here;
     * presumably it is submitted to the host dispatch queue - confirm.
     */
    public class AnonymousClass18 extends Task {
        // Destination the will message is routed to.
        final /* synthetic */ SimpleAddress val$destination;
        // Producer route used to offer the will delivery (identifier typo is in the original).
        final /* synthetic */ DeliveryProducerRoute val$prodcuer;
        // Continuation run once the will has been handled.
        final /* synthetic */ Task val$complete_close;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.activemq.apollo.mqtt.MqttSession$18$1, reason: invalid class name */
        /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$18$1.class */
        /** Second stage: builds the will delivery and offers it to the route. */
        public class AnonymousClass1 extends Task {
            AnonymousClass1() {
            }

            public void run() {
                // Nothing is bound to the destination: skip publishing and complete right away.
                if (AnonymousClass18.this.val$prodcuer.targets().isEmpty()) {
                    AnonymousClass18.this.val$complete_close.run();
                    return;
                }
                Delivery delivery = new Delivery();
                delivery.message_$eq(new RawMessage(MqttSession.this.connect_message.willMessage()));
                delivery.size_$eq(MqttSession.this.connect_message.willMessage().length);
                // Any will QoS other than the first enum constant marks the delivery persistent.
                delivery.persistent_$eq(MqttSession.this.connect_message.willQos().ordinal() > 0);
                if (MqttSession.this.connect_message.willRetain()) {
                    // A zero-length retained will removes the retained message; otherwise it sets it.
                    if (delivery.size() == 0) {
                        delivery.retain_$eq(RetainRemove$.MODULE$);
                    } else {
                        delivery.retain_$eq(RetainSet$.MODULE$);
                    }
                }
                // The ack callback must be installed before the delivery is offered below.
                delivery.ack_$eq(Scala2Java.toScala(new UnitFn2<DeliveryResult, StoreUOW>() { // from class: org.apache.activemq.apollo.mqtt.MqttSession.18.1.1
                    public void call(DeliveryResult deliveryResult, StoreUOW storeUOW) {
                        // Router disconnect is scheduled on the host queue; the completion
                        // task is run directly from this callback without waiting for it.
                        MqttSession.this.host().dispatch_queue().execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSession.18.1.1.1
                            public void run() {
                                MqttSession.this.host().router().disconnect(new ConnectAddress[]{AnonymousClass18.this.val$destination}, AnonymousClass18.this.val$prodcuer);
                            }
                        });
                        AnonymousClass18.this.val$complete_close.run();
                    }
                }));
                // NOTE(review): dereferences MqttSession.this.handler; confirm it cannot be
                // null by the time this task runs.
                MqttSession.this.handler.messages_received.incrementAndGet();
                AnonymousClass18.this.val$prodcuer.offer(delivery);
            }
        }

        AnonymousClass18(SimpleAddress simpleAddress, DeliveryProducerRoute deliveryProducerRoute, Task task) {
            this.val$destination = simpleAddress;
            this.val$prodcuer = deliveryProducerRoute;
            this.val$complete_close = task;
        }

        public void run() {
            // First stage: connect the route, then continue on the session queue.
            MqttSession.this.host().router().connect(new ConnectAddress[]{this.val$destination}, this.val$prodcuer, MqttSession.this.security_context);
            MqttSession.this.queue.execute(new AnonymousClass1());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.apollo.mqtt.MqttSession$19, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$19.class */
    /**
     * Task that acknowledges a SUBSCRIBE: it hops onto the session queue, asks the
     * session-state strategy to update, and from that callback sends a SUBACK whose
     * granted-QoS array echoes the QoS requested for each topic, in order.
     */
    public class AnonymousClass19 extends Task {
        final /* synthetic */ SUBSCRIBE val$sub;

        AnonymousClass19(SUBSCRIBE subscribe) {
            this.val$sub = subscribe;
        }

        public void run() {
            MqttSession.this.queue.execute(new Task() {
                public void run() {
                    MqttSession.this.session_state.strategy.update(new Task() {
                        public void run() {
                            // Declared with the concrete type so the SUBACK-specific
                            // setters (messageId, grantedQos) resolve at compile time.
                            SUBACK suback = new SUBACK();
                            suback.messageId(AnonymousClass19.this.val$sub.messageId());
                            Topic[] topics = AnonymousClass19.this.val$sub.topics();
                            byte[] granted = new byte[topics.length];
                            for (int i = 0; i < topics.length; i++) {
                                // The QoS enum ordinal is used as the granted QoS byte.
                                granted[i] = (byte) topics[i].qos().ordinal();
                            }
                            suback.grantedQos(granted);
                            MqttSession.this.send(suback);
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.apollo.mqtt.MqttSession$23, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$23.class */
    /**
     * Task that applies an UNSUBSCRIBE against the router and then acknowledges it.
     *
     * Router step:
     * - clean session: the captured addresses are unbound from the consumer;
     * - otherwise, if the consumer has no addresses left: the session's durable
     *   subscription is unbound (passing {@code true} to unbind) and the reference cleared;
     * - otherwise: the durable subscription is bound again.
     *
     * Afterwards an UNSUBACK carrying the request's message id is sent from the session
     * queue, once the session-state strategy update has called back.
     */
    public class AnonymousClass23 extends Task {
        final /* synthetic */ BindAddress[] val$addresses;
        final /* synthetic */ UNSUBSCRIBE val$unsubscribe;

        AnonymousClass23(BindAddress[] bindAddressArr, UNSUBSCRIBE unsubscribe) {
            this.val$addresses = bindAddressArr;
            this.val$unsubscribe = unsubscribe;
        }

        public void run() {
            if (MqttSession.this.clean_session) {
                MqttSession.this.host().router().unbind(this.val$addresses, MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
            } else if (MqttSession.this.mqtt_consumer().addresses.isEmpty()) {
                MqttSession.this.host().router().unbind(new BindAddress[]{MqttSession.this.session_state.durable_sub}, MqttSession.this.mqtt_consumer(), true, MqttSession.this.security_context);
                MqttSession.this.session_state.durable_sub = null;
            } else {
                MqttSession.this.host().router().bind(new BindAddress[]{MqttSession.this.session_state.durable_sub}, MqttSession.this.mqtt_consumer(), MqttSession.this.security_context, Scala2Java.noopFn1());
            }
            MqttSession.this.queue.execute(new Task() {
                public void run() {
                    MqttSession.this.session_state.strategy.update(new Task() {
                        public void run() {
                            // Concrete type so that messageId(..) resolves on UNSUBACK.
                            UNSUBACK unsuback = new UNSUBACK();
                            unsuback.messageId(AnonymousClass23.this.val$unsubscribe.messageId());
                            MqttSession.this.send(unsuback);
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$AtLeastOnceProducerAck.class */
    /**
     * Delivery ack callback for a received PUBLISH that is acknowledged with a PUBACK.
     * The arguments of {@link #call} are ignored; the PUBACK is sent from the session
     * queue and carries the message id of the captured PUBLISH.
     */
    public class AtLeastOnceProducerAck extends UnitFn2<DeliveryResult, StoreUOW> {
        /** The PUBLISH frame being acknowledged. */
        public final PUBLISH publish;

        AtLeastOnceProducerAck(PUBLISH publish) {
            this.publish = publish;
        }

        @Override
        public void call(DeliveryResult deliveryResult, StoreUOW storeUOW) {
            MqttSession.this.queue.execute(new Task() {
                public void run() {
                    // Concrete type so that messageId(..) resolves on PUBACK.
                    PUBACK puback = new PUBACK();
                    puback.messageId(AtLeastOnceProducerAck.this.publish.messageId());
                    MqttSession.this.send(puback);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$ExactlyOnceProducerAck.class */
    /**
     * Delivery ack callback for a received PUBLISH that is acknowledged with a PUBREC.
     * On the session queue it first records the message id in the session state's
     * received_message_ids, then asks the strategy to update, and sends the PUBREC from
     * the update callback.
     */
    public class ExactlyOnceProducerAck extends AtLeastOnceProducerAck {
        ExactlyOnceProducerAck(PUBLISH publish) {
            super(publish);
        }

        @Override
        public void call(DeliveryResult deliveryResult, StoreUOW storeUOW) {
            MqttSession.this.queue.execute(new Task() {
                public void run() {
                    MqttSession.this.session_state.received_message_ids.add(Short.valueOf(ExactlyOnceProducerAck.this.publish.messageId()));
                    MqttSession.this.session_state.strategy.update(new Task() {
                        public void run() {
                            // Concrete type so that messageId(..) resolves on PUBREC.
                            PUBREC pubrec = new PUBREC();
                            pubrec.messageId(ExactlyOnceProducerAck.this.publish.messageId());
                            MqttSession.this.send(pubrec);
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$IntPair.class */
    /**
     * Mutable pair of ints. In this file it carries credit updates: the consumer merges
     * pairs of (delivery size, 1) and passes both components to the credit window filter.
     */
    public class IntPair {
        /** First component. */
        int _1;
        /** Second component. */
        int _2;

        IntPair(int first, int second) {
            _1 = first;
            _2 = second;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$MqttConsumer.class */
    /**
     * Delivery consumer that turns broker deliveries into MQTT PUBLISH requests for the
     * session's client.
     *
     * Deliveries flow: MqttConsumerSession -> session_manager -> credit_window_filter ->
     * (conversion function) -> consumer_sink. Credits are returned through
     * credit_window_source whenever a delivery is acked.
     */
    public class MqttConsumer extends AbstractRetainedDeliveryConsumer {
        /** QoS per exactly-matched bound address. */
        public HashMap<BindAddress, QoS> addresses = new HashMap<>();
        /** Consulted by path when a sender address has no entry in {@link #addresses}. */
        public PathMap wildcards = new PathMap();
        /** Aggregates (size, count) credit updates and applies them on the session queue. */
        CustomDispatchSource<IntPair, IntPair> credit_window_source;
        /** Sink whose downstream is swapped in attach(); starts with no downstream. */
        MutableSink<Request> consumer_sink;
        /** Source of sequence ids used for message ids when the session is a clean session. */
        public LongCounter next_seq_id;
        CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter;
        SessionSinkMux<Delivery> session_manager;

        /**
         * Per-producer delivery session. Opening one retains the enclosing consumer;
         * disposing it releases the consumer again.
         */
        public class MqttConsumerSession extends AbstractSessionSinkFilter<Delivery> implements DeliverySession {
            final DeliveryProducer producer;
            final SessionSink<Delivery> downstream;
            public boolean closed = false;

            MqttConsumerSession(DeliveryProducer deliveryProducer) {
                deliveryProducer.dispatch_queue().assertExecuting();
                this.producer = deliveryProducer;
                this.downstream = MqttConsumer.this.session_manager.open(deliveryProducer.dispatch_queue());
                MqttConsumer.this.retain();
            }

            public SessionSink<Delivery> downstream_session_sink() {
                return this.downstream;
            }

            public DeliveryProducer producer() {
                return this.producer;
            }

            public String toString() {
                return MqttSession.this.handler == null ? "unconnected" : "connection to " + MqttSession.this.handler.connection().transport().getRemoteAddress();
            }

            /* renamed from: consumer, reason: merged with bridge method [inline-methods] */
            public MqttConsumer m10consumer() {
                return MqttSession.this.mqtt_consumer();
            }

            /** Idempotent close; must run on the producer's dispatch queue. */
            public void close() {
                this.producer.dispatch_queue().assertExecuting();
                if (this.closed) {
                    return;
                }
                this.closed = true;
                dispose();
            }

            /**
             * Closes the downstream session, reporting every delivery handed to the
             * callback as Undelivered, and releases the consumer.
             */
            public void dispose() {
                MqttConsumer.this.session_manager.close(downstream(), Scala2Java.toScala(new UnitFn1<Delivery>() {
                    public void call(Delivery delivery) {
                        if (delivery.ack() != null) {
                            delivery.ack().apply(Undelivered$.MODULE$, delivery.uow());
                        }
                    }
                }));
                MqttConsumer.this.release();
            }

            /**
             * Offers a delivery downstream unless this session is full.
             *
             * @return false when full, true otherwise
             */
            public boolean offer(Delivery delivery) {
                if (full()) {
                    return false;
                }
                delivery.message().retain();
                boolean accepted = downstream().offer(delivery);
                // Plain assert instead of the decompiled $assertionsDisabled field:
                // an inner class cannot declare static members before Java 16.
                assert accepted : "offer should be accepted since it was not full";
                return true;
            }
        }

        MqttConsumer() {
            this.credit_window_source = Dispatch.createSource(new EventAggregator<IntPair, IntPair>() {
                public IntPair mergeEvent(IntPair intPair, IntPair intPair2) {
                    // Component-wise sum; the first event is taken as-is.
                    return intPair == null ? intPair2 : new IntPair(intPair._1 + intPair2._1, intPair._2 + intPair2._2);
                }

                public IntPair mergeEvents(IntPair intPair, IntPair intPair2) {
                    return mergeEvent(intPair, intPair2);
                }
            }, MqttSession.this.queue);
            this.credit_window_source.setEventHandler(new Task() {
                public void run() {
                    IntPair intPair = (IntPair) MqttConsumer.this.credit_window_source.getData();
                    MqttConsumer.this.credit_window_filter.credit(intPair._1, intPair._2);
                }
            });
            this.credit_window_source.resume();
            this.consumer_sink = new MutableSink<>();
            this.consumer_sink.downstream_$eq(Scala2Java.none());
            this.next_seq_id = new LongCounter(0L);
            this.credit_window_filter = new CreditWindowFilter<>(this.consumer_sink.flatMap(Scala2Java.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
                public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> tuple2) {
                    MqttSession.this.queue.assertExecuting();
                    Session<Delivery> session = tuple2._1();
                    final Delivery delivery = tuple2._2();
                    MqttConsumer.this.session_manager.delivered(session, delivery.size());

                    // Resolve the subscription QoS: exact address first, then the wildcard map.
                    SimpleAddress simple = ((DestinationAddress) delivery.sender().head()).simple();
                    QoS qoS = MqttConsumer.this.addresses.get(simple);
                    if (qoS == null) {
                        qoS = (QoS) Scala2Java.head(MqttConsumer.this.wildcards.get(simple.path()));
                    }
                    if (qoS == null) {
                        // No matching subscription: ack as consumed and emit nothing.
                        MqttConsumer.this.acked(delivery, Consumed$.MODULE$);
                        return Scala2Java.none();
                    }

                    PUBLISH publish = new PUBLISH();
                    publish.topicName(new UTF8Buffer(MqttSession.this.destination_parser.encode_destination((DestinationAddress) delivery.sender().head())));
                    if (delivery.redeliveries() > 0) {
                        publish.dup(true);
                    }
                    if (delivery.message().codec() == RawMessageCodec$.MODULE$) {
                        publish.payload(delivery.message().payload());
                    } else if (MqttSession.this.publish_body) {
                        try {
                            publish.payload((Buffer) delivery.message().getBodyAs(Buffer.class));
                        } catch (FilterException e) {
                            // Logged only; the PUBLISH is still sent, without this payload.
                            MqttProtocolHandler.log.error(e, "Internal Server Error: Could not covert message body to a Buffer", new Object[0]);
                        }
                    } else {
                        publish.payload(delivery.message().encoded());
                    }
                    MqttSession.this.handler.messages_sent.incrementAndGet();
                    UnitFn1<DeliveryResult> unitFn1 = new UnitFn1<DeliveryResult>() {
                        public void call(DeliveryResult deliveryResult) {
                            MqttConsumer.this.acked(delivery, deliveryResult);
                        }
                    };
                    if (delivery.ack() == null || qoS == QoS.AT_MOST_ONCE) {
                        // Nothing to track: message id 0 and no in-flight entry.
                        publish.qos(QoS.AT_MOST_ONCE);
                        return Scala2Java.some(new Request((short) 0, publish, unitFn1));
                    }
                    publish.qos(qoS);
                    // Clean sessions number messages locally; otherwise the delivery's seq is used.
                    short s = MqttConsumer.this.to_message_id(MqttSession.this.clean_session ? MqttConsumer.this.get_next_seq_id() : delivery.seq());
                    publish.messageId(s);
                    Request request = new Request(s, publish, unitFn1);
                    Request put = MqttSession.this.in_flight_publishes.put(Short.valueOf(s), request);
                    if (put != null) {
                        if (put.message == null) {
                            // NOTE(review): presumably a placeholder left by an ack that
                            // arrived before this delivery - confirm against the ack path.
                            MqttSession.this.in_flight_publishes.remove(Short.valueOf(s));
                            MqttConsumer.this.acked(delivery, Consumed$.MODULE$);
                        } else {
                            // The id is still held by an unacked PUBLISH.
                            MqttSession.this.handler.async_die("Client not acking regularly.", (Throwable) null);
                        }
                    }
                    return Scala2Java.some(request);
                }
            })), SessionDeliverySizer.INSTANCE);
            // Initial credit: twice the codec write buffer size, and a count of 1.
            this.credit_window_filter.credit(MqttSession.this.handler.codec.getWriteBufferSize() * 2, 1);
            this.session_manager = new SessionSinkMux<Delivery>(this.credit_window_filter, MqttSession.this.queue, Delivery$.MODULE$, 1073741823, receive_buffer_size()) {
                public long time_stamp() {
                    return MqttSession.this.host().broker().now();
                }
            };
        }

        public String toString() {
            return "mqtt client:" + MqttSession.this.client_id + " remote address: " + MqttSession.this.security_context.remote_address();
        }

        public DispatchQueue dispatch_queue() {
            return MqttSession.this.queue;
        }

        public long get_next_seq_id() {
            return this.next_seq_id.getAndIncrement();
        }

        /**
         * Maps a sequence number to a message id: keeps the low 15 bits and always sets
         * bit 15, so the resulting short is in 0x8000..0xFFFF (negative as a Java short).
         */
        short to_message_id(long j) {
            return (short) (32768 | (j & 32767));
        }

        /**
         * Returns the delivery's credit (its size, and a count of 1) and forwards the
         * result to the delivery's ack callback, if it has one. Must run on the session queue.
         */
        public void acked(Delivery delivery, DeliveryResult deliveryResult) {
            MqttSession.this.queue.assertExecuting();
            this.credit_window_source.merge(new IntPair(delivery.size(), 1));
            if (delivery.ack() != null) {
                // Untyped null: the second parameter is a StoreUOW, so an Object cast
                // would not type-check.
                delivery.ack().apply(deliveryResult, null);
            }
        }

        /* Lets the anonymous task below reach the superclass dispose(). */
        public void super_dispose() {
            super.dispose();
        }

        /** Defers the superclass dispose to the session queue. */
        protected void dispose() {
            MqttSession.this.queue.execute(new Task() {
                public void run() {
                    MqttConsumer.this.super_dispose();
                }
            });
        }

        public Option<BrokerConnection> connection() {
            return MqttSession.this.handler != null ? Scala2Java.some(MqttSession.this.handler.connection()) : Scala2Java.none();
        }

        public int receive_buffer_size() {
            return 65536;
        }

        public boolean is_persistent() {
            return false;
        }

        public boolean matches(Delivery delivery) {
            return true;
        }

        /* renamed from: connect, reason: merged with bridge method [inline-methods] */
        public MqttConsumerSession m8connect(DeliveryProducer deliveryProducer) {
            return new MqttConsumerSession(deliveryProducer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSession$MqttProducerRoute.class */
    /**
     * Producer route bound to one address and one protocol handler. Its refiller
     * resumes reads on the handler, but only if the route was marked suspended.
     */
    public class MqttProducerRoute extends DeliveryProducerRoute {
        /** Address this route is for. */
        public final SimpleAddress address;
        /** Handler whose codec and connection back this route. */
        public final MqttProtocolHandler handler;
        /** True while reads are held back waiting for this route to refill. */
        boolean suspended;

        public MqttProducerRoute(SimpleAddress simpleAddress, MqttProtocolHandler mqttProtocolHandler) {
            super(MqttSession.this.host().router());
            address = simpleAddress;
            handler = mqttProtocolHandler;
            suspended = false;
            Task resumeReads = new Task() {
                public void run() {
                    if (!MqttProducerRoute.this.suspended) {
                        return;
                    }
                    MqttProducerRoute.this.suspended = false;
                    MqttProducerRoute.this.handler.resume_read();
                }
            };
            refiller_$eq(resumeReads);
        }

        /** @return the handler codec's read buffer size */
        public int send_buffer_size() {
            return handler.codec.getReadBufferSize();
        }

        /** @return the handler's connection, always present */
        public Option<BrokerConnection> connection() {
            return Scala2Java.some(handler.connection());
        }

        /** @return the owning session's dispatch queue */
        public DispatchQueue dispatch_queue() {
            return MqttSession.this.queue;
        }
    }

    /**
     * Creates a session and its dedicated dispatch queue, labelled
     * {@code "mqtt: " + clientId}.
     *
     * @param hostState    host state this session belongs to
     * @param clientId     client identifier
     * @param sessionState session state object for this client
     */
    public MqttSession(MqttSessionManager.HostState hostState, UTF8Buffer clientId, MqttSessionManager.SessionState sessionState) {
        host_state = hostState;
        session_state = sessionState;
        client_id = clientId;
        queue = Dispatch.createQueue("mqtt: " + clientId);
    }

    /** @return the virtual host held by this session's host state */
    public VirtualHost host() {
        return host_state.host;
    }

    /**
     * Attaches a protocol handler to this session. All work runs on the session queue.
     *
     * If the session has already been removed from the manager (manager_disconnected),
     * the handler is handed back to MqttSessionManager.attach instead. Otherwise the
     * queue is suspended, any previously attached handler is detached, an attach task
     * is enqueued, and the handler's connection is moved onto the session queue; the
     * queue is resumed from the callback passed to _set_dispatch_queue.
     *
     * @param mqttProtocolHandler the handler to attach
     */
    public void connect(final MqttProtocolHandler mqttProtocolHandler) {
        this.queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSession.1
            public void run() {
                if (MqttSession.this.manager_disconnected) {
                    // This session is no longer registered; let the manager route the handler.
                    MqttSessionManager.attach(MqttSession.this.host(), MqttSession.this.client_id, mqttProtocolHandler);
                    return;
                }
                // Suspended here and resumed only from the _set_dispatch_queue callback below.
                MqttSession.this.queue.suspend();
                if (MqttSession.this.handler != null) {
                    // Take over from the handler that is currently attached.
                    MqttSession.this.detach();
                    MqttSession.this.handler = null;
                }
                // Enqueued while the queue is suspended; presumably it only runs after the
                // resume below - confirm against the dispatch library's suspend semantics.
                MqttSession.this.queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSession.1.1
                    public void run() {
                        MqttSession.this.handler = mqttProtocolHandler;
                        MqttSession.this.attach();
                    }
                });
                mqttProtocolHandler.connection()._set_dispatch_queue(MqttSession.this.queue, new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSession.1.2
                    public void run() {
                        MqttSession.this.queue.resume();
                    }
                });
            }
        });
    }

    /**
     * Detaches the given handler from this session, on the session queue. Nothing
     * happens unless that handler is the one currently attached. Otherwise the session
     * is removed from the manager, flagged as manager_disconnected, detached, and its
     * handler reference cleared.
     *
     * @param mqttProtocolHandler the handler that is going away
     */
    public void disconnect(final MqttProtocolHandler mqttProtocolHandler) {
        Task teardown = new Task() {
            public void run() {
                if (MqttSession.this.handler != mqttProtocolHandler) {
                    // A different handler (or none) is attached; leave the session alone.
                    return;
                }
                MqttSessionManager.remove(MqttSession.this.host_state, MqttSession.this.client_id);
                MqttSession.this.manager_disconnected = true;
                MqttSession.this.detach();
                MqttSession.this.handler = null;
            }
        };
        this.queue.execute(teardown);
    }

    /**
     * Wires the current handler into this session and acknowledges the connection.
     * Must run on the session queue.
     *
     * Steps:
     * - copy clean-session flag, security context and destination parser from the
     *   handler, route the handler's commands to on_transport_command, and point the
     *   consumer sink at a sink opened from the handler's sink manager;
     * - clean session: clear stored subscriptions, delete any durable subscription via
     *   the router (on the host queue), destroy the strategy state, then send CONNACK;
     * - otherwise: create the strategy state, re-subscribe to stored subscriptions
     *   (with reads suspended while that is in progress), then send CONNACK.
     */
    public void attach() {
        this.queue.assertExecuting();
        final MqttProtocolHandler mqttProtocolHandler = this.handler;
        this.clean_session = mqttProtocolHandler.connect_message.cleanSession();
        this.security_context = mqttProtocolHandler.security_context;
        mqttProtocolHandler.command_handler = new UnitFn1<Object>() {
            public void call(Object obj) {
                MqttSession.this.on_transport_command(obj);
            }
        };
        this.destination_parser = mqttProtocolHandler.destination_parser();
        mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.some(mqttProtocolHandler.sink_manager.open()));

        // Final step of every path below: remember the CONNECT frame and accept the connection.
        final Task ackConnect = new Task() {
            public void run() {
                MqttSession.this.queue.assertExecuting();
                MqttSession.this.connect_message = mqttProtocolHandler.connect_message;
                // Concrete type so that code(..) resolves on CONNACK.
                CONNACK connack = new CONNACK();
                connack.code(CONNACK.Code.CONNECTION_ACCEPTED);
                MqttSession.this.send(connack);
            }
        };

        if (this.clean_session) {
            this.session_state.subscriptions.clear();
            if (this.session_state.durable_sub != null) {
                // Capture the address before clearing the field; deletion runs on the host queue.
                final DestinationAddress[] destinationAddressArr = {this.session_state.durable_sub};
                this.session_state.durable_sub = null;
                host().dispatch_queue().execute(new Task() {
                    public void run() {
                        MqttSession.this.host().router().delete(destinationAddressArr, MqttSession.this.security_context);
                    }
                });
            }
            this.session_state.strategy.destroy(new Task() {
                public void run() {
                    ackConnect.run();
                }
            });
            return;
        }

        this.session_state.strategy.create(host().store(), this.client_id);
        if (this.session_state.subscriptions.isEmpty()) {
            ackConnect.run();
        } else {
            // Hold client reads until the stored subscriptions have been re-established.
            mqttProtocolHandler._suspend_read("subscribing");
            subscribe(Scala2Java.map(this.session_state.subscriptions.values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
                public Topic apply(Tuple2<Topic, BindAddress> tuple2) {
                    return (Topic) tuple2._1();
                }
            }), new Task() {
                public void run() {
                    mqttProtocolHandler.resume_read();
                    mqttProtocolHandler.queue().execute(ackConnect);
                }
            });
        }
    }

    /**
     * Tears down this session's broker-side state.
     *
     * <p>In order: disconnects every cached producer route, unbinds the consumer
     * (each bound address for a clean session, otherwise the single durable
     * subscription), resolves every in-flight publish that has an ack callback
     * with {@code Delivered} or {@code Undelivered}, closes the consumer's
     * downstream sink and finally notifies the handler that the transport is
     * disconnected. Router calls are handed to the host dispatch queue; the
     * session's own bookkeeping is cleared immediately.
     *
     * <p>Asserts that the caller is executing on this session's queue.
     */
    public void detach() {
        this.queue.assertExecuting();

        if (!this.producerRoutes.isEmpty()) {
            // Work on a snapshot: the live map is emptied right after the task is queued.
            final ArrayList routeSnapshot = new ArrayList(this.producerRoutes.values());
            host().dispatch_queue().execute(new Task() {
                public void run() {
                    for (Object entry : routeSnapshot) {
                        MqttProducerRoute route = (MqttProducerRoute) entry;
                        ConnectAddress[] target = {route.address};
                        MqttSession.this.host().router().disconnect(target, route);
                    }
                }
            });
            this.producerRoutes.clear();
        }

        if (!this.clean_session) {
            // Persistent session: only the durable subscription is unbound; the
            // recorded topic subscriptions are left in the session state.
            if (this.session_state.durable_sub != null) {
                final BindAddress[] durableBinding = {this.session_state.durable_sub};
                host().dispatch_queue().execute(new Runnable() {
                    @Override
                    public void run() {
                        MqttSession.this.host().router().unbind(durableBinding, MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
                    }
                });
                mqtt_consumer().addresses.clear();
                this.session_state.durable_sub = null;
            }
        } else {
            // Clean session: drop every individual binding and forget the subscriptions.
            if (!mqtt_consumer().addresses.isEmpty()) {
                final BindAddress[] boundAddresses = (BindAddress[]) mqtt_consumer().addresses.keySet().toArray(new BindAddress[mqtt_consumer().addresses.size()]);
                host().dispatch_queue().execute(new Runnable() {
                    @Override
                    public void run() {
                        MqttSession.this.host().router().unbind(boundAddresses, MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
                    }
                });
                mqtt_consumer().addresses.clear();
            }
            this.session_state.subscriptions.clear();
        }

        // Settle whatever was still awaiting a client acknowledgement.
        for (Request pending : this.in_flight_publishes.values()) {
            if (pending.ack == null) {
                continue;
            }
            pending.ack.apply(pending.delivered ? Delivered$.MODULE$ : Undelivered$.MODULE$);
        }
        this.in_flight_publishes.clear();

        this.handler.sink_manager.close((Sink) mqtt_consumer().consumer_sink.downstream().get(), Scala2Java.noopFn1());
        mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.none());
        this.handler.on_transport_disconnected();
    }

    /**
     * Converts an MQTT topic name into a broker address using this session's
     * destination parser. Names handed to the fallback function are turned
     * into {@code "topic"} addresses built from the parsed path.
     *
     * <p>If the parser yields {@code null} and a handler is attached, the
     * handler's {@code die} is invoked with an "Invalid mqtt destination name"
     * message.
     *
     * @param uTF8Buffer the topic name as received from the client
     * @return the decoded address; {@code null} if the parser produced none and
     *         {@code die} returned normally
     */
    public SimpleAddress decode_destination(UTF8Buffer uTF8Buffer) {
        Fn1<String, SimpleAddress> asTopicAddress = new Fn1<String, SimpleAddress>() {
            public SimpleAddress apply(String str) {
                return new SimpleAddress("topic", MqttSession.this.destination_parser.decode_path(str));
            }
        };
        SimpleAddress address = this.destination_parser.decode_single_destination(uTF8Buffer.toString(), Scala2Java.toScala(asTopicAddress));
        if (address != null || this.handler == null) {
            return address;
        }
        this.handler.die("Invalid mqtt destination name: " + uTF8Buffer);
        return address;
    }

    /**
     * Offers a protocol message to the handler's connection sink, wrapped in a
     * {@link Request} with id 0 and no ack callback.
     *
     * <p>Asserts that the caller is executing on this session's queue.
     *
     * @param message the MQTT message to write to the client
     */
    public void send(MessageSupport.Message message) {
        this.queue.assertExecuting();
        Request outbound = new Request((short) 0, message, null);
        this.handler.connection_sink.offer(outbound);
    }

    /**
     * Handles the client's final acknowledgement of an outbound publish.
     *
     * <p>The matching in-flight request is removed and, when it has an ack
     * callback, that callback is invoked with {@code Consumed}. When no request
     * is registered under the id, an empty placeholder request (no message, no
     * ack) is stored under it instead.
     *
     * <p>Asserts that the caller is executing on this session's queue.
     *
     * @param s the MQTT message id being acknowledged
     */
    public void publish_completed(short s) {
        this.queue.assertExecuting();
        final Short messageId = Short.valueOf(s);
        Request pending = this.in_flight_publishes.remove(messageId);
        if (pending == null) {
            // NOTE(review): presumably covers an ack that arrives before the
            // publish was registered as in-flight; confirm against the code
            // that populates in_flight_publishes.
            this.in_flight_publishes.put(messageId, new Request(s, null, null));
            return;
        }
        if (pending.ack != null) {
            pending.ack.apply(Consumed$.MODULE$);
        }
    }

    /**
     * Entry point for everything the transport delivers to this session.
     *
     * <p>Two kinds of command are understood: the literal string
     * {@code "failure"}, which publishes the will (if any) and then disconnects
     * the session, and {@link MQTTFrame}s, which are decoded and dispatched by
     * their numeric message type. Anything else, any message type without a
     * handler here, and any {@link ProtocolException} raised while decoding ends
     * in {@code handler.die(...)}.
     *
     * @param obj the command object received from the transport
     */
    public void on_transport_command(Object obj) {
        try {
            if (obj.getClass() != MQTTFrame.class) {
                if ("failure".equals(obj)) {
                    publish_will(new Task() {
                        public void run() {
                            MqttSessionManager.disconnect(MqttSession.this.host_state, MqttSession.this.client_id, MqttSession.this.handler);
                        }
                    });
                } else {
                    this.handler.die("Internal Server Error: unexpected mqtt command: " + obj.getClass());
                }
                return;
            }

            final MQTTFrame frame = (MQTTFrame) obj;
            switch (frame.messageType()) {
                case 3: { // PUBLISH
                    PUBLISH publish = (PUBLISH) MqttProtocolHandler.received(new PUBLISH().decode(frame));
                    on_mqtt_publish(publish);
                    break;
                }
                case 4: { // PUBACK
                    PUBACK puback = (PUBACK) MqttProtocolHandler.received(new PUBACK().decode(frame));
                    publish_completed(puback.messageId());
                    break;
                }
                case 5: { // PUBREC: reply with PUBREL carrying the same message id
                    PUBREL release = new PUBREL();
                    PUBREC pubrec = (PUBREC) MqttProtocolHandler.received(new PUBREC().decode(frame));
                    send(release.messageId(pubrec.messageId()));
                    break;
                }
                case 6: { // PUBREL: forget the id, then send PUBCOMP once the state update completes
                    final PUBREL pubrel = (PUBREL) MqttProtocolHandler.received(new PUBREL().decode(frame));
                    this.session_state.received_message_ids.remove(Short.valueOf(pubrel.messageId()));
                    this.session_state.strategy.update(new Task() {
                        public void run() {
                            MqttSession.this.send(new PUBCOMP().messageId(pubrel.messageId()));
                        }
                    });
                    break;
                }
                case 7: { // PUBCOMP
                    PUBCOMP pubcomp = (PUBCOMP) MqttProtocolHandler.received(new PUBCOMP().decode(frame));
                    publish_completed(pubcomp.messageId());
                    break;
                }
                case 8: { // SUBSCRIBE
                    SUBSCRIBE subscribe = (SUBSCRIBE) MqttProtocolHandler.received(new SUBSCRIBE().decode(frame));
                    on_mqtt_subscribe(subscribe);
                    break;
                }
                case 10: { // UNSUBSCRIBE
                    UNSUBSCRIBE unsubscribe = (UNSUBSCRIBE) MqttProtocolHandler.received(new UNSUBSCRIBE().decode(frame));
                    on_mqtt_unsubscribe(unsubscribe);
                    break;
                }
                case 12: // PINGREQ: answer with PINGRESP
                    MqttProtocolHandler.received(new PINGREQ().decode(frame));
                    send(new PINGRESP());
                    break;
                case 14: // DISCONNECT: the frame itself is not decoded
                    MqttProtocolHandler.received(new DISCONNECT());
                    MqttSessionManager.disconnect(this.host_state, this.client_id, this.handler);
                    break;
                case 9:
                case 11:
                case 13:
                default:
                    // Types 9, 11 and 13 have no handler here and are rejected
                    // together with every unrecognised type.
                    this.handler.die("Invalid MQTT message type: " + ((int) frame.messageType()));
                    break;
            }
        } catch (ProtocolException e) {
            this.handler.die("Internal Server Error: " + e);
        }
    }

    /**
     * Handles a PUBLISH received from the client.
     *
     * <p>A QoS {@code EXACTLY_ONCE} publish whose message id is already in the
     * session's {@code received_message_ids} is only answered with a PUBREC and
     * is not routed again. Every other publish is counted and sent through the
     * producer route cached for its topic name. When no route is cached yet, one
     * is created: reading is suspended, the route is connected to the router on
     * the host dispatch queue, and back on the session queue (unless the
     * connection has stopped in the meantime) reading resumes, the route is
     * cached and the publish is sent.
     *
     * @param publish the decoded PUBLISH message
     */
    public void on_mqtt_publish(final PUBLISH publish) {
        final boolean alreadyReceived = publish.qos() == QoS.EXACTLY_ONCE
                && this.session_state.received_message_ids.contains(Short.valueOf(publish.messageId()));
        if (alreadyReceived) {
            PUBREC duplicateAck = new PUBREC();
            duplicateAck.messageId(publish.messageId());
            send(duplicateAck);
            return;
        }

        this.handler.messages_received.incrementAndGet();
        this.queue.assertExecuting();

        MqttProducerRoute knownRoute = (MqttProducerRoute) this.producerRoutes.get(publish.topicName());
        if (knownRoute == null) {
            final SimpleAddress destination = decode_destination(publish.topicName());
            final MqttProducerRoute newRoute = new MqttProducerRoute(destination, this.handler);
            // Hold back further client commands until the route is connected.
            newRoute.handler._suspend_read("route publish lookup");

            // Second half of the hand-off; runs on the session queue once the router call returns.
            final Runnable deliverOnSessionQueue = new Runnable() {
                @Override
                public void run() {
                    if (!newRoute.handler.connection().stopped()) {
                        newRoute.handler.resume_read();
                        MqttSession.this.producerRoutes.put(publish.topicName(), newRoute);
                        MqttSession.this.send_via_route(newRoute, publish);
                    }
                }
            };
            host().dispatch_queue().execute(new Task() {
                public void run() {
                    MqttSession.this.host().router().connect(new ConnectAddress[]{destination}, newRoute, MqttSession.this.security_context);
                    MqttSession.this.queue.execute(deliverOnSessionQueue);
                }
            });
        } else {
            send_via_route(knownRoute, publish);
        }
    }

    /**
     * Turns a PUBLISH into a {@link Delivery} and offers it to the given route.
     *
     * <p>QoS {@code AT_LEAST_ONCE} and {@code EXACTLY_ONCE} publishes get a
     * producer ack of the matching kind; other publishes get none. If the route
     * has no targets, the ack (when present) is applied immediately with
     * {@code (null, null)} and nothing is offered. A retained publish marks the
     * delivery {@code RetainRemove} when its size is zero and {@code RetainSet}
     * otherwise. If the route reports full after the offer, it is flagged as
     * suspended and reading from the client is suspended.
     *
     * <p>Asserts that the caller is executing on this session's queue.
     *
     * @param mqttProducerRoute the connected route for the publish's topic
     * @param publish           the message to forward
     */
    public void send_via_route(MqttProducerRoute mqttProducerRoute, PUBLISH publish) {
        this.queue.assertExecuting();

        final QoS qos = publish.qos();
        final AtLeastOnceProducerAck producerAck;
        if (qos == QoS.AT_LEAST_ONCE) {
            producerAck = new AtLeastOnceProducerAck(publish);
        } else if (qos == QoS.EXACTLY_ONCE) {
            producerAck = new ExactlyOnceProducerAck(publish);
        } else {
            producerAck = null;
        }

        if (mqttProducerRoute.targets().isEmpty()) {
            // No targets on the route: acknowledge straight away so the client is not left waiting.
            if (producerAck != null) {
                producerAck.apply(null, null);
            }
            return;
        }

        Delivery delivery = new Delivery();
        delivery.message_$eq(new RawMessage(publish.payload()));
        delivery.persistent_$eq(qos.ordinal() > 0);
        delivery.size_$eq(publish.payload().length);
        delivery.ack_$eq(Scala2Java.toScala(producerAck));
        if (publish.retain()) {
            if (delivery.size() != 0) {
                delivery.retain_$eq(RetainSet$.MODULE$);
            } else {
                delivery.retain_$eq(RetainRemove$.MODULE$);
            }
        }

        // Explicit form of an assertion that the route is not full before the offer.
        if (!$assertionsDisabled && mqttProducerRoute.full()) {
            throw new AssertionError();
        }
        mqttProducerRoute.offer(delivery);

        if (mqttProducerRoute.full()) {
            mqttProducerRoute.suspended = true;
            this.handler._suspend_read("blocked sending to: " + mqttProducerRoute.address);
        }
    }

    /**
     * Publishes the client's will message, if one was declared, and makes sure
     * the completion callback is not lost.
     *
     * <p>When no CONNECT message has been recorded for this session, or the
     * recorded one carries no will topic, there is nothing to publish and
     * {@code task} runs immediately on the calling thread. Otherwise an
     * {@code AnonymousClass18} task is queued on the host dispatch queue with
     * the decoded will destination, a dedicated producer route and {@code task}.
     *
     * @param task completion callback; the transport-failure path passes one
     *             that disconnects the session. NOTE(review): in the will
     *             branch it is presumably run by {@code AnonymousClass18} once
     *             the will has been handled; confirm there.
     */
    public void publish_will(Task task) {
        // Run the callback whenever there is no will to publish, including when
        // the transport failed before the CONNECT message was recorded, so that
        // callers depending on it (for example to disconnect the session) are
        // never left waiting.
        if (this.connect_message == null || this.connect_message.willTopic() == null) {
            task.run();
            return;
        }
        host().dispatch_queue().execute(new AnonymousClass18(decode_destination(this.connect_message.willTopic()), new DeliveryProducerRoute(host().router()) {
            {
                // This route does not register a refill callback.
                refiller_$eq(Dispatch.NOOP);
            }

            public int send_buffer_size() {
                return 65536;
            }

            public Option<BrokerConnection> connection() {
                return MqttSession.this.handler != null ? Scala2Java.some(MqttSession.this.handler.connection()) : Scala2Java.none();
            }

            public DispatchQueue dispatch_queue() {
                return MqttSession.this.queue;
            }
        }, task));
    }

    /**
     * Handles a SUBSCRIBE from the client by subscribing to all of its topics.
     *
     * @param subscribe the decoded SUBSCRIBE message; it is also handed to the
     *                  {@code AnonymousClass19} completion task
     *                  (NOTE(review): presumably to build the reply; confirm there)
     */
    public void on_mqtt_subscribe(SUBSCRIBE subscribe) {
        Collection<Topic> requestedTopics = Arrays.asList(subscribe.topics());
        Task onSubscribed = new AnonymousClass19(subscribe);
        subscribe(requestedTopics, onSubscribed);
    }

    /**
     * Registers the given topics with this session and binds its consumer to them.
     *
     * <p>Each topic is decoded to an address and recorded in the session's
     * subscriptions and in the consumer's address map with its QoS; addresses
     * whose path contains wildcards are recorded in the consumer's wildcard map
     * as well. For a clean session every new address is bound individually. For
     * a persistent session a single {@link SubscriptionAddress}, named after the
     * client id and covering all of the consumer's current addresses, is stored
     * as the durable subscription and bound instead. Binding happens on the host
     * dispatch queue, and {@code task} is run there after the bind calls.
     *
     * @param collection the topics to subscribe to
     * @param task       callback run on the host dispatch queue once the bind
     *                   calls have been issued
     */
    public void subscribe(Collection<Topic> collection, final Task task) {
        final ArrayList bindings = Scala2Java.map(collection, new Fn1<Topic, BindAddress>() {
            public BindAddress apply(Topic topic) {
                UTF8Buffer topicName = topic.name();
                BindAddress address = MqttSession.this.decode_destination(topicName);
                MqttSession.this.session_state.subscriptions.put(topicName, new Tuple2<Topic, BindAddress>(topic, address));
                MqttSession.this.mqtt_consumer().addresses.put(address, topic.qos());
                if (PathParser.containsWildCards(address.path())) {
                    MqttSession.this.mqtt_consumer().wildcards.put(address.path(), topic.qos());
                }
                return address;
            }
        });
        this.handler.subscription_count = mqtt_consumer().addresses.size();

        if (!this.clean_session) {
            // Persistent session: replace the per-topic bindings by one durable
            // subscription spanning everything the consumer is subscribed to.
            Set<BindAddress> allAddresses = mqtt_consumer().addresses.keySet();
            BindAddress[] covered = allAddresses.toArray(new BindAddress[allAddresses.size()]);
            SubscriptionAddress durable = new SubscriptionAddress(Path$.MODULE$.create(this.client_id.toString()), (String) null, covered);
            this.session_state.durable_sub = durable;
            bindings.clear();
            bindings.add(durable);
        }

        host().dispatch_queue().execute(new Task() {
            public void run() {
                for (Object binding : bindings) {
                    BindAddress[] single = {(BindAddress) binding};
                    MqttSession.this.host().router().bind(single, MqttSession.this.mqtt_consumer(), MqttSession.this.security_context, Scala2Java.noopFn1());
                }
                task.run();
            }
        });
    }

    /**
     * Handles an UNSUBSCRIBE from the client.
     *
     * <p>Each named topic that is currently subscribed is removed from the
     * session's subscriptions and from the consumer's address map (and from its
     * wildcard map when the path contains wildcards); unknown topics are
     * ignored. For a persistent session the durable subscription is rebuilt from
     * the consumer's remaining addresses. The removed addresses and the original
     * message are then handed to an {@code AnonymousClass23} task on the host
     * dispatch queue.
     *
     * @param unsubscribe the decoded UNSUBSCRIBE message
     */
    public void on_mqtt_unsubscribe(UNSUBSCRIBE unsubscribe) {
        ArrayList removed = Scala2Java.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
            public Option<BindAddress> apply(UTF8Buffer uTF8Buffer) {
                Tuple2<Topic, BindAddress> subscription = MqttSession.this.session_state.subscriptions.remove(uTF8Buffer);
                if (subscription != null) {
                    Topic topic = (Topic) subscription._1();
                    BindAddress address = (BindAddress) subscription._2();
                    MqttSession.this.mqtt_consumer().addresses.remove(address);
                    if (PathParser.containsWildCards(address.path())) {
                        MqttSession.this.mqtt_consumer().wildcards.remove(address.path(), topic.qos());
                    }
                    return Scala2Java.some(address);
                }
                // Not subscribed to this topic: it contributes nothing.
                return Scala2Java.none();
            }
        });
        BindAddress[] removedAddresses = (BindAddress[]) removed.toArray(new BindAddress[removed.size()]);
        this.handler.subscription_count = mqtt_consumer().addresses.size();

        if (!this.clean_session) {
            Set<BindAddress> remaining = mqtt_consumer().addresses.keySet();
            BindAddress[] covered = remaining.toArray(new BindAddress[remaining.size()]);
            this.session_state.durable_sub = new SubscriptionAddress(Path$.MODULE$.create(this.client_id.toString()), (String) null, covered);
        }

        host().dispatch_queue().execute(new AnonymousClass23(removedAddresses, unsubscribe));
    }

    /**
     * Returns this session's consumer, creating it on first use.
     *
     * @return the session's {@code MqttConsumer} instance, never {@code null}
     */
    MqttConsumer mqtt_consumer() {
        MqttConsumer consumer = this._mqtt_consumer;
        if (consumer != null) {
            return consumer;
        }
        consumer = new MqttConsumer();
        this._mqtt_consumer = consumer;
        return consumer;
    }

    static {
        $assertionsDisabled = !MqttSession.class.desiredAssertionStatus();
    }
}
