package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
import com.hivemq.client.internal.util.collections.HandleList;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;

/* JADX INFO: Access modifiers changed from: package-private */
@ClientScope
/* loaded from: input_file:com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishService.class */
public class MqttIncomingPublishService {

    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttIncomingPublishService.class);
    private static final boolean QOS_0_DROP_LATEST = true;

    @NotNull
    private final MqttIncomingQosHandler incomingQosHandler;

    @NotNull
    private final ChunkedArrayQueue<QueueEntry> qos0Queue = new ChunkedArrayQueue<>(32);

    @NotNull
    private final ChunkedArrayQueue<QueueEntry> qos1Or2Queue = new ChunkedArrayQueue<>(32);
    private int referencedFlowCount;
    private int runIndex;
    private int blockingFlowCount;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishService$QueueEntry.class */
    public static class QueueEntry {

        @NotNull
        final MqttStatefulPublish publish;

        @NotNull
        final HandleList<MqttIncomingPublishFlow> flows;

        QueueEntry(@NotNull MqttStatefulPublish mqttStatefulPublish, @NotNull HandleList<MqttIncomingPublishFlow> handleList) {
            this.publish = mqttStatefulPublish;
            this.flows = handleList;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MqttIncomingPublishService(@NotNull MqttIncomingQosHandler mqttIncomingQosHandler) {
        this.incomingQosHandler = mqttIncomingQosHandler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CallByThread("Netty EventLoop")
    public void onPublishQos0(@NotNull MqttStatefulPublish mqttStatefulPublish, int i) {
        if (this.qos0Queue.size() >= i) {
            LOGGER.warn("QoS 0 publish message dropped.");
            this.qos0Queue.poll();
        }
        HandleList<MqttIncomingPublishFlow> onPublish = onPublish(mqttStatefulPublish);
        if (onPublish.isEmpty()) {
            return;
        }
        this.qos0Queue.offer(new QueueEntry(mqttStatefulPublish, onPublish));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CallByThread("Netty EventLoop")
    public boolean onPublishQos1Or2(@NotNull MqttStatefulPublish mqttStatefulPublish, int i) {
        if (this.qos1Or2Queue.size() >= i) {
            return false;
        }
        HandleList<MqttIncomingPublishFlow> onPublish = onPublish(mqttStatefulPublish);
        if (this.qos1Or2Queue.isEmpty() && onPublish.isEmpty()) {
            this.incomingQosHandler.ack(mqttStatefulPublish);
            return true;
        }
        this.qos1Or2Queue.offer(new QueueEntry(mqttStatefulPublish, onPublish));
        return true;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @CallByThread("Netty EventLoop")
    @NotNull
    private HandleList<MqttIncomingPublishFlow> onPublish(@NotNull MqttStatefulPublish mqttStatefulPublish) {
        HandleList<MqttIncomingPublishFlow> findMatching = this.incomingQosHandler.getIncomingPublishFlows().findMatching(mqttStatefulPublish);
        if (findMatching.isEmpty()) {
            LOGGER.warn("No publish flow registered for {}.", mqttStatefulPublish);
        }
        drain();
        Iterator<MqttIncomingPublishFlow> it = findMatching.iterator();
        while (it.hasNext()) {
            if (it.next().reference() == 1) {
                this.referencedFlowCount++;
            }
        }
        emit((MqttPublish) mqttStatefulPublish.stateless(), findMatching);
        return findMatching;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    @CallByThread("Netty EventLoop")
    public void drain() {
        this.runIndex++;
        this.blockingFlowCount = 0;
        boolean z = true;
        Iterator<QueueEntry> it = this.qos1Or2Queue.iterator();
        while (it.hasNext()) {
            QueueEntry next = it.next();
            MqttStatefulPublish mqttStatefulPublish = next.publish;
            HandleList<MqttIncomingPublishFlow> handleList = next.flows;
            emit((MqttPublish) mqttStatefulPublish.stateless(), handleList);
            if (z && handleList.isEmpty()) {
                it.remove();
                this.incomingQosHandler.ack(mqttStatefulPublish);
            } else {
                z = false;
                if (this.blockingFlowCount == this.referencedFlowCount) {
                    return;
                }
            }
        }
        Iterator<QueueEntry> it2 = this.qos0Queue.iterator();
        while (it2.hasNext()) {
            QueueEntry next2 = it2.next();
            MqttStatefulPublish mqttStatefulPublish2 = next2.publish;
            HandleList<MqttIncomingPublishFlow> handleList2 = next2.flows;
            emit((MqttPublish) mqttStatefulPublish2.stateless(), handleList2);
            if (handleList2.isEmpty()) {
                it2.remove();
            } else if (this.blockingFlowCount == this.referencedFlowCount) {
                return;
            }
        }
    }

    @CallByThread("Netty EventLoop")
    private void emit(@NotNull MqttPublish mqttPublish, @NotNull HandleList<MqttIncomingPublishFlow> handleList) {
        Iterator<MqttIncomingPublishFlow> it = handleList.iterator();
        while (it.hasNext()) {
            MqttIncomingPublishFlow next = it.next();
            if (next.isCancelled()) {
                it.remove();
                if (next.dereference() == 0) {
                    this.referencedFlowCount--;
                }
            } else {
                long requested = next.requested(this.runIndex);
                if (requested > 0) {
                    next.onNext((Mqtt5Publish) mqttPublish);
                    it.remove();
                    if (next.dereference() == 0) {
                        this.referencedFlowCount--;
                        next.checkDone();
                    }
                } else if (requested == 0) {
                    this.blockingFlowCount++;
                    if (this.blockingFlowCount == this.referencedFlowCount) {
                        return;
                    }
                } else {
                    continue;
                }
            }
        }
    }
}
