/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.channel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
 * {@link QueueChannel} that additionally allows receiving a message chosen by a
 * {@link MessageSelector} instead of strictly taking the head of the queue.
 *
 * <p>The selective receive with timeout is implemented by polling: the queue is
 * scanned, and while nothing matches the calling thread sleeps for the polling
 * interval before scanning again.
 */
public class MessageSelectingQueueChannel extends QueueChannel {

    /** Logger for retry notifications; uses a fixed logger name instead of this class' name. */
    private static final Logger RETRY_LOG = LoggerFactory.getLogger("org.citrusframework.RetryLogger");

    /**
     * Lower bound (in milliseconds) applied to the configured polling interval while waiting.
     * Without it a zero interval would never consume the timeout and a negative one would be
     * rejected by {@link Thread#sleep(long)}.
     */
    private static final long MIN_POLLING_INTERVAL = 1L;

    /** The queue handed to the super class; kept here so it can be scanned and selectively drained. */
    private final BlockingQueue<Message<?>> queue;

    /** Pause between two selective receive attempts, in milliseconds. */
    private long pollingInterval = 500L;

    /**
     * Creates a channel on top of the given queue and disables the channel's logging.
     *
     * @param queue the queue backing this channel
     */
    public MessageSelectingQueueChannel(BlockingQueue<Message<?>> queue) {
        super(queue);
        this.setLoggingEnabled(false);
        this.queue = queue;
    }

    /**
     * Creates a channel backed by a {@link LinkedBlockingQueue} with the given capacity.
     *
     * @param capacity maximum number of queued messages; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public MessageSelectingQueueChannel(int capacity) {
        // Validate first: LinkedBlockingQueue rejects non-positive capacities on its own,
        // which would otherwise hide the more helpful message below.
        this(new LinkedBlockingQueue<Message<?>>(requirePositiveCapacity(capacity)));
    }

    /**
     * Creates a channel backed by a {@link LinkedBlockingQueue} created with its no-arg constructor.
     */
    public MessageSelectingQueueChannel() {
        this(new LinkedBlockingQueue<Message<?>>());
    }

    /**
     * Receives the first queued message accepted by the selector without waiting.
     *
     * <p>Works on a snapshot of the queue. A candidate is only returned if it could
     * actually be removed from the queue; otherwise the scan moves on to the next one.
     *
     * @param selector decides which message to take
     * @return the removed message, or {@code null} if no queued message was accepted
     */
    public Message<?> receive(MessageSelector selector) {
        for (Object candidate : this.queue.toArray()) {
            Message<?> message = (Message<?>) candidate;
            // remove() returning false means the message is no longer in the queue.
            if (selector.accept(message) && this.queue.remove(message)) {
                return message;
            }
        }
        return null;
    }

    /**
     * Receives the first queued message accepted by the selector, polling until one
     * is found or the timeout has elapsed.
     *
     * <p>An immediate attempt is always made. If the calling thread is interrupted while
     * waiting, its interrupt status is restored, one final attempt is made and the result
     * of that attempt is returned.
     *
     * @param selector decides which message to take
     * @param timeout maximum time to wait in milliseconds; values {@code <= 0} result in a
     *        single attempt without waiting
     * @return the removed message, or {@code null} if none was accepted in time
     */
    public Message<?> receive(MessageSelector selector, long timeout) {
        long timeLeft = timeout;
        Message<?> message = this.receive(selector);
        while (message == null && timeLeft > 0L) {
            // Clamp so that every iteration consumes part of the timeout and sleep() never
            // sees a negative argument, even if the interval was configured as <= 0.
            long interval = Math.max(MIN_POLLING_INTERVAL, this.pollingInterval);
            // Never sleep past the deadline: the last pause is shortened to the remaining time.
            long sleepTime = Math.min(interval, timeLeft);
            timeLeft -= sleepTime;

            RETRY_LOG.debug("No message received with message selector - retrying in {}ms", sleepTime);

            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                RETRY_LOG.warn("Thread interrupted while waiting for retry", e);
                // Preserve the interrupt for callers and stop waiting; continuing to loop
                // would only make every following sleep fail immediately.
                Thread.currentThread().interrupt();
                return this.receive(selector);
            }
            message = this.receive(selector);
        }
        return message;
    }

    /**
     * Gets the pause between two selective receive attempts.
     *
     * @return the polling interval in milliseconds
     */
    public long getPollingInterval() {
        return this.pollingInterval;
    }

    /**
     * Sets the pause between two selective receive attempts.
     *
     * <p>Values below one millisecond are stored as given but treated as one millisecond
     * by {@link #receive(MessageSelector, long)}.
     *
     * @param pollingInterval the polling interval in milliseconds
     */
    public void setPollingInterval(long pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    /** Returns the capacity unchanged if it is positive, otherwise fails with a descriptive message. */
    private static int requirePositiveCapacity(int capacity) {
        Assert.isTrue(capacity > 0, "The capacity must be a positive integer. For a zero-capacity alternative, consider using a 'RendezvousChannel'.");
        return capacity;
    }
}

