package com.hivemq.client.internal.mqtt.handler.subscribe;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.datatypes.MqttVariableByteInteger;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubscribeWithFlow;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.MqttCommonReasonCode;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttStatefulUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.mqtt3.Mqtt3UnsubAckView;
import com.hivemq.client.internal.util.Ranges;
import com.hivemq.client.internal.util.collections.ImmutableList;
import com.hivemq.client.internal.util.collections.IntMap;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAckReasonCode;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.jctools.queues.MpscLinkedQueue;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@ClientScope
/* loaded from: input_file:com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubscriptionHandler.class */
public class MqttSubscriptionHandler extends MqttSessionAwareHandler implements Runnable {

    /** Name of this handler; presumably the name it is registered under in the channel pipeline (not visible here). */
    @NotNull
    public static final String NAME = "subscription";
    /**
     * Maximum number of SUBSCRIBE/UNSUBSCRIBE messages that may await their acknowledgement at the same time.
     * The packet identifier range below (65526..65535) contains exactly this many identifiers.
     */
    public static final int MAX_SUB_PENDING = 10;

    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttSubscriptionHandler.class);

    /** Collaborator that is told about sent subscriptions, received SUBACKs and successful unsubscriptions. */
    @NotNull
    private final MqttIncomingPublishFlows incomingPublishFlows;

    /** Head of the doubly linked list of messages that were written but not yet acknowledged. */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful firstPending;

    /** Tail of the doubly linked list of pending messages; new entries are appended here. */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful lastPending;

    /** Cursor into the pending list: next message to be written again after a session start or resume. */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful resendPending;

    /**
     * Message that is being written right now. It is set around each write; exceptionCaught resets it to null
     * after failing the message, which is how writeSubscribe detects that the write did not succeed.
     */
    @Nullable
    private MqttSubOrUnsubWithFlow.Stateful currentPending;

    /**
     * Allocator for subscription identifiers. Created on session start/resume when the connection config reports
     * that subscription identifiers are available; reset to null when the session ends.
     */
    @Nullable
    private Ranges subscriptionIdentifiers;

    /** Requests handed in via subscribe/unsubscribe that have not been written yet. */
    @NotNull
    private final MpscLinkedQueue<MqttSubOrUnsubWithFlow> queued = MpscLinkedQueue.newMpscLinkedQueue();

    /**
     * Number of queued requests that have not been accounted for yet. A transition from 0 to 1 schedules this
     * handler on the event loop; the draining code subtracts what it has taken out of the queue.
     */
    @NotNull
    private final AtomicInteger queuedCounter = new AtomicInteger();

    /** Pending (written but unacknowledged) messages, indexed by their packet identifier. */
    @NotNull
    private final IntMap<MqttSubOrUnsubWithFlow.Stateful> pending = IntMap.range(65526, 65535);

    /** Allocator for the packet identifiers 65526..65535 used by SUBSCRIBE and UNSUBSCRIBE messages. */
    @NotNull
    private final Ranges packetIdentifiers = new Ranges(65526, 65535);

    /**
     * Creates the handler.
     * <p>
     * Decompiler note (JADX): the access modifier was changed from package-private to public.
     *
     * @param mqttIncomingPublishFlows collaborator that is notified about subscriptions, SUBACKs and
     *                                 unsubscriptions handled by this handler.
     */
    @Inject
    public MqttSubscriptionHandler(@NotNull MqttIncomingPublishFlows mqttIncomingPublishFlows) {
        this.incomingPublishFlows = mqttIncomingPublishFlows;
    }

    /**
     * Prepares the handler for a started or resumed session.
     * <p>
     * Creates the subscription identifier allocator if the connection supports subscription identifiers and none
     * exists yet. If there are pending (unacknowledged) or queued requests, marks the whole pending list for
     * resending and schedules this handler on the given event loop.
     *
     * @param mqttClientConnectionConfig the configuration of the new connection.
     * @param eventLoop                  the event loop this handler is scheduled on.
     */
    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(@NotNull MqttClientConnectionConfig mqttClientConnectionConfig, @NotNull EventLoop eventLoop) {
        super.onSessionStartOrResume(mqttClientConnectionConfig, eventLoop);

        if (mqttClientConnectionConfig.areSubscriptionIdentifiersAvailable()) {
            if (this.subscriptionIdentifiers == null) {
                this.subscriptionIdentifiers = new Ranges(1, MqttVariableByteInteger.FOUR_BYTES_MAX_VALUE);
            }
        }

        final boolean hasPending = this.firstPending != null;
        if (!hasPending && this.queuedCounter.get() <= 0) {
            // nothing to resend and nothing queued
            return;
        }
        this.resendPending = this.firstPending;
        eventLoop.execute(this);
    }

    /**
     * Queues a SUBSCRIBE request and makes sure the queue gets drained on the flow's event loop.
     *
     * @param mqttSubscribe        the subscribe message to send.
     * @param mqttSubscriptionFlow the flow that receives the outcome.
     */
    public void subscribe(@NotNull MqttSubscribe mqttSubscribe, @NotNull MqttSubscriptionFlow<MqttSubAck> mqttSubscriptionFlow) {
        final MqttSubscribeWithFlow request = new MqttSubscribeWithFlow(mqttSubscribe, mqttSubscriptionFlow);
        this.queued.offer(request);
        execute(mqttSubscriptionFlow.getEventLoop());
    }

    /**
     * Queues an UNSUBSCRIBE request and makes sure the queue gets drained on the flow's event loop.
     *
     * @param mqttUnsubscribe       the unsubscribe message to send.
     * @param mqttSubOrUnsubAckFlow the flow that receives the outcome.
     */
    public void unsubscribe(@NotNull MqttUnsubscribe mqttUnsubscribe, @NotNull MqttSubOrUnsubAckFlow<MqttUnsubAck> mqttSubOrUnsubAckFlow) {
        final MqttUnsubscribeWithFlow request = new MqttUnsubscribeWithFlow(mqttUnsubscribe, mqttSubOrUnsubAckFlow);
        this.queued.offer(request);
        execute(mqttSubOrUnsubAckFlow.getEventLoop());
    }

    /**
     * Counts one more queued request and schedules this handler on the event loop, but only if the counter was
     * zero before, i.e. no earlier request is still waiting to be accounted for.
     *
     * @param eventLoop the event loop to run this handler on.
     */
    private void execute(@NotNull EventLoop eventLoop) {
        final int previouslyQueued = this.queuedCounter.getAndIncrement();
        if (previouslyQueued != 0) {
            return;
        }
        eventLoop.execute(this);
    }

    /**
     * Drains the request queue on the event loop.
     * <p>
     * Without a session all queued requests are failed as "not connected". Without a channel context nothing is
     * done. Otherwise the messages marked for resending are written first, then queued requests are written until
     * either the queue is empty or {@link #MAX_SUB_PENDING} messages are pending.
     * <p>
     * {@code queuedCounter} is reduced by the number of requests taken out of the queue; when it does not reach
     * zero after the queue looked empty, more requests were offered concurrently and polling continues.
     */
    @Override // java.lang.Runnable
    @CallByThread("Netty EventLoop")
    public void run() {
        if (!this.hasSession) {
            clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        final ChannelHandlerContext channelHandlerContext = this.ctx;
        if (channelHandlerContext == null) {
            return;
        }

        // first write again everything that was marked for resending on session start/resume
        while (this.resendPending != null) {
            if (this.resendPending instanceof MqttSubscribeWithFlow.Stateful) {
                writeSubscribe(channelHandlerContext, (MqttSubscribeWithFlow.Stateful) this.resendPending);
            } else {
                writeUnsubscribe(channelHandlerContext, (MqttUnsubscribeWithFlow.Stateful) this.resendPending);
            }
            this.resendPending = this.resendPending.next;
        }

        int removedFromQueue = 0;
        while (this.pending.size() != MAX_SUB_PENDING) {
            final MqttSubOrUnsubWithFlow request = this.queued.poll();
            if (request == null) {
                if (this.queuedCounter.addAndGet(-removedFromQueue) == 0) {
                    return;
                }
                // requests were offered in the meantime, keep polling
                removedFromQueue = 0;
                continue;
            }

            final int packetIdentifier = this.packetIdentifiers.getId();
            if (packetIdentifier == -1) {
                LOGGER.error("No Packet Identifier available for (UN)SUBSCRIBE. This must not happen and is a bug.");
                // The request has already been taken out of the queue: account for it (and for the requests
                // written before it) so the counter does not stay stale, and fail its flow instead of silently
                // dropping it.
                this.queuedCounter.getAndAdd(-(removedFromQueue + 1));
                if (request.getFlow().init()) {
                    request.getFlow().onError(
                            new IllegalStateException("No Packet Identifier available for (UN)SUBSCRIBE"));
                }
                return;
            }

            writeSubscribeOrUnsubscribe(channelHandlerContext, request, packetIdentifier);
            removedFromQueue++;
        }
        // pending is full: the remaining queued requests are picked up by completePending
        this.queuedCounter.getAndAdd(-removedFromQueue);
    }

    /**
     * Turns a queued request into a stateful message with the given packet identifier, registers it as pending
     * and writes it to the channel.
     * <p>
     * The packet identifier is owned by the caller at this point: {@code run()} has just reserved it and
     * {@code completePending()} hands over the identifier of the message that was completed. If the flow cannot
     * be initialized, the identifier is therefore given back here, otherwise it would be lost for good.
     *
     * @param channelHandlerContext  the context to write to.
     * @param mqttSubOrUnsubWithFlow the queued SUBSCRIBE or UNSUBSCRIBE request.
     * @param i                      the packet identifier reserved for the message.
     */
    private void writeSubscribeOrUnsubscribe(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttSubOrUnsubWithFlow mqttSubOrUnsubWithFlow, int i) {
        if (!mqttSubOrUnsubWithFlow.getFlow().init()) {
            // init() failed (presumably the flow was cancelled before being sent): nothing is written, so the
            // reserved packet identifier must be returned to avoid exhausting the small identifier range
            this.packetIdentifiers.returnId(i);
            return;
        }

        if (mqttSubOrUnsubWithFlow instanceof MqttSubscribeWithFlow) {
            final MqttSubscribeWithFlow subscribeWithFlow = (MqttSubscribeWithFlow) mqttSubOrUnsubWithFlow;
            // -1 is passed as subscription identifier when no allocator exists for this session
            final int subscriptionIdentifier =
                    (this.subscriptionIdentifiers != null) ? this.subscriptionIdentifiers.getId() : -1;
            final MqttStatefulSubscribe statefulSubscribe =
                    subscribeWithFlow.getMessage().createStateful(i, subscriptionIdentifier);
            final MqttSubscribeWithFlow.Stateful statefulWithFlow =
                    new MqttSubscribeWithFlow.Stateful(statefulSubscribe, subscribeWithFlow.getFlow());
            addPending(statefulWithFlow);
            // only register the subscription for incoming publishes if the write did not fail
            if (writeSubscribe(channelHandlerContext, statefulWithFlow)) {
                this.incomingPublishFlows.subscribe(statefulSubscribe, statefulWithFlow.getPublishFlow());
            }
        } else {
            final MqttUnsubscribeWithFlow unsubscribeWithFlow = (MqttUnsubscribeWithFlow) mqttSubOrUnsubWithFlow;
            final MqttUnsubscribeWithFlow.Stateful statefulWithFlow = new MqttUnsubscribeWithFlow.Stateful(
                    unsubscribeWithFlow.getMessage().createStateful(i), unsubscribeWithFlow.getFlow());
            addPending(statefulWithFlow);
            writeUnsubscribe(channelHandlerContext, statefulWithFlow);
        }
    }

    /**
     * Registers a message as pending: indexes it by its packet identifier and appends it to the end of the
     * doubly linked pending list.
     *
     * @param stateful the message that is about to be written.
     */
    private void addPending(@NotNull MqttSubOrUnsubWithFlow.Stateful stateful) {
        final int packetIdentifier = stateful.getMessage().getPacketIdentifier();
        this.pending.put(packetIdentifier, stateful);

        final MqttSubOrUnsubWithFlow.Stateful tail = this.lastPending;
        if (tail != null) {
            tail.next = stateful;
            stateful.prev = tail;
        } else {
            // list was empty, the new entry is also the head
            this.firstPending = stateful;
        }
        this.lastPending = stateful;
    }

    /**
     * Writes and flushes a SUBSCRIBE message.
     * <p>
     * The message is exposed as {@code currentPending} for the duration of the write. If {@code currentPending}
     * is null afterwards, exceptionCaught has handled a failure of this message during the write.
     *
     * @param channelHandlerContext the context to write to.
     * @param stateful              the SUBSCRIBE message with its flow.
     * @return {@code true} if {@code currentPending} was still set after the write, {@code false} otherwise.
     */
    private boolean writeSubscribe(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttSubscribeWithFlow.Stateful stateful) {
        final MqttStatefulSubscribe subscribe = stateful.getMessage();
        this.currentPending = stateful;
        channelHandlerContext.writeAndFlush(subscribe, channelHandlerContext.voidPromise());

        final boolean stillPending = this.currentPending != null;
        this.currentPending = null;
        return stillPending;
    }

    /**
     * Writes and flushes an UNSUBSCRIBE message, exposing it as {@code currentPending} while the write is in
     * progress so that exceptionCaught can attribute a failure to it.
     *
     * @param channelHandlerContext the context to write to.
     * @param stateful              the UNSUBSCRIBE message with its flow.
     */
    private void writeUnsubscribe(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttUnsubscribeWithFlow.Stateful stateful) {
        this.currentPending = stateful;
        final MqttStatefulUnsubscribe unsubscribe = stateful.getMessage();
        channelHandlerContext.writeAndFlush(unsubscribe, channelHandlerContext.voidPromise());
        this.currentPending = null;
    }

    /**
     * Handles SUBACK and UNSUBACK messages; every other message is passed on to the next handler.
     *
     * @param channelHandlerContext the context of this handler.
     * @param obj                   the message that was read.
     */
    public void channelRead(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj) {
        if (obj instanceof MqttSubAck) {
            readSubAck(channelHandlerContext, (MqttSubAck) obj);
            return;
        }
        if (obj instanceof MqttUnsubAck) {
            readUnsubAck(channelHandlerContext, (MqttUnsubAck) obj);
            return;
        }
        channelHandlerContext.fireChannelRead(obj);
    }

    /**
     * Processes a SUBACK.
     * <p>
     * The pending message with the SUBACK's packet identifier is removed from the index. An unknown identifier
     * or a pending UNSUBSCRIBE under that identifier leads to a disconnect with PROTOCOL_ERROR. Otherwise the
     * incoming publish flows are informed, the flow is completed (error if the reason code count does not match
     * the subscription count or if all reason codes are errors, success otherwise; only logged if the flow was
     * cancelled) and the pending entry is completed.
     *
     * @param channelHandlerContext the context of this handler.
     * @param mqttSubAck            the received SUBACK.
     */
    private void readSubAck(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttSubAck mqttSubAck) {
        final MqttSubOrUnsubWithFlow.Stateful acknowledged = this.pending.remove(mqttSubAck.getPacketIdentifier());
        if (acknowledged == null) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for SUBACK");
            return;
        }
        if (!(acknowledged instanceof MqttSubscribeWithFlow.Stateful)) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "SUBACK received for an UNSUBSCRIBE");
            return;
        }

        final MqttSubscribeWithFlow.Stateful subscribeWithFlow = (MqttSubscribeWithFlow.Stateful) acknowledged;
        final MqttStatefulSubscribe subscribe = subscribeWithFlow.getMessage();
        final MqttSubscriptionFlow<MqttSubAck> flow = subscribeWithFlow.getFlow();

        final boolean countMismatch = ((MqttSubscribe) subscribe.stateless()).getSubscriptions().size() != mqttSubAck.getReasonCodes().size();
        final boolean onlyErrors = MqttCommonReasonCode.allErrors(mqttSubAck.getReasonCodes());

        this.incomingPublishFlows.subAck(subscribe, mqttSubAck, subscribeWithFlow.getPublishFlow());

        final boolean cancelled = flow.isCancelled();
        if (!countMismatch && !onlyErrors) {
            if (cancelled) {
                LOGGER.warn("Subscribe was successful but the SubAck flow has been cancelled");
            } else {
                flow.onSuccess(mqttSubAck);
            }
        } else {
            final String errorMessage;
            if (countMismatch) {
                errorMessage = "Count of Reason Codes in SUBACK does not match count of subscriptions in SUBSCRIBE";
            } else {
                errorMessage = "SUBACK contains only Error Codes";
            }
            if (cancelled) {
                LOGGER.warn(errorMessage + " but the SubAck flow has been cancelled");
            } else {
                flow.onError(new Mqtt5SubAckException(mqttSubAck, errorMessage));
            }
        }

        completePending(channelHandlerContext, subscribeWithFlow);
    }

    /**
     * Processes an UNSUBACK.
     * <p>
     * The pending message with the UNSUBACK's packet identifier is removed from the index. An unknown identifier
     * or a pending SUBSCRIBE under that identifier leads to a disconnect with PROTOCOL_ERROR. The UNSUBACK counts
     * as successful if its reason code list is the {@code Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS} instance, or
     * if the reason code count matches the topic filter count and not all reason codes are errors. On success the
     * incoming publish flows are informed before the flow is completed. A cancelled flow is only logged.
     *
     * @param channelHandlerContext the context of this handler.
     * @param mqttUnsubAck          the received UNSUBACK.
     */
    private void readUnsubAck(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttUnsubAck mqttUnsubAck) {
        final MqttSubOrUnsubWithFlow.Stateful acknowledged = this.pending.remove(mqttUnsubAck.getPacketIdentifier());
        if (acknowledged == null) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for UNSUBACK");
            return;
        }
        if (!(acknowledged instanceof MqttUnsubscribeWithFlow.Stateful)) {
            MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "UNSUBACK received for a SUBSCRIBE");
            return;
        }

        final MqttUnsubscribeWithFlow.Stateful unsubscribeWithFlow = (MqttUnsubscribeWithFlow.Stateful) acknowledged;
        final MqttStatefulUnsubscribe unsubscribe = unsubscribeWithFlow.getMessage();
        final MqttSubOrUnsubAckFlow<MqttUnsubAck> flow = unsubscribeWithFlow.getFlow();
        final ImmutableList<Mqtt5UnsubAckReasonCode> reasonCodes = mqttUnsubAck.getReasonCodes();

        final boolean countMismatch = ((MqttUnsubscribe) unsubscribe.stateless()).getTopicFilters().size() != reasonCodes.size();
        final boolean onlyErrors = MqttCommonReasonCode.allErrors(mqttUnsubAck.getReasonCodes());

        // identity comparison on purpose: the shared "all success" instance marks a successful UNSUBACK
        final boolean successful =
                (reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS) || (!countMismatch && !onlyErrors);

        if (!successful) {
            final String errorMessage;
            if (countMismatch) {
                errorMessage = "Count of Reason Codes in UNSUBACK does not match count of Topic Filters in UNSUBSCRIBE";
            } else {
                errorMessage = "UNSUBACK contains only Error Codes";
            }
            if (flow.isCancelled()) {
                LOGGER.warn(errorMessage + " but the UnsubAck flow has been cancelled");
            } else {
                flow.onError(new Mqtt5UnsubAckException(mqttUnsubAck, errorMessage));
            }
        } else {
            this.incomingPublishFlows.unsubscribe(unsubscribe, mqttUnsubAck);
            if (flow.isCancelled()) {
                LOGGER.warn("Unsubscribe was successful but the UnsubAck flow has been cancelled");
            } else {
                flow.onSuccess(mqttUnsubAck);
            }
        }

        completePending(channelHandlerContext, unsubscribeWithFlow);
    }

    /**
     * Finishes a pending message: unlinks it from the pending list and recycles its packet identifier.
     * <p>
     * If another request is queued, it is written right away with the freed packet identifier; otherwise the
     * identifier is returned to the allocator.
     *
     * @param channelHandlerContext the context to write a follow-up request to.
     * @param stateful              the message that is completed.
     */
    private void completePending(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttSubOrUnsubWithFlow.Stateful stateful) {
        final MqttSubOrUnsubWithFlow.Stateful predecessor = stateful.prev;
        final MqttSubOrUnsubWithFlow.Stateful successor = stateful.next;

        if (predecessor != null) {
            predecessor.next = successor;
        } else {
            this.firstPending = successor;
        }
        if (successor != null) {
            successor.prev = predecessor;
        } else {
            this.lastPending = predecessor;
        }

        final int freedPacketIdentifier = stateful.getMessage().getPacketIdentifier();
        final MqttSubOrUnsubWithFlow nextQueued = this.queued.poll();
        if (nextQueued != null) {
            this.queuedCounter.getAndDecrement();
            writeSubscribeOrUnsubscribe(channelHandlerContext, nextQueued, freedPacketIdentifier);
            return;
        }
        this.packetIdentifiers.returnId(freedPacketIdentifier);
    }

    /**
     * Handles an exception raised in the pipeline.
     * <p>
     * IOExceptions and exceptions that occur while no message of this handler is being written are passed on.
     * Any other exception is attributed to the message that is currently being written: it is removed from the
     * pending index, its flow is failed, the pending entry is completed and {@code currentPending} is cleared.
     *
     * @param channelHandlerContext the context of this handler.
     * @param th                    the exception.
     */
    public void exceptionCaught(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Throwable th) {
        final MqttSubOrUnsubWithFlow.Stateful failed = this.currentPending;
        if (failed == null || th instanceof IOException) {
            channelHandlerContext.fireExceptionCaught(th);
            return;
        }

        final int packetIdentifier = failed.getMessage().getPacketIdentifier();
        this.pending.remove(packetIdentifier);
        failed.getFlow().onError(th);
        completePending(channelHandlerContext, failed);
        // signals the write method further up the call stack that the message failed
        this.currentPending = null;
    }

    /**
     * Cleans up when the session ends.
     * <p>
     * Returns the packet identifiers of all pending messages and fails their flows with the given cause, except
     * for flows that are a {@code MqttSubscribedPublishFlow}. Afterwards the pending state and the subscription
     * identifier allocator are reset and all queued requests are failed with the same cause.
     *
     * @param th the cause of the session end.
     */
    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(@NotNull Throwable th) {
        super.onSessionEnd(th);

        for (MqttSubOrUnsubWithFlow.Stateful current = this.firstPending; current != null; current = current.next) {
            this.packetIdentifiers.returnId(current.getMessage().getPacketIdentifier());
            if (!(current.getFlow() instanceof MqttSubscribedPublishFlow)) {
                current.getFlow().onError(th);
            }
        }

        this.pending.clear();
        this.resendPending = null;
        this.lastPending = null;
        this.firstPending = null;
        this.subscriptionIdentifiers = null;
        clearQueued(th);
    }

    /**
     * Empties the request queue and fails every request whose flow can still be initialized.
     * <p>
     * {@code queuedCounter} is reduced by the number of removed requests each time the queue looks empty;
     * polling only stops once the counter reaches zero, so requests offered concurrently are failed as well.
     *
     * @param th the cause the flows are failed with.
     */
    private void clearQueued(@NotNull Throwable th) {
        int removedFromQueue = 0;
        for (;;) {
            final MqttSubOrUnsubWithFlow request = this.queued.poll();
            if (request == null) {
                if (this.queuedCounter.addAndGet(-removedFromQueue) == 0) {
                    return;
                }
                removedFromQueue = 0;
                continue;
            }
            if (request.getFlow().init()) {
                request.getFlow().onError(th);
            }
            removedFromQueue++;
        }
    }
}
