package org.apache.activemq.apollo.mqtt;

import java.io.IOException;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.activemq.apollo.broker.AbstractSinkFilter;
import org.apache.activemq.apollo.broker.AbstractSinkMapper;
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.Consumed$;
import org.apache.activemq.apollo.broker.DestinationParser;
import org.apache.activemq.apollo.broker.OverflowSink;
import org.apache.activemq.apollo.broker.Sink;
import org.apache.activemq.apollo.broker.SinkMux;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.protocol.AbstractProtocolHandler;
import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3;
import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3$;
import org.apache.activemq.apollo.broker.security.SecurityContext;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.ProtocolDTO;
import org.apache.activemq.apollo.mqtt.dto.MqttConnectionStatusDTO;
import org.apache.activemq.apollo.mqtt.dto.MqttDTO;
import org.apache.activemq.apollo.util.Fn0;
import org.apache.activemq.apollo.util.Log$;
import org.apache.activemq.apollo.util.LongCounter;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn1;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.MQTTProtocolCodec;
import org.fusesource.mqtt.codec.MessageSupport;

/* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttProtocolHandler.class */
public class MqttProtocolHandler extends AbstractProtocolHandler {
    /** Connection-scoped logger; starts as the class logger and is reassigned as the connection progresses. */
    Scala2Java.Logger connection_log = log;
    /** MQTT protocol configuration; assigned when the transport connects. */
    MqttDTO config = null;
    /** Filters run over inbound commands and outbound requests. */
    final ArrayList<ProtocolFilter3> protocol_filters = new ArrayList<>();
    /** Security context holding the credentials, addresses and session id of this connection. */
    final SecurityContext security_context = new SecurityContext();
    /** Multiplexer sitting in front of the outbound sink chain. */
    SinkMux<Request> sink_manager = null;
    /** Sink this handler offers outbound requests to. */
    Sink<Request> connection_sink = null;
    /** Protocol codec obtained from the transport. */
    MQTTProtocolCodec codec = null;
    /** True once the transport-disconnected cleanup has run. */
    boolean closed = false;
    /** Supplies the text reported as "waiting_on" in the connection status. */
    Fn0<String> status = WAITING_ON_CLIENT_REQUEST;
    /** True once the connection has started to shut down. */
    boolean dead = false;
    /** Handler that receives transport commands; starts out expecting a CONNECT frame. */
    UnitFn1<Object> command_handler = connect_handler();
    /** The decoded CONNECT message of the client, once received. */
    CONNECT connect_message = null;
    /** Keep-alive monitor for the transport. */
    HeartBeatMonitor heart_beat_monitor = new HeartBeatMonitor();
    /** Virtual host this connection is bound to, once resolved. */
    VirtualHost host = null;
    /** Counter exposed as "messages_sent" in the connection status. */
    LongCounter messages_sent = new LongCounter(0);
    /** Counter exposed as "messages_received" in the connection status. */
    LongCounter messages_received = new LongCounter(0);
    /** Value exposed as "subscription_count" in the connection status. */
    int subscription_count = 0;
    /** Class-level logger. */
    public static final Scala2Java.Logger log = new Scala2Java.Logger(Log$.MODULE$.apply(MqttProtocolHandler.class));
    /** Default status supplier used while the handler is reading from the client. */
    static Fn0<String> WAITING_ON_CLIENT_REQUEST = new Fn0<String>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.1
        // Must be named apply(): the decompiler-generated name "m4apply" left the
        // Fn0 contract unimplemented, so status.apply() could not reach this body.
        public String apply() {
            return "client request";
        }
    };
    /** No-op handler installed once the connection is dead; it ignores every command. */
    public static UnitFn1<Object> dead_handler = new UnitFn1<Object>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.4
        public void call(Object obj) {
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.apollo.mqtt.MqttProtocolHandler$11, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttProtocolHandler$11.class */
    /**
     * Task submitted to the broker dispatch queue after a CONNECT was received. It looks up the
     * default virtual host and then continues on the connection queue, where the host is
     * validated and, when the host has both an authenticator and an authorizer, the client is
     * authenticated and authorized before the session is attached.
     */
    public class AnonymousClass11 extends Task {
        /** CONNACK sent back to the client, carrying a refusal code when the connect is rejected. */
        final /* synthetic */ CONNACK val$connack;

        /** Second stage of the connect sequence; runs on the connection's dispatch queue. */
        public class AnonymousClass1 extends Task {
            AnonymousClass1() {
            }

            public void run() {
                final MqttProtocolHandler handler = MqttProtocolHandler.this;
                // Reading was suspended for the host lookup; resume before deciding how to continue.
                handler.resume_read();
                if (handler.host == null) {
                    AnonymousClass11.this.refuse(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE, "Default virtual host not found.");
                    return;
                }
                if (!handler.host.service_state().is_started()) {
                    AnonymousClass11.this.refuse(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE, "Default virtual host stopped.");
                    return;
                }
                handler.connection_log = new Scala2Java.Logger(handler.host.connection_log());
                if (handler.host.authenticator() == null || handler.host.authorizer() == null) {
                    // Security is not fully configured on the host: connect without checks.
                    handler.on_host_connected(handler.host);
                    return;
                }
                handler._suspend_read("authenticating and authorizing connect");
                handler.host.authenticator().authenticate(handler.security_context, Scala2Java.toScala(new UnitFn1<String>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.11.1.1
                    public void call(final String str) {
                        // Hop back onto the connection queue before touching handler state.
                        handler.queue().execute(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.11.1.1.1
                            public void run() {
                                AnonymousClass1.this.finish_authentication(str);
                            }
                        });
                    }
                }));
            }

            /**
             * Completes the connect once the authenticator has answered.
             *
             * @param auth_error authentication failure description, or null when authentication succeeded
             */
            private void finish_authentication(String auth_error) {
                final MqttProtocolHandler handler = MqttProtocolHandler.this;
                if (auth_error != null) {
                    AnonymousClass11.this.refuse(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, auth_error + ". Credentials=" + handler.security_context.credential_dump());
                    return;
                }
                if (!handler.host.authorizer().can(handler.security_context, "connect", handler.connection().connector()).booleanValue()) {
                    // The format string needs a second %s, otherwise the principal dump is silently dropped.
                    AnonymousClass11.this.refuse(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, String.format("Not authorized to connect to connector '%s'. Principals=%s", handler.connection().connector().id(), handler.security_context.principal_dump()));
                    return;
                }
                if (!handler.host.authorizer().can(handler.security_context, "connect", handler.host).booleanValue()) {
                    AnonymousClass11.this.refuse(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, String.format("Not authorized to connect to virtual host '%s'. Principals=%s", handler.host.id(), handler.security_context.principal_dump()));
                    return;
                }
                handler.resume_read();
                handler.on_host_connected(handler.host);
            }
        }

        AnonymousClass11(CONNACK connack) {
            this.val$connack = connack;
        }

        /** Stamps the refusal code on the pending CONNACK and shuts the connection down with it. */
        private void refuse(CONNACK.Code code, String reason) {
            this.val$connack.code(code);
            MqttProtocolHandler.this.async_die((MessageSupport.Message) this.val$connack, reason);
        }

        public void run() {
            MqttProtocolHandler.this.host = MqttProtocolHandler.this.connection().connector().broker().get_default_virtual_host();
            MqttProtocolHandler.this.queue().execute(new AnonymousClass1());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/apollo/mqtt/MqttProtocolHandler$Break.class */
    /**
     * Unchecked marker exception thrown by the die(...) methods to unwind the current
     * command processing; callers in this class catch it and carry on.
     */
    public class Break extends RuntimeException {
        Break() {
            super();
        }
    }

    /**
     * Trace-logs a received value and hands it back unchanged so the call can be inlined
     * around an expression.
     *
     * @param t the value that was received
     * @return the same {@code t}
     */
    public static <T> T received(T t) {
        final Object[] trace_args = {t};
        log.trace("received: %s", trace_args);
        return t;
    }

    /**
     * @return the protocol identifier of this handler, the constant {@code "mqtt"}
     */
    public String protocol() {
        return "mqtt";
    }

    /**
     * @return the broker that owns the connector this connection was accepted on
     */
    public Broker broker() {
        return connection().connector().broker();
    }

    /**
     * @return the dispatch queue of the underlying connection
     */
    public DispatchQueue queue() {
        return connection().dispatch_queue();
    }

    /**
     * Builds the destination parser for this connection.
     *
     * When the configuration overrides none of the parser settings, the shared
     * {@code MqttProtocol.destination_parser} is returned as is. Otherwise a copy of it is
     * created and every configured (non-null) setting is applied to that copy.
     *
     * @return the shared default parser or a customized copy of it
     */
    public DestinationParser destination_parser() {
        final DestinationParser defaults = MqttProtocol.destination_parser;
        final MqttDTO cfg = this.config;

        final boolean has_overrides = cfg.queue_prefix != null
                || cfg.path_separator != null
                || cfg.any_child_wildcard != null
                || cfg.any_descendant_wildcard != null
                || cfg.regex_wildcard_start != null
                || cfg.regex_wildcard_end != null
                || cfg.part_pattern != null;
        if (!has_overrides) {
            return defaults;
        }

        // Never mutate the shared parser: customize a private copy.
        final DestinationParser customized = new DestinationParser().copy(defaults);
        if (cfg.queue_prefix != null) {
            customized.queue_prefix_$eq(cfg.queue_prefix);
        }
        if (cfg.path_separator != null) {
            customized.path_separator_$eq(cfg.path_separator);
        }
        if (cfg.any_child_wildcard != null) {
            customized.any_child_wildcard_$eq(cfg.any_child_wildcard);
        }
        if (cfg.any_descendant_wildcard != null) {
            customized.any_descendant_wildcard_$eq(cfg.any_descendant_wildcard);
        }
        if (cfg.regex_wildcard_start != null) {
            customized.regex_wildcard_start_$eq(cfg.regex_wildcard_start);
        }
        if (cfg.regex_wildcard_end != null) {
            customized.regex_wildcard_end_$eq(cfg.regex_wildcard_end);
        }
        if (cfg.part_pattern != null) {
            customized.part_pattern_$eq(Pattern.compile(cfg.part_pattern));
        }
        return customized;
    }

    /**
     * @return the session id stored in this connection's security context
     */
    public String session_id() {
        return this.security_context.session_id();
    }

    /**
     * Picks the MQTT settings out of a connector configuration.
     *
     * @param acceptingConnectorDTO connector configuration whose protocol entries are searched
     * @return the first protocol entry that is an {@link MqttDTO}, or a fresh default
     *         {@link MqttDTO} when the connector declares none
     */
    private static MqttDTO find_config(AcceptingConnectorDTO acceptingConnectorDTO) {
        MqttDTO match = null;
        for (ProtocolDTO candidate : acceptingConnectorDTO.protocols) {
            if (candidate instanceof MqttDTO) {
                match = (MqttDTO) candidate;
                break;
            }
        }
        return match != null ? match : new MqttDTO();
    }

    /**
     * Initializes the handler once the transport is connected: picks up the codec and the MQTT
     * configuration, creates the protocol filters, fills in the security context, builds the
     * outbound sink chain and finally starts reading from the client.
     */
    public void on_transport_connected() {
        // The cast is needed if the transport only exposes a generic codec type; it is harmless otherwise.
        this.codec = (MQTTProtocolCodec) connection().transport().getProtocolCodec();
        this.config = find_config(connection().connector().config());
        this.codec.setMaxMessageLength(Scala2Java.get(this.config.max_message_length, this.codec.getMaxMessageLength()));
        this.protocol_filters.clear();
        this.protocol_filters.addAll(ProtocolFilter3$.MODULE$.create_filters(this.config.protocol_filters, this));
        this.security_context.local_address_$eq(connection().transport().getLocalAddress());
        this.security_context.remote_address_$eq(connection().transport().getRemoteAddress());
        this.security_context.connector_id_$eq(connection().connector().id());
        this.security_context.certificates_$eq(connection().certificates());
        this.connection_log = new Scala2Java.Logger(connection().connector().broker().connection_log());

        // Last stage of the chain: turns a Request into its frame and hands it to the transport.
        final Sink<Request> transport_stage = new AbstractSinkMapper<Request, Object>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.2
            public Sink<Object> downstream() {
                return MqttProtocolHandler.this.connection().transport_sink();
            }

            public MQTTFrame passing(Request request) {
                MqttProtocolHandler.log.trace("sent: %s", new Object[]{request.message});
                request.delivered = true;
                // Requests with id 0 are acknowledged as soon as they are passed on.
                if (request.id == 0 && request.ack != null) {
                    request.ack.apply(Consumed$.MODULE$);
                }
                return request.frame;
            }
        };

        // A separate, non-final variable selects the head of the chain. The decompiled code
        // reassigned the final sink that the filter stage captures, which cannot compile.
        Sink<Request> outbound_head = transport_stage;
        if (!this.protocol_filters.isEmpty()) {
            outbound_head = new AbstractSinkFilter<Request, Request>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.3
                public Sink<Request> downstream() {
                    return transport_stage;
                }

                public Request filter(Request request) {
                    Request filtered = request;
                    for (ProtocolFilter3 protocol_filter : MqttProtocolHandler.this.protocol_filters) {
                        filtered = (Request) protocol_filter.filter_outbound(filtered);
                        if (filtered == null) {
                            // A filter dropped the request; skip the remaining filters.
                            break;
                        }
                    }
                    return filtered;
                }
            };
        }
        this.sink_manager = new SinkMux<>(outbound_head);
        this.connection_sink = new OverflowSink<Request>(this.sink_manager.open());
        resume_read();
    }

    /**
     * Releases the resources of this handler when the transport disconnects. Only the first
     * call does any work: it marks the handler closed and dead, logs the security context out,
     * stops the heart beat monitor and stops the connection if it is still running.
     */
    public void on_transport_disconnected() {
        if (!this.closed) {
            this.closed = true;
            this.dead = true;
            this.command_handler = dead_handler;

            // Logout failures are only reported; they do not interrupt the cleanup.
            final UnitFn1<Throwable> logout_callback = new UnitFn1<Throwable>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.5
                public void call(Throwable th) {
                    if (th == null) {
                        return;
                    }
                    final Object[] log_args = {MqttProtocolHandler.this.security_context.remote_address(), th.toString()};
                    MqttProtocolHandler.this.connection_log.info(th, "MQTT connection '%s' log out error: %s", log_args);
                }
            };
            this.security_context.logout(Scala2Java.toScala(logout_callback));

            this.heart_beat_monitor.stop();
            if (!connection().stopped()) {
                connection().stop(Dispatch.NOOP);
            }
            log.trace("mqtt protocol resources released", new Object[0]);
        }
    }

    /**
     * Reacts to a transport failure. Does nothing when the handler is already dead; otherwise
     * notifies the current command handler with "failure", marks the handler dead and, unless
     * the connection is already stopped, logs the failure and delegates to the superclass.
     *
     * @param iOException the failure reported by the transport
     */
    public void on_transport_failure(IOException iOException) {
        if (!this.dead) {
            // The active handler gets to see the failure before it is replaced.
            this.command_handler.apply("failure");
            this.dead = true;
            this.command_handler = dead_handler;
            if (!connection().stopped()) {
                final Object[] log_args = {this.security_context.remote_address(), iOException};
                this.connection_log.info(iOException, "Shutting connection '%s'  down due to: %s", log_args);
                super.on_transport_failure(iOException);
            }
        }
    }

    /**
     * Convenience overload of {@link #suspend_read(Fn0)} for a fixed reason text.
     *
     * @param str the reason reported as the handler status while reading is suspended
     */
    public void _suspend_read(final String str) {
        suspend_read(new Fn0<String>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.6
            // Must be named apply(): the decompiler-generated name "m5apply" left the
            // Fn0 contract unimplemented, so the status text was unreachable.
            public String apply() {
                return str;
            }
        });
    }

    /**
     * Stops reading from the client and records why.
     *
     * @param fn0 supplier of the status text to report while reading is suspended
     */
    public void suspend_read(Fn0<String> fn0) {
        this.status = fn0;
        connection().transport().suspendRead();
        // Keep the heart beat monitor's read state in step with the transport.
        this.heart_beat_monitor.suspendRead();
    }

    /**
     * Resumes reading from the client and resets the status to "waiting on client request".
     */
    public void resume_read() {
        this.status = WAITING_ON_CLIENT_REQUEST;
        connection().transport().resumeRead();
        // Keep the heart beat monitor's read state in step with the transport.
        this.heart_beat_monitor.resumeRead();
    }

    /**
     * @return the configured delay, in milliseconds, between sending a final message and
     *         stopping the connection; 5000 when the configuration does not set one
     */
    public long die_delay() {
        final long fallback_millis = 5000L;
        return Scala2Java.get(this.config.die_delay, fallback_millis);
    }

    /**
     * Shuts the connection down with the given reason, without propagating {@link Break}.
     *
     * @param str the reason to log
     */
    public void async_die(String str) {
        async_die(str, (Throwable) null);
    }

    /**
     * Shuts the connection down with the given reason and cause. Unlike {@code die}, this
     * variant returns normally, so it can be used from callbacks and timers.
     *
     * @param str the reason to log
     * @param th  the cause to log, may be null
     */
    public void async_die(String str, Throwable th) {
        try {
            die(str, th);
        } catch (Break ignored) {
            // die(...) ends by throwing Break; there is nothing to unwind here.
        }
    }

    /**
     * Shuts the connection down after offering a final message to the client. Unlike
     * {@code die}, this variant returns normally.
     *
     * @param message the last message to send to the client, may be null
     * @param str     the reason to log
     */
    public void async_die(MessageSupport.Message message, String str) {
        try {
            die(message, str, null);
        } catch (Break ignored) {
            // die(...) ends by throwing Break; there is nothing to unwind here.
        }
    }

    /**
     * Logs the reason and shuts the connection down without sending a final message.
     * The generic return type only lets the call be used in expression position; the
     * delegate ends by throwing {@link Break}.
     *
     * @param str the reason to log
     */
    public <T> T die(String str) {
        return (T) die(null, str, null);
    }

    /**
     * Logs the reason together with its cause and shuts the connection down without sending
     * a final message. The delegate ends by throwing {@link Break}.
     *
     * @param str the reason to log
     * @param th  the cause to log, may be null
     */
    public <T> T die(String str, Throwable th) {
        return (T) die(null, str, th);
    }

    /**
     * Logs the reason and shuts the connection down, offering a final message to the client
     * first when one is given. The delegate ends by throwing {@link Break}.
     *
     * @param message the last message to send to the client, may be null
     * @param str     the reason to log
     */
    public <T> T die(MessageSupport.Message message, String str) {
        return (T) die(message, str, null);
    }

    /**
     * Logs the shutdown reason on the connection log (with the cause when one is given) and
     * then shuts the connection down via {@link #die(MessageSupport.Message)}.
     *
     * @param message the last message to send to the client, may be null
     * @param str     the reason to log
     * @param th      the cause to log, may be null
     */
    public <T> T die(MessageSupport.Message message, String str, Throwable th) {
        if (th == null) {
            final Object[] log_args = {this.security_context.remote_address(), str};
            this.connection_log.info("MQTT connection '%s' error: %s", log_args);
        } else {
            final Object[] log_args = {this.security_context.remote_address(), str, th};
            this.connection_log.info(th, "MQTT connection '%s' error: %s", log_args);
        }
        return (T) die(message);
    }

    /**
     * Shuts the connection down and always ends by throwing {@link Break}.
     *
     * On the first call the current command handler is notified with "failure", the handler is
     * marked dead and its status changes. When a final message is given it is offered to the
     * client and the connection is stopped after {@link #die_delay()} milliseconds; otherwise
     * the connection is stopped right away. Later calls only throw.
     *
     * @param message the last message to send to the client, may be null
     * @throws Break always
     */
    public <T> T die(MessageSupport.Message message) {
        if (!this.dead) {
            // The active handler gets to see the failure before it is replaced.
            this.command_handler.apply("failure");
            this.dead = true;
            this.command_handler = dead_handler;
            this.status = new Fn0<String>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.7
                // Must be named apply(): the decompiler-generated name "m6apply" left the
                // Fn0 contract unimplemented, so the status text was unreachable.
                public String apply() {
                    return "shuting down";
                }
            };
            if (message != null) {
                connection().transport().resumeRead();
                this.connection_sink.offer(new Request((short) 0, message, null));
                // Give the final message time to go out before the connection is stopped.
                queue().executeAfter(die_delay(), TimeUnit.MILLISECONDS, new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.8
                    public void run() {
                        MqttProtocolHandler.this.connection().stop(Dispatch.NOOP);
                    }
                });
            } else {
                connection().stop(Dispatch.NOOP);
            }
        }
        throw new Break();
    }

    /**
     * Entry point for every command read from the transport. The command is run through the
     * inbound protocol filters (a filter returning null drops it) and then handed to the
     * current command handler. {@link Break} ends processing silently; any other exception
     * is logged and shuts the connection down.
     *
     * @param obj the command delivered by the transport
     */
    public void on_transport_command(Object obj) {
        try {
            Object command = obj;
            for (ProtocolFilter3 protocol_filter : this.protocol_filters) {
                command = protocol_filter.filter_inbound(command);
                if (command == null) {
                    return;
                }
            }
            this.command_handler.apply(command);
        } catch (Break ignored) {
            // Thrown by die(...) after the shutdown was already initiated.
        } catch (Exception failure) {
            final String description = "Internal Server Error: " + failure;
            // Also warn on the class logger when the connection log is a different logger.
            if (log != this.connection_log) {
                log.warn(failure, description, new Object[0]);
            }
            async_die(description, failure);
        }
    }

    /**
     * Creates the command handler used until the client has connected. It accepts a single
     * MQTT CONNECT frame, stores the decoded message and continues with
     * {@link #on_mqtt_connect()}; the "failure" notification is ignored and anything else
     * shuts the connection down.
     *
     * @return the handler expecting the initial CONNECT frame
     */
    public UnitFn1<Object> connect_handler() {
        return new UnitFn1<Object>() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.9
            public void call(Object obj) {
                if (!(obj instanceof MQTTFrame)) {
                    if ("failure".equals(obj)) {
                        return;
                    }
                    // Null-safe: a null command must not hide the real problem behind an NPE.
                    MqttProtocolHandler.this.die("Internal Server Error: unexpected mqtt command: " + (obj == null ? null : obj.getClass()));
                    return;
                }
                MQTTFrame mQTTFrame = (MQTTFrame) obj;
                try {
                    // Message type 1 is the one this handler decodes as CONNECT.
                    if (mQTTFrame.messageType() == 1) {
                        MqttProtocolHandler.this.connect_message = (CONNECT) MqttProtocolHandler.received(new CONNECT().decode(mQTTFrame));
                        MqttProtocolHandler.this.on_mqtt_connect();
                    } else {
                        MqttProtocolHandler.this.die("Expecting an MQTT CONNECT message, but got: " + mQTTFrame.getClass());
                    }
                } catch (ProtocolException e) {
                    // Pass the decoding failure along so the log shows why the frame was rejected.
                    MqttProtocolHandler.this.die("Internal Server Error: bad mqtt command: " + mQTTFrame, e);
                }
            }
        };
    }

    /**
     * Processes the stored CONNECT message: validates the protocol version and client id,
     * copies the credentials and client id into the security context, sets up the keep-alive
     * monitor and then suspends reading while the virtual host is looked up on the broker
     * dispatch queue.
     *
     * @throws Break when the CONNECT is rejected (after the refusal was initiated)
     */
    public void on_mqtt_connect() {
        CONNACK connack = new CONNACK();
        switch (this.connect_message.version()) {
            case 3:
            case 4:
                break;
            default:
                connack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
                die((MessageSupport.Message) connack, "Unsupported protocol version: " + this.connect_message.version());
                break;
        }
        if ((this.connect_message.clientId() == null || this.connect_message.clientId().length == 0) && !this.connect_message.cleanSession()) {
            // MQTT-3.1.3-8: answer with "identifier rejected" so the CONNACK does not go out
            // without a refusal code.
            connack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            die((MessageSupport.Message) connack, "A clean session must be requested when no client id is provided.");
        }
        UTF8Buffer clientId = this.connect_message.clientId();
        this.security_context.user_$eq(Scala2Java.toString(this.connect_message.userName()));
        this.security_context.password_$eq(Scala2Java.toString(this.connect_message.password()));
        // NOTE(review): if clientId() can be null while a clean session is requested, the
        // toString() call below throws a NullPointerException; confirm whether the codec ever yields null.
        this.security_context.session_id_$eq(clientId.toString());
        final short keepAlive = this.connect_message.keepAlive();
        if (keepAlive > 0) {
            // Allow one and a half keep-alive periods of silence before giving up on the client.
            this.heart_beat_monitor.setReadInterval(((long) (keepAlive * 1.5d)) * 1000);
            this.heart_beat_monitor.setOnDead(new Task() { // from class: org.apache.activemq.apollo.mqtt.MqttProtocolHandler.10
                public void run() {
                    MqttProtocolHandler.this.async_die("Missed keep alive set to " + ((int) keepAlive) + " seconds");
                }
            });
        }
        this.heart_beat_monitor.suspendRead();
        this.heart_beat_monitor.setTransport(connection().transport());
        this.heart_beat_monitor.start();
        _suspend_read("virtual host lookup");
        broker().dispatch_queue().execute(new AnonymousClass11(connack));
    }

    /**
     * Attaches this handler to the MQTT session identified by the client id of the CONNECT
     * message on the given virtual host.
     *
     * @param virtualHost the virtual host the client connected to
     */
    public void on_host_connected(VirtualHost virtualHost) {
        MqttSessionManager.attach(virtualHost, this.connect_message.clientId(), this);
    }

    /* renamed from: create_connection_status, reason: merged with bridge method [inline-methods] */
    /**
     * Builds a status snapshot of this connection: a fixed protocol version of "3.1", the
     * sent/received message counters, the subscription count and the current status text.
     *
     * NOTE(review): the "m3" prefix is a decompiler rename of create_connection_status, so this
     * method may not override the intended superclass method; confirm before relying on it.
     *
     * @param z not used by this implementation
     * @return the populated status DTO
     */
    public MqttConnectionStatusDTO m3create_connection_status(boolean z) {
        final MqttConnectionStatusDTO snapshot = new MqttConnectionStatusDTO();
        snapshot.waiting_on = (String) this.status.apply();
        snapshot.subscription_count = this.subscription_count;
        snapshot.messages_received = this.messages_received.get();
        snapshot.messages_sent = this.messages_sent.get();
        snapshot.protocol_version = "3.1";
        return snapshot;
    }
}
