package org.wso2.broker.core;

import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.broker.core.store.dao.SharedMessageStore;

/* loaded from: input_file:org/wso2/broker/core/QueueHandler.class */
public final class QueueHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueHandler.class);
    private Queue queue;
    private final Queue redeliveryQueue;
    private final Queue unmodifiableQueueView;
    private final Set<Consumer> consumers = ConcurrentHashMap.newKeySet();
    private final CyclicConsumerIterator consumerIterator = new CyclicConsumerIterator();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/broker/core/QueueHandler$DbBackedQueueImpl.class */
    public static class DbBackedQueueImpl extends Queue {
        private final java.util.Queue<Message> memQueue;
        private final SharedMessageStore sharedMessageStore;

        DbBackedQueueImpl(String str, boolean z, SharedMessageStore sharedMessageStore) throws BrokerException {
            super(str, true, z);
            this.sharedMessageStore = sharedMessageStore;
            this.memQueue = new LinkedBlockingQueue();
            this.memQueue.addAll(sharedMessageStore.readStoredMessages(str));
        }

        @Override // org.wso2.broker.core.Queue
        public int capacity() {
            return Integer.MAX_VALUE;
        }

        @Override // org.wso2.broker.core.Queue
        public int size() {
            return this.memQueue.size();
        }

        @Override // org.wso2.broker.core.Queue
        public boolean enqueue(Message message) throws BrokerException {
            if (message.getMetadata().getByteProperty(Metadata.DELIVERY_MODE) == 2) {
                this.sharedMessageStore.attach(getName(), message.getMetadata().getInternalId());
            }
            return this.memQueue.offer(message);
        }

        @Override // org.wso2.broker.core.Queue
        public Message dequeue() {
            return this.memQueue.poll();
        }

        @Override // org.wso2.broker.core.Queue
        public void detach(Message message) {
            this.sharedMessageStore.detach(getName(), message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/broker/core/QueueHandler$MemQueueImpl.class */
    public static class MemQueueImpl extends Queue {
        private final int capacity;
        private final java.util.Queue<Message> queue;

        MemQueueImpl(String str, int i, boolean z) {
            super(str, false, z);
            this.capacity = i;
            this.queue = new LinkedBlockingDeque(i);
        }

        @Override // org.wso2.broker.core.Queue
        public int capacity() {
            return this.capacity;
        }

        @Override // org.wso2.broker.core.Queue
        public int size() {
            return this.queue.size();
        }

        @Override // org.wso2.broker.core.Queue
        public boolean enqueue(Message message) {
            return this.queue.offer(message);
        }

        @Override // org.wso2.broker.core.Queue
        public Message dequeue() {
            return this.queue.poll();
        }

        @Override // org.wso2.broker.core.Queue
        public void detach(Message message) {
        }
    }

    /* loaded from: input_file:org/wso2/broker/core/QueueHandler$UnmodifiableQueueWrapper.class */
    private static class UnmodifiableQueueWrapper extends Queue {
        private final Queue queue;

        UnmodifiableQueueWrapper(Queue queue) {
            super(queue.getName(), queue.isDurable(), queue.isAutoDelete());
            this.queue = queue;
        }

        @Override // org.wso2.broker.core.Queue
        public int capacity() {
            return this.queue.capacity();
        }

        @Override // org.wso2.broker.core.Queue
        public int size() {
            return this.queue.size();
        }

        @Override // org.wso2.broker.core.Queue
        public boolean enqueue(Message message) {
            throw new UnsupportedOperationException("Queue " + this.queue.getName() + " is unmodifiable");
        }

        @Override // org.wso2.broker.core.Queue
        public Message dequeue() {
            throw new UnsupportedOperationException("Queue " + this.queue.getName() + " is unmodifiable");
        }

        @Override // org.wso2.broker.core.Queue
        public void detach(Message message) {
            throw new UnsupportedOperationException("Queue " + this.queue.getName() + " is unmodifiable");
        }
    }

    private QueueHandler(Queue queue) {
        this.queue = queue;
        this.unmodifiableQueueView = new UnmodifiableQueueWrapper(queue);
        this.redeliveryQueue = new MemQueueImpl(queue.getName(), 1000, false);
    }

    public static QueueHandler createNonDurableQueue(String str, int i, boolean z) {
        return new QueueHandler(new MemQueueImpl(str, i, z));
    }

    public static QueueHandler createDurableQueue(String str, boolean z, SharedMessageStore sharedMessageStore) throws BrokerException {
        return new QueueHandler(new DbBackedQueueImpl(str, z, sharedMessageStore));
    }

    public Queue getQueue() {
        return this.unmodifiableQueueView;
    }

    public Collection<Consumer> getConsumers() {
        return Collections.unmodifiableCollection(this.consumers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean addConsumer(Consumer consumer) {
        return this.consumers.add(consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean removeConsumer(Consumer consumer) {
        return this.consumers.remove(consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean enqueue(Message message) throws BrokerException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing message {} to queue {}", message, this.queue.getName());
        }
        return this.queue.enqueue(message);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Message dequeue() {
        Message dequeue = this.redeliveryQueue.dequeue();
        if (dequeue == null) {
            dequeue = this.queue.dequeue();
        }
        return dequeue;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledge(Message message) throws BrokerException {
        this.queue.detach(message);
    }

    public void requeue(Message message) throws BrokerException {
        this.redeliveryQueue.enqueue(message);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CyclicConsumerIterator getCyclicConsumerIterator() {
        this.consumerIterator.setIterator(Iterables.cycle(this.consumers).iterator());
        return this.consumerIterator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isEmpty() {
        return this.queue.size() == 0;
    }

    public int size() {
        return this.queue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isUnused() {
        return this.consumers.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeAllConsumers() {
        Iterator<Consumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            Consumer next = it.next();
            try {
                try {
                    next.close();
                    it.remove();
                } catch (BrokerException e) {
                    LOGGER.error("Error occurred while closing the consumer [ " + next + " ] for queue [ " + this.queue.toString() + " ]", e);
                    it.remove();
                }
            } catch (Throwable th) {
                it.remove();
                throw th;
            }
        }
    }

    public int consumerCount() {
        return this.consumers.size();
    }
}
