package org.graylog2.inputs.transports;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.Uninterruptibles;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/AmqpConsumer.class */
public class AmqpConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final String password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final String routingKey;
    private Connection connection;
    private Channel channel;
    private final MessageInput sourceInput;
    private AtomicLong totalBytesRead = new AtomicLong(0);
    private AtomicLong lastSecBytesRead = new AtomicLong(0);
    private AtomicLong lastSecBytesReadTmp = new AtomicLong(0);

    public AmqpConsumer(String str, int i, String str2, String str3, String str4, int i2, String str5, String str6, String str7, MessageInput messageInput, ScheduledExecutorService scheduledExecutorService) {
        this.hostname = str;
        this.port = i;
        this.virtualHost = str2;
        this.username = str3;
        this.password = str4;
        this.prefetchCount = i2;
        this.queue = str5;
        this.exchange = str6;
        this.routingKey = str7;
        this.sourceInput = messageInput;
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.graylog2.inputs.transports.AmqpConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                AmqpConsumer.this.lastSecBytesRead.set(AmqpConsumer.this.lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    public void run() throws IOException {
        if (!isConnected()) {
            connect();
        }
        this.channel.basicConsume(this.queue, false, new DefaultConsumer(this.channel) { // from class: org.graylog2.inputs.transports.AmqpConsumer.2
            @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                try {
                    AmqpConsumer.this.totalBytesRead.addAndGet(bArr.length);
                    AmqpConsumer.this.lastSecBytesReadTmp.addAndGet(bArr.length);
                    AmqpConsumer.this.sourceInput.processRawMessageFailFast(new RawMessage("radio-msgpack", AmqpConsumer.this.sourceInput.getId(), null, bArr));
                    AmqpConsumer.this.channel.basicAck(deliveryTag, false);
                } catch (BufferOutOfCapacityException e) {
                    AmqpConsumer.LOG.debug("Input buffer full, requeuing message. Delaying 10 ms until trying next message.");
                    if (AmqpConsumer.this.channel.isOpen()) {
                        AmqpConsumer.this.channel.basicNack(deliveryTag, false, true);
                        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                    }
                } catch (ProcessingDisabledException e2) {
                    AmqpConsumer.LOG.debug("Message processing is disabled, requeuing message. Delaying 100 ms until trying next message.");
                    if (AmqpConsumer.this.channel.isOpen()) {
                        AmqpConsumer.this.channel.basicNack(deliveryTag, false, true);
                        Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e3) {
                    AmqpConsumer.LOG.error("Error while trying to process AMQP message, requeuing message", (Throwable) e3);
                    if (AmqpConsumer.this.channel.isOpen()) {
                        AmqpConsumer.this.channel.basicNack(deliveryTag, false, true);
                    }
                }
            }
        });
    }

    public void connect() throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.hostname);
        connectionFactory.setPort(this.port);
        connectionFactory.setVirtualHost(this.virtualHost);
        if (!Strings.isNullOrEmpty(this.username) && !Strings.isNullOrEmpty(this.password)) {
            connectionFactory.setUsername(this.username);
            connectionFactory.setPassword(this.password);
        }
        this.connection = connectionFactory.newConnection();
        this.channel = this.connection.createChannel();
        if (null == this.channel) {
            LOG.error("No channel descriptor available!");
        }
        if (null != this.channel && this.prefetchCount > 0) {
            this.channel.basicQos(this.prefetchCount);
            LOG.info("AMQP prefetch count overriden to <{}>.", Integer.valueOf(this.prefetchCount));
        }
        this.connection.addShutdownListener(new ShutdownListener() { // from class: org.graylog2.inputs.transports.AmqpConsumer.3
            @Override // com.rabbitmq.client.ShutdownListener
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                if (shutdownSignalException.isInitiatedByApplication()) {
                    AmqpConsumer.LOG.info("Not reconnecting connection, we disconnected explicitly.");
                    return;
                }
                while (true) {
                    try {
                        AmqpConsumer.LOG.error("AMQP connection lost! Trying reconnect in 1 second.");
                        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                        AmqpConsumer.this.connect();
                        AmqpConsumer.LOG.info("Connected! Re-starting consumer.");
                        AmqpConsumer.this.run();
                        AmqpConsumer.LOG.info("Consumer running.");
                        return;
                    } catch (IOException e) {
                        AmqpConsumer.LOG.error("Could not re-connect to AMQP broker.", (Throwable) e);
                    }
                }
            }
        });
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            this.channel.close();
        }
        if (this.connection == null || !this.connection.isOpen()) {
            return;
        }
        this.connection.close();
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}
