/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.rabbitmq.source;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQSinkUtil;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

public class RabbitMQConsumer {
    private static final Logger log = Logger.getLogger(RabbitMQConsumer.class);
    private static Channel channel = null;
    private static boolean isPaused;
    private static ReentrantLock lock;
    private static Condition condition;

    public static void consume(Connection connection, String exchangeName, String exchangeType, boolean exchangeDurable, boolean exchangeAutoDelete, String queueName, boolean queueExclusive, boolean queueDurable, boolean queueAutodelete, String routingKey, Map<String, Object> map, final SourceEventListener sourceEventListener) throws Exception {
        channel = connection.createChannel();
        lock = new ReentrantLock();
        condition = lock.newCondition();
        try {
            channel.exchangeDeclarePassive(exchangeName);
        }
        catch (Exception e) {
            channel = connection.createChannel();
            RabbitMQSinkUtil.declareExchange(connection, channel, exchangeName, exchangeType, exchangeDurable, exchangeAutoDelete);
        }
        if (queueName.isEmpty()) {
            queueName = channel.queueDeclare().getQueue();
        } else {
            try {
                channel.queueDeclarePassive(queueName);
            }
            catch (Exception e) {
                channel = connection.createChannel();
                RabbitMQSinkUtil.declareQueue(connection, channel, queueName, queueDurable, queueAutodelete, queueExclusive);
            }
        }
        channel.queueBind(queueName, exchangeName, routingKey, map);
        DefaultConsumer consumer = new DefaultConsumer(channel){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                try {
                    if (isPaused) {
                        lock.lock();
                        try {
                            while (!isPaused) {
                                condition.await();
                            }
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                        finally {
                            lock.unlock();
                        }
                    }
                    String message = new String(body, "UTF-8");
                    sourceEventListener.onEvent((Object)message, null);
                }
                catch (IOException e) {
                    log.error((Object)("Error in receiving the message from the RabbitMQ broker in " + sourceEventListener), (Throwable)e);
                }
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }

    public static void closeChannel() throws IOException, TimeoutException {
        channel.close();
    }

    public static void pause() {
        isPaused = true;
    }

    public static void resume() {
        isPaused = false;
        try {
            lock.lock();
            condition.signalAll();
        }
        finally {
            lock.unlock();
        }
    }
}

