package org.apache.synapse.transport.amqp;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.transport.RangeSet;

/* loaded from: input_file:org/apache/synapse/transport/amqp/MessageManager.class */
/**
 * Turns asynchronous message delivery on a {@link Session} into a blocking
 * {@link #receive(long)} call for a single destination.
 *
 * <p>Messages are delivered to {@link #onMessage(Message)}. A message whose
 * correlation id equals the one given at construction time is handed to the
 * thread waiting in {@link #receive(long)}; any other message is released
 * back through the session.
 */
public class MessageManager implements MessageListener {
    private static final Log log = LogFactory.getLog(MessageManager.class);

    /** Single-slot, fair hand-off between the delivery callback and {@link #receive(long)}. */
    private final ArrayBlockingQueue<Message> queue = new ArrayBlockingQueue<>(1, true);
    private final Session session;
    private final String destination;
    private final String corelationId;

    /**
     * Creates a manager bound to one destination and one expected correlation id.
     *
     * @param session       session used to request, flush and release messages
     * @param destination   destination name passed to the session's flow-control calls
     * @param correlationId correlation id a delivered message must carry to be accepted;
     *                      if {@code null}, no message matches and every delivery is released
     */
    public MessageManager(Session session, String destination, String correlationId) {
        this.session = session;
        this.destination = destination;
        // Previously this argument was dropped, leaving the field null and making
        // onMessage() fail with a NullPointerException on every delivery.
        this.corelationId = correlationId;
    }

    /**
     * Requests a message for the destination and waits for a matching one to arrive.
     *
     * <p>If nothing arrives within the timeout, the destination is flushed, the
     * session is synced, and the call then waits for a message without a timeout.
     *
     * @param timeoutMillis how long to wait, in milliseconds, before flushing
     * @return the received message
     * @throws AMQPSynapseException if the wait is interrupted or any session call fails
     */
    public Message receive(long timeoutMillis) {
        // Presumably unit 0 = message credit and unit 1 = byte credit (-1 = unlimited),
        // as in AMQP 0-10 message.flow — TODO confirm against the client library.
        this.session.messageFlow(this.destination, (short) 0, 1L);
        this.session.messageFlow(this.destination, (short) 1, -1L);
        try {
            Message received = this.queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (received == null) {
                log.debug("Message Didn't arrive in time, checking if one is inflight");
                this.session.messageFlush(this.destination);
                this.session.sync();
                // NOTE(review): take() has no timeout, so if no matching message is
                // in flight this call blocks until one is queued. Kept as in the
                // original because callers may rely on a non-null return.
                received = this.queue.take();
            }
            return received;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new AMQPSynapseException("unable to receive message", e);
        } catch (Exception e) {
            throw new AMQPSynapseException("unable to receive message", e);
        }
    }

    /**
     * Delivery callback: queues the message if its correlation id matches the
     * expected one, otherwise releases it through the session.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        if (log.isDebugEnabled()) {
            log.debug("\n================== Received Msg =================="
                    + "\nMessage Id : " + message.getMessageProperties().getMessageId()
                    + "\n" + message.toString()
                    + "\n================== End Msg ==================\n");
        }
        // Null-safe: a manager built without a correlation id matches nothing.
        if (this.corelationId != null
                && this.corelationId.equals(message.getMessageProperties().getCorrelationId())) {
            // add() throws IllegalStateException if the single slot is already occupied.
            this.queue.add(message);
            return;
        }
        RangeSet unwanted = new RangeSet();
        unwanted.add(message.getMessageTransferId());
        this.session.messageRelease(unwanted);
    }
}
