package org.apache.activemq.apollo.mqtt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.activemq.apollo.broker.BindAddress;
import org.apache.activemq.apollo.broker.SimpleAddress;
import org.apache.activemq.apollo.broker.SubscriptionAddress;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.store.Store;
import org.apache.activemq.apollo.broker.store.StoreUOW;
import org.apache.activemq.apollo.mqtt.SessionPB;
import org.apache.activemq.apollo.mqtt.TopicPB;
import org.apache.activemq.apollo.util.Fn0;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn0;
import org.apache.activemq.apollo.util.UnitFn1;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import scala.Tuple2;
import scala.collection.Seq;

/* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager.class */
public class MqttSessionManager {
    public static final Scala2Java.Logger log = MqttProtocolHandler.log;
    static DispatchQueue queue = Dispatch.createQueue("session manager");

    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager$HostState.class */
    public static class HostState {
        public final VirtualHost host;
        public final HashMap<UTF8Buffer, SessionState> session_states = new HashMap<>();
        public final HashMap<UTF8Buffer, MqttSession> sessions = new HashMap<>();
        public boolean loaded = false;

        public HostState(VirtualHost virtualHost) {
            this.host = virtualHost;
        }

        public void on_load(final Task task) {
            if (this.loaded) {
                task.run();
            } else if (this.host.store() != null) {
                MqttSessionManager.queue.suspend();
                this.host.store().get_prefixed_map_entries(new AsciiBuffer("mqtt:"), Scala2Java.toScala(new UnitFn1<Seq<Tuple2<Buffer, Buffer>>>() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState.1
                    public void call(final Seq<Tuple2<Buffer, Buffer>> seq) {
                        MqttSessionManager.queue.resume();
                        MqttSessionManager.queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState.1.1
                            public void run() {
                                Iterator it = Scala2Java.toIterable(seq).iterator();
                                while (it.hasNext()) {
                                    try {
                                        SessionPB.Buffer m20parseUnframed = SessionPB.FACTORY.m20parseUnframed((Buffer) ((Tuple2) it.next())._2());
                                        SessionState sessionState = new SessionState();
                                        sessionState.strategy.create(HostState.this.host.store(), m20parseUnframed.getClientId());
                                        if (m20parseUnframed.hasReceivedMessageIds()) {
                                            Iterator<Integer> it2 = m20parseUnframed.getReceivedMessageIdsList().iterator();
                                            while (it2.hasNext()) {
                                                sessionState.received_message_ids.add(Short.valueOf(it2.next().shortValue()));
                                            }
                                        }
                                        if (m20parseUnframed.hasSubscriptions()) {
                                            for (TopicPB.Getter getter : m20parseUnframed.getSubscriptionsList()) {
                                                SimpleAddress apply = SimpleAddress.apply(getter.getAddress().toString());
                                                sessionState.subscriptions.put(getter.getName(), new Tuple2<>(new Topic(getter.getName(), QoS.values()[getter.getQos()]), apply));
                                            }
                                        }
                                        HostState.this.session_states.put(m20parseUnframed.getClientId(), sessionState);
                                    } catch (InvalidProtocolBufferException e) {
                                        MqttSessionManager.log.warn(e, "Could not load a stored MQTT session", new Object[0]);
                                    }
                                }
                                HostState.this.loaded = true;
                                task.run();
                            }
                        });
                    }
                }));
            } else {
                this.loaded = true;
                task.run();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager$SessionState.class */
    static class SessionState {
        SubscriptionAddress durable_sub = null;
        HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>> subscriptions = new HashMap<>();
        HashSet<Short> received_message_ids = new HashSet<>();
        StorageStrategy strategy = new NoopStrategy();

        /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager$SessionState$NoopStrategy.class */
        class NoopStrategy implements StorageStrategy {
            NoopStrategy() {
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void create(Store store, UTF8Buffer uTF8Buffer) {
                if (store != null) {
                    SessionState.this.strategy = new StoreStrategy(store, uTF8Buffer);
                }
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void update(Task task) {
                task.run();
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void destroy(Task task) {
                task.run();
            }
        }

        /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager$SessionState$StoreStrategy.class */
        class StoreStrategy implements StorageStrategy {
            public final Store store;
            public final UTF8Buffer client_id;
            public final UTF8Buffer session_key;

            public StoreStrategy(Store store, UTF8Buffer uTF8Buffer) {
                this.store = store;
                this.client_id = uTF8Buffer;
                this.session_key = new UTF8Buffer("mqtt:" + uTF8Buffer);
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void create(Store store, UTF8Buffer uTF8Buffer) {
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void update(final Task task) {
                StoreUOW create_uow = this.store.create_uow();
                SessionPB.Bean bean = new SessionPB.Bean();
                bean.setClientId(this.client_id);
                Iterator<Short> it = SessionState.this.received_message_ids.iterator();
                while (it.hasNext()) {
                    bean.addReceivedMessageIds(Integer.valueOf(it.next().intValue()));
                }
                for (Tuple2<Topic, BindAddress> tuple2 : SessionState.this.subscriptions.values()) {
                    Topic topic = (Topic) tuple2._1();
                    BindAddress bindAddress = (BindAddress) tuple2._2();
                    TopicPB.Bean bean2 = new TopicPB.Bean();
                    bean2.setName(topic.name());
                    bean2.setQos(topic.qos().ordinal());
                    bean2.setAddress(new UTF8Buffer(bindAddress.toString()));
                    bean.addSubscriptions(bean2);
                }
                create_uow.put(this.session_key, bean.m27freeze().toUnframedBuffer());
                final DispatchQueue currentQueue = Dispatch.getCurrentQueue();
                create_uow.on_complete(Scala2Java.toScala(new UnitFn0() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState.StoreStrategy.1
                    public void call() {
                        currentQueue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState.StoreStrategy.1.1
                            public void run() {
                                task.run();
                            }
                        });
                    }
                }));
                create_uow.release();
            }

            @Override // org.apache.activemq.apollo.mqtt.MqttSessionManager.StorageStrategy
            public void destroy(final Task task) {
                StoreUOW create_uow = this.store.create_uow();
                create_uow.put(this.session_key, (Buffer) null);
                final DispatchQueue currentQueue = Dispatch.getCurrentQueue();
                create_uow.on_complete(Scala2Java.toScala(new UnitFn0() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState.StoreStrategy.2
                    public void call() {
                        currentQueue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState.StoreStrategy.2.1
                            public void run() {
                                SessionState.this.strategy = new NoopStrategy();
                                task.run();
                            }
                        });
                    }
                }));
                create_uow.release();
            }
        }

        SessionState() {
        }
    }

    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttSessionManager$StorageStrategy.class */
    interface StorageStrategy {
        void update(Task task);

        void destroy(Task task);

        void create(Store store, UTF8Buffer uTF8Buffer);
    }

    public static void attach(final VirtualHost virtualHost, final UTF8Buffer uTF8Buffer, final MqttProtocolHandler mqttProtocolHandler) {
        queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.1
            public void run() {
                final HostState hostState = (HostState) virtualHost.plugin_state(Scala2Java.toScala(new Fn0<HostState>() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.1.1
                    /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                    public HostState m12apply() {
                        return new HostState(virtualHost);
                    }
                }), HostState.class);
                hostState.on_load(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.1.2
                    public void run() {
                        SessionState sessionState;
                        MqttSession mqttSession = hostState.sessions.get(uTF8Buffer);
                        if (mqttSession != null) {
                            mqttSession.connect(mqttProtocolHandler);
                            return;
                        }
                        if (mqttProtocolHandler.connect_message.cleanSession()) {
                            sessionState = hostState.session_states.remove(uTF8Buffer);
                            if (sessionState == null) {
                                sessionState = new SessionState();
                            }
                        } else {
                            sessionState = hostState.session_states.get(uTF8Buffer);
                            if (sessionState == null) {
                                sessionState = new SessionState();
                                hostState.session_states.put(uTF8Buffer, sessionState);
                            }
                        }
                        MqttSession mqttSession2 = new MqttSession(hostState, uTF8Buffer, sessionState);
                        mqttSession2.connect(mqttProtocolHandler);
                        hostState.sessions.put(uTF8Buffer, mqttSession2);
                    }
                });
            }
        });
    }

    public static void disconnect(final HostState hostState, final UTF8Buffer uTF8Buffer, final MqttProtocolHandler mqttProtocolHandler) {
        queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.2
            public void run() {
                MqttSession mqttSession = HostState.this.sessions.get(uTF8Buffer);
                if (mqttSession != null) {
                    mqttSession.disconnect(mqttProtocolHandler);
                }
            }
        });
    }

    public static void remove(final HostState hostState, final UTF8Buffer uTF8Buffer) {
        queue.execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttSessionManager.3
            public void run() {
                HostState.this.sessions.remove(uTF8Buffer);
            }
        });
    }
}
