package com.consol.citrus.channel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:com/consol/citrus/channel/MessageSelectingQueueChannel.class */
public class MessageSelectingQueueChannel extends QueueChannel {
    private static final Logger RETRY_LOG = LoggerFactory.getLogger("com.consol.citrus.RetryLogger");
    private final BlockingQueue<Message<?>> queue;
    private long pollingInterval;

    public MessageSelectingQueueChannel(BlockingQueue<Message<?>> blockingQueue) {
        super(blockingQueue);
        this.pollingInterval = 500L;
        this.queue = blockingQueue;
    }

    public MessageSelectingQueueChannel(int i) {
        this(new LinkedBlockingQueue(i));
        Assert.isTrue(i > 0, "The capacity must be a positive integer. For a zero-capacity alternative, consider using a 'RendezvousChannel'.");
    }

    public MessageSelectingQueueChannel() {
        this(new LinkedBlockingQueue());
    }

    public Message<?> receive(MessageSelector messageSelector) {
        for (Object obj : this.queue.toArray()) {
            Message<?> message = (Message) obj;
            if (messageSelector.accept(message) && this.queue.remove(message)) {
                return message;
            }
        }
        return null;
    }

    public Message<?> receive(MessageSelector messageSelector, long j) {
        Message<?> message;
        long j2;
        long j3 = j;
        Message<?> receive = receive(messageSelector);
        while (true) {
            message = receive;
            if (message != null || j3 <= 0) {
                break;
            }
            j3 -= this.pollingInterval;
            if (RETRY_LOG.isDebugEnabled()) {
                RETRY_LOG.debug("No message received with message selector - retrying in " + (j3 > 0 ? this.pollingInterval : this.pollingInterval + j3) + "ms");
            }
            if (j3 > 0) {
                try {
                    j2 = this.pollingInterval;
                } catch (InterruptedException e) {
                    RETRY_LOG.warn("Thread interrupted while waiting for retry", e);
                }
            } else {
                j2 = this.pollingInterval + j3;
            }
            Thread.sleep(j2);
            receive = receive(messageSelector);
        }
        return message;
    }

    public long getPollingInterval() {
        return this.pollingInterval;
    }

    public void setPollingInterval(long j) {
        this.pollingInterval = j;
    }
}
