/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt;

import com.hivemq.client.internal.mqtt.MqttAsyncClient;
import com.hivemq.client.internal.mqtt.MqttBlockingClient;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.handler.auth.MqttReAuthCompletable;
import com.hivemq.client.internal.mqtt.handler.connect.MqttConnAckSingle;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectCompletable;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlowable;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingleFlowable;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubAckSingle;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubAckSingle;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.util.MqttChecks;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5Disconnect;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
import com.hivemq.client.rx.FlowableWithSingle;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import io.reactivex.internal.fuseable.ScalarCallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * RxJava based implementation of {@link Mqtt5RxClient}.
 * <p>
 * Most operations come in up to three flavours:
 * <ul>
 *   <li>a public method taking the API message type, which is first passed through {@link MqttChecks}
 *       (or {@link Checks}) to obtain the internal message type,</li>
 *   <li>a package-private method taking the internal message type, which moves the emitted signals onto the
 *       application scheduler of the client's executor config via {@code observeOn},</li>
 *   <li>a package-private {@code ...Unsafe} method, which only creates the underlying reactive type and does
 *       <em>not</em> apply {@code observeOn}.</li>
 * </ul>
 */
public class MqttRxClient implements Mqtt5RxClient {
    /** Maps API publishes to internal publishes by delegating to {@link MqttChecks#publish}. */
    @NotNull
    private static final Function<Mqtt5Publish, MqttPublish> PUBLISH_MAPPER = MqttChecks::publish;
    /** Configuration shared by every reactive type this client creates. */
    @NotNull
    private final MqttClientConfig clientConfig;

