/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.subscribe;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubAckFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscriptionFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.MqttCommonReasonCode;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttStatefulUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.mqtt3.Mqtt3UnsubAckView;
import com.hivemq.client.internal.util.Ranges;
import com.hivemq.client.internal.util.collections.ImmutableList;
import com.hivemq.client.internal.util.collections.IntMap;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException;
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5ReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.jctools.queues.MpscLinkedQueue;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Channel handler that writes SUBSCRIBE and UNSUBSCRIBE messages and matches incoming SUBACK and UNSUBACK
 * messages to the requests that caused them.
 * <p>
 * Requests handed to {@link #subscribe} and {@link #unsubscribe} are placed in a queue and drained by
 * {@link #run()}, which is executed on the event loop of the request's flow. At most {@link #MAX_SUB_PENDING}
 * requests are kept pending (written but not yet acknowledged) at any time; further requests stay queued until a
 * pending one completes. Pending requests are additionally kept in a doubly linked list so they can be re-sent,
 * in order, when a session is resumed.
 */
@ClientScope
public class MqttSubscriptionHandler extends MqttSessionAwareHandler implements Runnable {
    /** Name of this handler. */
    @NotNull
    public static final String NAME = "subscription";
    /** Maximum number of SUBSCRIBE/UNSUBSCRIBE messages that may be pending at the same time. */
    public static final int MAX_SUB_PENDING = 10;
    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttSubscriptionHandler.class);
    @NotNull
    private final MqttIncomingPublishFlows incomingPublishFlows;
    /** Requests that have been submitted but not yet assigned a packet identifier. */
    @NotNull
    private final MpscLinkedQueue<MqttSubOrUnsubWithFlow> queued = MpscLinkedQueue.newMpscLinkedQueue();
    /**
     * Counts submitted requests that the draining code has not yet accounted for. {@link #run()} is scheduled only
     * when this counter goes from 0 to 1 (see {@link #execute}).
     */
    @NotNull
    private final AtomicInteger queuedCounter = new AtomicInteger();
    /** Pending requests indexed by packet identifier. */
    @NotNull
    private final IntMap<MqttSubOrUnsubWithFlow.Stateful> pending;
    /** Head of the linked list of pending requests (oldest first). */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful firstPending;
    /** Tail of the linked list of pending requests. */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful lastPending;
    /** Next pending request that still has to be re-sent after a session start or resume, if any. */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful resendPending;
    /**
     * Request whose write is currently in progress. It is set immediately before and cleared immediately after the
     * write call, so {@link #exceptionCaught} can attribute an exception raised during the write to this request.
     */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful currentPending;
    /** Pool of packet identifiers used by this handler. */
    @NotNull
    private final Ranges packetIdentifiers;
    /** Pool of subscription identifiers, or {@code null} while subscription identifiers are not in use. */
    @Nullable
    private Ranges subscriptionIdentifiers;

    /**
     * Creates the handler and sets up the packet identifier pool.
     *
     * @param incomingPublishFlows the incoming publish flows that are informed about subscriptions, SUBACKs and
     *                             successful unsubscriptions.
     */
    @Inject
    MqttSubscriptionHandler(@NotNull MqttIncomingPublishFlows incomingPublishFlows) {
        this.incomingPublishFlows = incomingPublishFlows;
        // This handler uses the MAX_SUB_PENDING highest values of the packet identifier range that ends at 65535,
        // so the size of the identifier pool always matches the pending limit.
        final int maxPacketIdentifier = 65535;
        final int minPacketIdentifier = maxPacketIdentifier - MAX_SUB_PENDING + 1;
        this.pending = IntMap.range(minPacketIdentifier, maxPacketIdentifier);
        this.packetIdentifiers = new Ranges(minPacketIdentifier, maxPacketIdentifier);
    }

    /**
     * Prepares the subscription identifier pool if the connection supports subscription identifiers and schedules
     * {@link #run()} when there are pending requests to re-send or queued requests to send.
     *
     * @param connectionConfig the configuration of the new connection.
     * @param eventLoop        the event loop on which {@link #run()} is executed.
     */
    @Override
    public void onSessionStartOrResume(@NotNull MqttClientConnectionConfig connectionConfig, @NotNull EventLoop eventLoop) {
        super.onSessionStartOrResume(connectionConfig, eventLoop);
        if (connectionConfig.areSubscriptionIdentifiersAvailable() && this.subscriptionIdentifiers == null) {
            this.subscriptionIdentifiers = new Ranges(1, 0xFFFFFFF);
        }
        if (this.firstPending != null || this.queuedCounter.get() > 0) {
            // Everything that was pending before has to be written again, starting with the oldest request.
            this.resendPending = this.firstPending;
            eventLoop.execute(this);
        }
    }

    /**
     * Queues a SUBSCRIBE request and makes sure the queue gets drained.
     *
     * @param subscribe the SUBSCRIBE message to send.
     * @param flow      the flow that receives the SUBACK or an error.
     */
    public void subscribe(@NotNull MqttSubscribe subscribe, @NotNull MqttSubscriptionFlow<MqttSubAck> flow) {
        this.queued.offer(new MqttSubscribeWithFlow(subscribe, flow));
        this.execute(flow.getEventLoop());
    }

    /**
     * Queues an UNSUBSCRIBE request and makes sure the queue gets drained.
     *
     * @param unsubscribe the UNSUBSCRIBE message to send.
     * @param flow        the flow that receives the UNSUBACK or an error.
     */
    public void unsubscribe(@NotNull MqttUnsubscribe unsubscribe, @NotNull MqttSubOrUnsubAckFlow<MqttUnsubAck> flow) {
        this.queued.offer(new MqttUnsubscribeWithFlow(unsubscribe, flow));
        this.execute(flow.getEventLoop());
    }

    /**
     * Increments the queued counter and schedules {@link #run()} if the counter was 0 before.
     *
     * @param eventLoop the event loop on which {@link #run()} is executed.
     */
    private void execute(@NotNull EventLoop eventLoop) {
        if (this.queuedCounter.getAndIncrement() == 0) {
            eventLoop.execute(this);
        }
    }

    /**
     * Re-sends pending requests that are marked for re-sending and then moves queued requests to the pending state
     * until the queue is empty or {@link #MAX_SUB_PENDING} requests are pending.
     * <p>
     * Without a session all queued requests are failed; without a channel context nothing is done.
     */
    @Override
    @CallByThread(value="Netty EventLoop")
    public void run() {
        if (!this.hasSession) {
            this.clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            return;
        }
        while (this.resendPending != null) {
            if (this.resendPending instanceof MqttSubscribeWithFlow.Stateful) {
                this.writeSubscribe(ctx, (MqttSubscribeWithFlow.Stateful) this.resendPending);
            } else {
                this.writeUnsubscribe(ctx, (MqttUnsubscribeWithFlow.Stateful) this.resendPending);
            }
            this.resendPending = this.resendPending.next;
        }
        int removedFromQueue = 0;
        while (true) {
            if (this.pending.size() == MAX_SUB_PENDING) {
                // Limit reached: the remaining queued requests are picked up by completePending.
                this.queuedCounter.getAndAdd(-removedFromQueue);
                return;
            }
            final MqttSubOrUnsubWithFlow subOrUnsubWithFlow = this.queued.poll();
            if (subOrUnsubWithFlow == null) {
                // The queue looked empty; stop only if the counter confirms that nothing was added meanwhile.
                if (this.queuedCounter.addAndGet(-removedFromQueue) == 0) {
                    return;
                }
                removedFromQueue = 0;
                continue;
            }
            final int packetIdentifier = this.packetIdentifiers.getId();
            if (packetIdentifier == -1) {
                // NOTE(review): the polled request is dropped here without completing its flow and without
                // adjusting queuedCounter - confirm whether this branch should fail the flow instead.
                LOGGER.error("No Packet Identifier available for (UN)SUBSCRIBE. This must not happen and is a bug.");
                return;
            }
            this.writeSubscribeOrUnsubscribe(ctx, subOrUnsubWithFlow, packetIdentifier);
            ++removedFromQueue;
        }
    }

    /**
     * Turns a queued request into a pending one using the given packet identifier and writes it to the channel.
     * <p>
     * If the flow of the request cannot be initialized, nothing is written and the packet identifier is handed back
     * to the pool so that it is not lost.
     *
     * @param ctx                the channel handler context to write to.
     * @param subOrUnsubWithFlow the queued request.
     * @param packetIdentifier   the packet identifier to assign to the request.
     */
    private void writeSubscribeOrUnsubscribe(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubOrUnsubWithFlow subOrUnsubWithFlow, int packetIdentifier) {
        if (!subOrUnsubWithFlow.getFlow().init()) {
            // The request is skipped, so its identifier must go back to the pool; otherwise every skipped request
            // would permanently shrink the set of identifiers available to this handler.
            this.packetIdentifiers.returnId(packetIdentifier);
            return;
        }
        if (subOrUnsubWithFlow instanceof MqttSubscribeWithFlow) {
            final MqttSubscribeWithFlow subscribeWithFlow = (MqttSubscribeWithFlow) subOrUnsubWithFlow;
            // -1 is passed as subscription identifier when no subscription identifier pool exists.
            final int subscriptionIdentifier = this.subscriptionIdentifiers != null ? this.subscriptionIdentifiers.getId() : -1;
            final MqttStatefulSubscribe statefulSubscribe = subscribeWithFlow.getMessage().createStateful(packetIdentifier, subscriptionIdentifier);
            final MqttSubscribeWithFlow.Stateful statefulSubscribeWithFlow = new MqttSubscribeWithFlow.Stateful(statefulSubscribe, subscribeWithFlow.getFlow());
            this.addPending(statefulSubscribeWithFlow);
            // The subscription is registered for incoming publishes only if the write did not fail.
            if (this.writeSubscribe(ctx, statefulSubscribeWithFlow)) {
                this.incomingPublishFlows.subscribe(statefulSubscribe, statefulSubscribeWithFlow.getPublishFlow());
            }
        } else {
            final MqttUnsubscribeWithFlow unsubscribeWithFlow = (MqttUnsubscribeWithFlow) subOrUnsubWithFlow;
            final MqttStatefulUnsubscribe statefulUnsubscribe = unsubscribeWithFlow.getMessage().createStateful(packetIdentifier);
            final MqttUnsubscribeWithFlow.Stateful statefulUnsubscribeWithFlow = new MqttUnsubscribeWithFlow.Stateful(statefulUnsubscribe, (MqttSubOrUnsubAckFlow<MqttUnsubAck>) unsubscribeWithFlow.getFlow());
            this.addPending(statefulUnsubscribeWithFlow);
            this.writeUnsubscribe(ctx, statefulUnsubscribeWithFlow);
        }
    }

    /**
     * Registers a request as pending: stores it under its packet identifier and appends it to the linked list.
     *
     * @param newPending the request to register.
     */
    private void addPending(@NotNull MqttSubOrUnsubWithFlow.Stateful newPending) {
        this.pending.put(newPending.getMessage().getPacketIdentifier(), newPending);
        final MqttSubOrUnsubWithFlow.Stateful lastPending = this.lastPending;
        if (lastPending == null) {
            this.firstPending = this.lastPending = newPending;
        } else {
            lastPending.next = newPending;
            newPending.prev = lastPending;
            this.lastPending = newPending;
        }
    }

    /**
     * Writes and flushes a SUBSCRIBE message.
     *
     * @param ctx                       the channel handler context to write to.
     * @param statefulSubscribeWithFlow the pending SUBSCRIBE request.
     * @return {@code false} if {@link #currentPending} was cleared during the write (which
     *         {@link #exceptionCaught} does when it handles a failure of this request), {@code true} otherwise.
     */
    private boolean writeSubscribe(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubscribeWithFlow.Stateful statefulSubscribeWithFlow) {
        final MqttStatefulSubscribe statefulSubscribe = statefulSubscribeWithFlow.getMessage();
        this.currentPending = statefulSubscribeWithFlow;
        ctx.writeAndFlush((Object) statefulSubscribe, ctx.voidPromise());
        if (this.currentPending == null) {
            return false;
        }
        this.currentPending = null;
        return true;
    }

    /**
     * Writes and flushes an UNSUBSCRIBE message.
     *
     * @param ctx                         the channel handler context to write to.
     * @param statefulUnsubscribeWithFlow the pending UNSUBSCRIBE request.
     */
    private void writeUnsubscribe(@NotNull ChannelHandlerContext ctx, @NotNull MqttUnsubscribeWithFlow.Stateful statefulUnsubscribeWithFlow) {
        this.currentPending = statefulUnsubscribeWithFlow;
        ctx.writeAndFlush((Object) statefulUnsubscribeWithFlow.getMessage(), ctx.voidPromise());
        this.currentPending = null;
    }

    /**
     * Handles SUBACK and UNSUBACK messages; every other message is passed on to the next handler.
     *
     * @param ctx the channel handler context.
     * @param msg the message that was read.
     */
    public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
        if (msg instanceof MqttSubAck) {
            this.readSubAck(ctx, (MqttSubAck) msg);
        } else if (msg instanceof MqttUnsubAck) {
            this.readUnsubAck(ctx, (MqttUnsubAck) msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Completes the pending SUBSCRIBE that belongs to a SUBACK.
     * <p>
     * The connection is closed with a protocol error if no request is pending under the packet identifier or if
     * the pending request is not a SUBSCRIBE. Otherwise the flow succeeds unless the number of reason codes differs
     * from the number of subscriptions or all reason codes are errors.
     *
     * @param ctx    the channel handler context.
     * @param subAck the received SUBACK.
     */
    private void readSubAck(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubAck subAck) {
        final int packetIdentifier = subAck.getPacketIdentifier();
        final MqttSubOrUnsubWithFlow.Stateful statefulSubOrUnsubWithFlow = this.pending.remove(packetIdentifier);
        if (statefulSubOrUnsubWithFlow == null) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for SUBACK");
            return;
        }
        if (!(statefulSubOrUnsubWithFlow instanceof MqttSubscribeWithFlow.Stateful)) {
            // NOTE(review): the entry stays removed from the pending map but remains in the linked list - confirm
            // that this is intended for the case that the session outlives the disconnect.
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "SUBACK received for an UNSUBSCRIBE");
            return;
        }
        final MqttSubscribeWithFlow.Stateful statefulSubscribeWithFlow = (MqttSubscribeWithFlow.Stateful) statefulSubOrUnsubWithFlow;
        final MqttStatefulSubscribe subscribe = statefulSubscribeWithFlow.getMessage();
        final MqttSubscriptionFlow<MqttSubAck> flow = statefulSubscribeWithFlow.getFlow();
        final List<?> reasonCodes = subAck.getReasonCodes();
        final boolean countNotMatching = ((MqttSubscribe) subscribe.stateless()).getSubscriptions().size() != reasonCodes.size();
        final boolean allErrors = MqttCommonReasonCode.allErrors((ImmutableList<? extends Mqtt5ReasonCode>) subAck.getReasonCodes());
        // The incoming publish flows are informed about every SUBACK, regardless of its outcome.
        this.incomingPublishFlows.subAck(subscribe, subAck, statefulSubscribeWithFlow.getPublishFlow());
        if (!countNotMatching && !allErrors) {
            if (!flow.isCancelled()) {
                flow.onSuccess(subAck);
            } else {
                LOGGER.warn("Subscribe was successful but the SubAck flow has been cancelled");
            }
        } else {
            final String errorMessage = countNotMatching ? "Count of Reason Codes in SUBACK does not match count of subscriptions in SUBSCRIBE" : "SUBACK contains only Error Codes";
            if (!flow.isCancelled()) {
                flow.onError(new Mqtt5SubAckException(subAck, errorMessage));
            } else {
                LOGGER.warn(errorMessage + " but the SubAck flow has been cancelled");
            }
        }
        this.completePending(ctx, statefulSubscribeWithFlow);
    }

    /**
     * Completes the pending UNSUBSCRIBE that belongs to an UNSUBACK.
     * <p>
     * The connection is closed with a protocol error if no request is pending under the packet identifier or if
     * the pending request is not an UNSUBSCRIBE. The unsubscription counts as successful if the reason codes are
     * the {@link Mqtt3UnsubAckView#REASON_CODES_ALL_SUCCESS} instance, or if the number of reason codes matches the
     * number of topic filters and not all reason codes are errors.
     *
     * @param ctx      the channel handler context.
     * @param unsubAck the received UNSUBACK.
     */
    private void readUnsubAck(@NotNull ChannelHandlerContext ctx, @NotNull MqttUnsubAck unsubAck) {
        final int packetIdentifier = unsubAck.getPacketIdentifier();
        final MqttSubOrUnsubWithFlow.Stateful statefulSubOrUnsubWithFlow = this.pending.remove(packetIdentifier);
        if (statefulSubOrUnsubWithFlow == null) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for UNSUBACK");
            return;
        }
        if (!(statefulSubOrUnsubWithFlow instanceof MqttUnsubscribeWithFlow.Stateful)) {
            // NOTE(review): same situation as in readSubAck - the entry is no longer in the pending map but is
            // still linked in the pending list.
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "UNSUBACK received for a SUBSCRIBE");
            return;
        }
        final MqttUnsubscribeWithFlow.Stateful statefulUnsubscribeWithFlow = (MqttUnsubscribeWithFlow.Stateful) statefulSubOrUnsubWithFlow;
        final MqttStatefulUnsubscribe unsubscribe = statefulUnsubscribeWithFlow.getMessage();
        // The raw flow type and the casts below are kept as produced by the decompiler.
        final MqttSubscriptionFlow flow = statefulUnsubscribeWithFlow.getFlow();
        final List<?> reasonCodes = unsubAck.getReasonCodes();
        final boolean countNotMatching = ((MqttUnsubscribe) unsubscribe.stateless()).getTopicFilters().size() != reasonCodes.size();
        final boolean allErrors = MqttCommonReasonCode.allErrors((ImmutableList<? extends Mqtt5ReasonCode>) unsubAck.getReasonCodes());
        // Identity comparison on purpose: only the shared all-success instance bypasses the count check.
        if (reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS || !countNotMatching && !allErrors) {
            this.incomingPublishFlows.unsubscribe(unsubscribe, unsubAck);
            if (!((FlowWithEventLoop) ((Object) flow)).isCancelled()) {
                ((MqttSubOrUnsubAckFlow) flow).onSuccess(unsubAck);
            } else {
                LOGGER.warn("Unsubscribe was successful but the UnsubAck flow has been cancelled");
            }
        } else {
            final String errorMessage = countNotMatching ? "Count of Reason Codes in UNSUBACK does not match count of Topic Filters in UNSUBSCRIBE" : "UNSUBACK contains only Error Codes";
            if (!((FlowWithEventLoop) ((Object) flow)).isCancelled()) {
                ((MqttSubOrUnsubAckFlow) flow).onError(new Mqtt5UnsubAckException(unsubAck, errorMessage));
            } else {
                LOGGER.warn(errorMessage + " but the UnsubAck flow has been cancelled");
            }
        }
        this.completePending(ctx, statefulUnsubscribeWithFlow);
    }

    /**
     * Unlinks a finished request from the pending list and reuses its packet identifier for the next queued
     * request, or returns the identifier to the pool if nothing is queued.
     *
     * @param ctx        the channel handler context used to write the next queued request.
     * @param oldPending the request that is no longer pending.
     */
    private void completePending(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubOrUnsubWithFlow.Stateful oldPending) {
        final MqttSubOrUnsubWithFlow.Stateful prev = oldPending.prev;
        final MqttSubOrUnsubWithFlow.Stateful next = oldPending.next;
        if (prev == null) {
            this.firstPending = next;
        } else {
            prev.next = next;
        }
        if (next == null) {
            this.lastPending = prev;
        } else {
            next.prev = prev;
        }
        final int packetIdentifier = oldPending.getMessage().getPacketIdentifier();
        final MqttSubOrUnsubWithFlow subOrUnsubWithFlow = this.queued.poll();
        if (subOrUnsubWithFlow == null) {
            this.packetIdentifiers.returnId(packetIdentifier);
        } else {
            this.queuedCounter.getAndDecrement();
            this.writeSubscribeOrUnsubscribe(ctx, subOrUnsubWithFlow, packetIdentifier);
        }
    }

    /**
     * Fails the request that is currently being written if the exception is not an {@link IOException}; all other
     * exceptions are passed on to the next handler.
     *
     * @param ctx   the channel handler context.
     * @param cause the exception that was caught.
     */
    public void exceptionCaught(@NotNull ChannelHandlerContext ctx, @NotNull Throwable cause) {
        final MqttSubOrUnsubWithFlow.Stateful failedPending = this.currentPending;
        if (!(cause instanceof IOException) && failedPending != null) {
            this.pending.remove(failedPending.getMessage().getPacketIdentifier());
            failedPending.getFlow().onError(cause);
            this.completePending(ctx, failedPending);
            // Clearing the field tells writeSubscribe that the write of this request failed.
            this.currentPending = null;
        } else {
            ctx.fireExceptionCaught(cause);
        }
    }

    /**
     * Releases all pending requests and fails all queued requests when the session ends.
     * <p>
     * Flows of pending requests are failed with the cause, except for flows that are instances of
     * {@link MqttSubscribedPublishFlow}, which are not notified here.
     *
     * @param cause the reason why the session ended.
     */
    @Override
    public void onSessionEnd(@NotNull Throwable cause) {
        super.onSessionEnd(cause);
        MqttSubOrUnsubWithFlow.Stateful current = this.firstPending;
        while (current != null) {
            this.packetIdentifiers.returnId(current.getMessage().getPacketIdentifier());
            if (!(current.getFlow() instanceof MqttSubscribedPublishFlow)) {
                current.getFlow().onError(cause);
            }
            current = current.next;
        }
        this.pending.clear();
        this.resendPending = null;
        this.lastPending = null;
        this.firstPending = null;
        this.subscriptionIdentifiers = null;
        this.clearQueued(cause);
    }

    /**
     * Removes all queued requests and fails each flow that can still be initialized.
     *
     * @param cause the error passed to the flows.
     */
    private void clearQueued(@NotNull Throwable cause) {
        int polled = 0;
        while (true) {
            final MqttSubOrUnsubWithFlow subOrUnsubWithFlow = this.queued.poll();
            if (subOrUnsubWithFlow == null) {
                // Stop only when the counter confirms that no request was added while draining.
                if (this.queuedCounter.addAndGet(-polled) == 0) {
                    break;
                }
                polled = 0;
                continue;
            }
            if (subOrUnsubWithFlow.getFlow().init()) {
                subOrUnsubWithFlow.getFlow().onError(cause);
            }
            ++polled;
        }
    }
}

