package io.ballerina.messaging.broker.core;

import io.ballerina.messaging.broker.core.task.Task;
import io.ballerina.messaging.broker.core.util.MessageTracer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ballerina/messaging/broker/core/MessageDeliveryTask.class */
/**
 * Task that hands messages from one queue to that queue's consumers.
 *
 * <p>Each {@link #call()} walks the cyclic consumer iterator supplied by the {@link QueueHandler} and sends
 * one dequeued message to every ready consumer it meets. The walk ends when {@code dequeue()} yields no
 * message, when the per-call batch limit is reached, or when no consumer is ready.
 */
final class MessageDeliveryTask extends Task {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageDeliveryTask.class);

    /** Maximum number of messages sent during a single {@link #call()}. */
    private static final int MAX_DELIVERIES_PER_CALL = 1000;

    private final QueueHandler queueHandler;

    /**
     * Creates a delivery task bound to the given queue handler.
     *
     * @param queueHandler source of the messages and of the consumers that receive them
     */
    public MessageDeliveryTask(QueueHandler queueHandler) {
        this.queueHandler = queueHandler;
    }

    /** No work is needed when this task is added. */
    @Override
    public void onAdd() {
    }

    /** No work is needed when this task is removed. */
    @Override
    public void onRemove() {
    }

    /**
     * Returns the task identifier, which is the name of the underlying queue.
     *
     * @return the queue name
     */
    @Override
    public String getId() {
        return this.queueHandler.getQueue().getName();
    }

    /**
     * Delivers up to {@value #MAX_DELIVERIES_PER_CALL} messages to ready consumers.
     *
     * @return {@link Task.TaskHint#IDLE} when there are no consumers, when no consumer is ready, or when
     *         nothing was sent; {@link Task.TaskHint#ACTIVE} when at least one message was sent and the walk
     *         ended because no message was available or the batch limit was reached
     * @throws Exception if dequeuing or sending a message fails
     */
    @Override
    public Task.TaskHint call() throws Exception {
        CyclicConsumerIterator consumers = this.queueHandler.getCyclicConsumerIterator();
        if (!consumers.hasNext()) {
            return Task.TaskHint.IDLE;
        }

        int deliveredCount = 0;
        // Consumers found not ready since the last successful send. Comparing only against the immediately
        // preceding consumer is not enough: with two or more distinct not-ready consumers the cyclic
        // iterator alternates between them and the loop would never terminate.
        List<Consumer> notReadySinceLastSend = new ArrayList<>();

        while (deliveredCount < MAX_DELIVERIES_PER_CALL) {
            Consumer consumer = consumers.next();

            if (!consumer.isReady()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Consumer {} is not ready for consuming messages from {}", consumer, this.queueHandler.getQueue().getName());
                }
                if (notReadySinceLastSend.contains(consumer)) {
                    // The same consumer came around again without any delivery in between, so nobody
                    // can accept a message right now. As before, this reports IDLE even if earlier
                    // iterations of this call did send messages.
                    return Task.TaskHint.IDLE;
                }
                notReadySinceLastSend.add(consumer);
                continue;
            }

            Message message = this.queueHandler.dequeue();
            if (message == null) {
                // Nothing available to deliver.
                break;
            }

            LOGGER.debug("Sending message {} to {}", message, consumer);
            MessageTracer.trace(message, this.queueHandler, MessageTracer.DELIVER);
            consumer.send(message);
            deliveredCount++;

            // A delivery happened, so start a fresh "nobody is ready" detection cycle.
            notReadySinceLastSend.clear();
        }

        return deliveredCount > 0 ? Task.TaskHint.ACTIVE : Task.TaskHint.IDLE;
    }
}