    /**
     * Creates a client backed by the given configuration.
     *
     * @param clientConfig the configuration handed to every created reactive type.
     */
    public MqttRxClient(@NotNull MqttClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    /**
     * Looks up the application scheduler from the executor config.
     * <p>
     * The scheduler is read from the config on every call instead of being cached in a field, exactly as each
     * call site did before this helper was extracted.
     *
     * @return the scheduler on which results are delivered to the application.
     */
    @NotNull
    private Scheduler applicationScheduler() {
        return this.clientConfig.getExecutorConfig().getApplicationScheduler();
    }

    /**
     * Connects using the given API connect message.
     *
     * @param connect the connect message; it is converted (and checked) by {@link MqttChecks#connect}.
     * @return the single emitting the ConnAck, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Single<Mqtt5ConnAck> connect(@Nullable Mqtt5Connect connect) {
        return this.connect(MqttChecks.connect(connect));
    }

    /**
     * Connects using the given internal connect message.
     *
     * @param connect the internal connect message.
     * @return the single emitting the ConnAck, observed on the application scheduler.
     */
    @NotNull
    Single<Mqtt5ConnAck> connect(@NotNull MqttConnect connect) {
        return this.connectUnsafe(connect).observeOn(this.applicationScheduler());
    }

    /**
     * Creates the connect single without applying {@code observeOn}.
     *
     * @param connect the internal connect message.
     * @return a new {@link MqttConnAckSingle}.
     */
    @NotNull
    Single<Mqtt5ConnAck> connectUnsafe(@NotNull MqttConnect connect) {
        return new MqttConnAckSingle(this.clientConfig, connect);
    }

    /**
     * Subscribes using the given API subscribe message.
     *
     * @param subscribe the subscribe message; it is converted (and checked) by {@link MqttChecks#subscribe}.
     * @return the single emitting the SubAck, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Single<Mqtt5SubAck> subscribe(@Nullable Mqtt5Subscribe subscribe) {
        return this.subscribe(MqttChecks.subscribe(subscribe));
    }

    /**
     * Subscribes using the given internal subscribe message.
     *
     * @param subscribe the internal subscribe message.
     * @return the single emitting the SubAck, observed on the application scheduler.
     */
    @NotNull
    Single<Mqtt5SubAck> subscribe(@NotNull MqttSubscribe subscribe) {
        return this.subscribeUnsafe(subscribe).observeOn(this.applicationScheduler());
    }

    /**
     * Creates the subscribe single without applying {@code observeOn}.
     *
     * @param subscribe the internal subscribe message.
     * @return a new {@link MqttSubAckSingle}.
     */
    @NotNull
    Single<Mqtt5SubAck> subscribeUnsafe(@NotNull MqttSubscribe subscribe) {
        return new MqttSubAckSingle(subscribe, this.clientConfig);
    }

    /**
     * Subscribes using the given API subscribe message and streams the matching publishes.
     *
     * @param subscribe the subscribe message; it is converted (and checked) by {@link MqttChecks#subscribe}.
     * @return the flowable of publishes combined with the single SubAck, observed on the application scheduler.
     */
    @Override
    @NotNull
    public FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeStream(@Nullable Mqtt5Subscribe subscribe) {
        return this.subscribeStream(MqttChecks.subscribe(subscribe));
    }

    /**
     * Subscribes using the given internal subscribe message and streams the matching publishes.
     *
     * @param subscribe the internal subscribe message.
     * @return the flowable of publishes combined with the single SubAck; both are observed on the application
     *         scheduler ({@code observeOnBoth} is called with {@code true} as its second argument).
     */
    @NotNull
    FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeStream(@NotNull MqttSubscribe subscribe) {
        return this.subscribeStreamUnsafe(subscribe).observeOnBoth(this.applicationScheduler(), true);
    }

    /**
     * Creates the subscribed publish flowable without applying {@code observeOnBoth}.
     *
     * @param subscribe the internal subscribe message.
     * @return a new {@link MqttSubscribedPublishFlowable}.
     */
    @NotNull
    FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeStreamUnsafe(@NotNull MqttSubscribe subscribe) {
        return new MqttSubscribedPublishFlowable(subscribe, this.clientConfig);
    }

    /**
     * Streams incoming publishes selected by the given global filter.
     *
     * @param filter the global publish filter; checked via {@link Checks#notNull} before use.
     * @return the flowable of incoming publishes, observed on the application scheduler with
     *         {@code delayError = true}.
     */
    @Override
    @NotNull
    public Flowable<Mqtt5Publish> publishes(@Nullable MqttGlobalPublishFilter filter) {
        Checks.notNull(filter, "Global publish filter");
        return this.publishesUnsafe(filter).observeOn(this.applicationScheduler(), true);
    }

    /**
     * Creates the global incoming publish flowable without applying {@code observeOn}.
     *
     * @param filter the global publish filter.
     * @return a new {@link MqttGlobalIncomingPublishFlowable}.
     */
    @NotNull
    Flowable<Mqtt5Publish> publishesUnsafe(@NotNull MqttGlobalPublishFilter filter) {
        return new MqttGlobalIncomingPublishFlowable(filter, this.clientConfig);
    }

    /**
     * Unsubscribes using the given API unsubscribe message.
     *
     * @param unsubscribe the unsubscribe message; it is converted (and checked) by
     *                    {@link MqttChecks#unsubscribe}.
     * @return the single emitting the UnsubAck, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Single<Mqtt5UnsubAck> unsubscribe(@Nullable Mqtt5Unsubscribe unsubscribe) {
        return this.unsubscribe(MqttChecks.unsubscribe(unsubscribe));
    }

    /**
     * Unsubscribes using the given internal unsubscribe message.
     *
     * @param unsubscribe the internal unsubscribe message.
     * @return the single emitting the UnsubAck, observed on the application scheduler.
     */
    @NotNull
    Single<Mqtt5UnsubAck> unsubscribe(@NotNull MqttUnsubscribe unsubscribe) {
        return this.unsubscribeUnsafe(unsubscribe).observeOn(this.applicationScheduler());
    }

    /**
     * Creates the unsubscribe single without applying {@code observeOn}.
     *
     * @param unsubscribe the internal unsubscribe message.
     * @return a new {@link MqttUnsubAckSingle}.
     */
    @NotNull
    Single<Mqtt5UnsubAck> unsubscribeUnsafe(@NotNull MqttUnsubscribe unsubscribe) {
        return new MqttUnsubAckSingle(unsubscribe, this.clientConfig);
    }

    /**
     * Publishes a single internal publish message.
     *
     * @param publish the internal publish message.
     * @return the single emitting the publish result, observed on the application scheduler.
     */
    @NotNull
    Single<Mqtt5PublishResult> publish(@NotNull MqttPublish publish) {
        return this.publishUnsafe(publish).observeOn(this.applicationScheduler());
    }

    /**
     * Creates the single-publish single without applying {@code observeOn}.
     *
     * @param publish the internal publish message.
     * @return a new {@link MqttAckSingle}.
     */
    @NotNull
    Single<Mqtt5PublishResult> publishUnsafe(@NotNull MqttPublish publish) {
        return new MqttAckSingle(this.clientConfig, publish);
    }

    /**
     * Publishes every message emitted by the given flowable.
     *
     * @param publishFlowable the source of API publish messages; checked via {@link Checks#notNull} before use.
     * @return the flowable of publish results, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Flowable<Mqtt5PublishResult> publish(@Nullable Flowable<Mqtt5Publish> publishFlowable) {
        Checks.notNull(publishFlowable, "Publish flowable");
        return this.publish(publishFlowable, PUBLISH_MAPPER);
    }

    /**
     * Publishes every element emitted by the given flowable after mapping it to an internal publish message.
     * <p>
     * A source that implements {@link ScalarCallable} is resolved eagerly: its value is read right away, a
     * {@code null} value yields an empty flowable, and anything thrown by the mapper is returned as an error
     * flowable instead of being thrown to the caller. All other sources are subscribed on the application
     * scheduler and mapped lazily.
     *
     * @param publishFlowable the source of elements to publish.
     * @param publishMapper   converts a source element into an internal publish message.
     * @param <P>             the element type of the source flowable.
     * @return the flowable of publish results; for the non-empty, non-error cases it is observed on the
     *         application scheduler with {@code delayError = true}.
     */
    @NotNull
    public <P> Flowable<Mqtt5PublishResult> publish(@NotNull Flowable<P> publishFlowable, @NotNull Function<P, MqttPublish> publishMapper) {
        Scheduler scheduler = this.applicationScheduler();
        if (publishFlowable instanceof ScalarCallable) {
            // A Flowable<P> that is also a ScalarCallable can only be a ScalarCallable<P>; the cast is unchecked
            // solely because of erasure. Keeping the element typed as P (rather than Object via a raw type) is
            // what allows it to be passed to publishMapper.
            @SuppressWarnings("unchecked")
            P publish = ((ScalarCallable<P>) publishFlowable).call();
            if (publish == null) {
                return Flowable.empty();
            }
            MqttPublish mqttPublish;
            try {
                mqttPublish = publishMapper.apply(publish);
            } catch (Throwable t) {
                // Function.apply may throw anything; surface it through the returned flowable.
                return Flowable.error(t);
            }
            return new MqttAckSingleFlowable(this.clientConfig, mqttPublish).observeOn(scheduler, true);
        }
        Flowable<MqttPublish> mqttPublishFlowable = publishFlowable.subscribeOn(scheduler).map(publishMapper);
        return new MqttAckFlowable(this.clientConfig, mqttPublishFlowable).observeOn(scheduler, true);
    }

    /**
     * Triggers re-authentication.
     *
     * @return the completable for the re-authentication, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Completable reauth() {
        return this.reauthUnsafe().observeOn(this.applicationScheduler());
    }

    /**
     * Creates the re-authentication completable without applying {@code observeOn}.
     *
     * @return a new {@link MqttReAuthCompletable}.
     */
    @NotNull
    Completable reauthUnsafe() {
        return new MqttReAuthCompletable(this.clientConfig);
    }

    /**
     * Disconnects using the given API disconnect message.
     *
     * @param disconnect the disconnect message; it is converted (and checked) by
     *                   {@link MqttChecks#disconnect}.
     * @return the completable for the disconnect, observed on the application scheduler.
     */
    @Override
    @NotNull
    public Completable disconnect(@Nullable Mqtt5Disconnect disconnect) {
        return this.disconnect(MqttChecks.disconnect(disconnect));
    }

    /**
     * Disconnects using the given internal disconnect message.
     *
     * @param disconnect the internal disconnect message.
     * @return the completable for the disconnect, observed on the application scheduler.
     */
    @NotNull
    Completable disconnect(@NotNull MqttDisconnect disconnect) {
        return this.disconnectUnsafe(disconnect).observeOn(this.applicationScheduler());
    }

    /**
     * Creates the disconnect completable without applying {@code observeOn}.
     *
     * @param disconnect the internal disconnect message.
     * @return a new {@link MqttDisconnectCompletable}.
     */
    @NotNull
    Completable disconnectUnsafe(@NotNull MqttDisconnect disconnect) {
        return new MqttDisconnectCompletable(this.clientConfig, disconnect);
    }

    /**
     * @return the configuration this client was created with.
     */
    @Override
    @NotNull
    public MqttClientConfig getConfig() {
        return this.clientConfig;
    }

    /**
     * @return a new {@link MqttAsyncClient} wrapping this client.
     */
    @Override
    @NotNull
    public MqttAsyncClient toAsync() {
        return new MqttAsyncClient(this);
    }

    /**
     * @return a new {@link MqttBlockingClient} wrapping this client.
     */
    @Override
    @NotNull
    public MqttBlockingClient toBlocking() {
        return new MqttBlockingClient(this);
    }
}

