package org.graylog2.inputs.transports;

import com.google.common.base.Strings;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.TrafficListener;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.security.encryption.EncryptedValue;
import org.graylog2.security.encryption.EncryptedValueService;
import org.graylog2.shared.utilities.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/AmqpConsumer.class */
public class AmqpConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final EncryptedValue password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final boolean exchangeBind;
    private final String routingKey;
    private final boolean requeueInvalid;
    private final int heartbeatTimeout;
    private final MessageInput sourceInput;
    private final int parallelQueues;
    private final boolean tls;
    private final ScheduledExecutorService scheduler;
    private final InputFailureRecorder inputFailureRecorder;
    private final AmqpTransport amqpTransport;
    private final EncryptedValueService encryptedValueService;
    private final Duration connectionRecoveryInterval;
    private final AtomicLong totalBytesRead = new AtomicLong(0);
    private final AtomicLong lastSecBytesRead = new AtomicLong(0);
    private final AtomicLong lastSecBytesReadTmp = new AtomicLong(0);
    private Connection connection;
    private Channel channel;
    private ScheduledFuture<?> scheduledFuture;

    public AmqpConsumer(int i, MessageInput messageInput, Configuration configuration, ScheduledExecutorService scheduledExecutorService, InputFailureRecorder inputFailureRecorder, AmqpTransport amqpTransport, EncryptedValueService encryptedValueService, Duration duration) {
        this.hostname = configuration.getString(AmqpTransport.CK_HOSTNAME);
        this.port = configuration.getInt(AmqpTransport.CK_PORT);
        this.virtualHost = configuration.getString(AmqpTransport.CK_VHOST);
        this.username = configuration.getString(AmqpTransport.CK_USERNAME);
        this.password = configuration.getEncryptedValue(AmqpTransport.CK_PASSWORD);
        this.prefetchCount = configuration.getInt(AmqpTransport.CK_PREFETCH);
        this.queue = configuration.getString(AmqpTransport.CK_QUEUE);
        this.exchange = configuration.getString(AmqpTransport.CK_EXCHANGE);
        this.exchangeBind = configuration.getBoolean(AmqpTransport.CK_EXCHANGE_BIND);
        this.routingKey = configuration.getString(AmqpTransport.CK_ROUTING_KEY);
        this.parallelQueues = configuration.getInt(AmqpTransport.CK_PARALLEL_QUEUES);
        this.requeueInvalid = configuration.getBoolean(AmqpTransport.CK_REQUEUE_INVALID_MESSAGES);
        this.tls = configuration.getBoolean(AmqpTransport.CK_TLS);
        this.heartbeatTimeout = i;
        this.sourceInput = messageInput;
        this.scheduler = scheduledExecutorService;
        this.inputFailureRecorder = inputFailureRecorder;
        this.amqpTransport = amqpTransport;
        this.encryptedValueService = encryptedValueService;
        this.connectionRecoveryInterval = duration;
    }

    public void run() throws IOException, TimeoutException {
        if (!isConnected()) {
            connect();
        }
        this.scheduledFuture = this.scheduler.scheduleAtFixedRate(() -> {
            this.lastSecBytesRead.set(this.lastSecBytesReadTmp.getAndSet(0L));
        }, 1L, 1L, TimeUnit.SECONDS);
        for (int i = 0; i < this.parallelQueues; i++) {
            String format = String.format(Locale.ENGLISH, this.queue, Integer.valueOf(i));
            this.channel.queueDeclare(format, true, false, false, (Map) null);
            if (this.exchangeBind) {
                this.channel.queueBind(format, this.exchange, this.routingKey);
            }
            this.channel.basicConsume(format, false, new DefaultConsumer(this.channel) { // from class: org.graylog2.inputs.transports.AmqpConsumer.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    try {
                        AmqpConsumer.this.totalBytesRead.addAndGet(bArr.length);
                        AmqpConsumer.this.lastSecBytesReadTmp.addAndGet(bArr.length);
                        RawMessage rawMessage = new RawMessage(bArr);
                        if (AmqpConsumer.this.amqpTransport.isThrottled()) {
                            AmqpConsumer.this.amqpTransport.blockUntilUnthrottled();
                        }
                        AmqpConsumer.this.sourceInput.processRawMessage(rawMessage);
                        AmqpConsumer.this.channel.basicAck(deliveryTag, false);
                    } catch (Exception e) {
                        AmqpConsumer.LOG.error("Error while trying to process AMQP message", e);
                        if (AmqpConsumer.this.channel.isOpen()) {
                            AmqpConsumer.this.channel.basicNack(deliveryTag, false, AmqpConsumer.this.requeueInvalid);
                            if (AmqpConsumer.LOG.isDebugEnabled()) {
                                if (AmqpConsumer.this.requeueInvalid) {
                                    AmqpConsumer.LOG.debug("Re-queue message with delivery tag {}", Long.valueOf(deliveryTag));
                                } else {
                                    AmqpConsumer.LOG.debug("Message with delivery tag {} not re-queued", Long.valueOf(deliveryTag));
                                }
                            }
                        }
                    }
                }
            });
        }
    }

    public void connect() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setExceptionHandler(new DefaultExceptionHandler() { // from class: org.graylog2.inputs.transports.AmqpConsumer.2
            public void handleConnectionRecoveryException(Connection connection, Throwable th) {
                super.handleConnectionRecoveryException(connection, th);
                AmqpConsumer.this.inputFailureRecorder.setFailing(getClass(), "Connection recovery error!", th);
            }
        });
        connectionFactory.setTrafficListener(new TrafficListener() { // from class: org.graylog2.inputs.transports.AmqpConsumer.3
            public void write(Command command) {
            }

            public void read(Command command) {
                AmqpConsumer.this.inputFailureRecorder.setRunning();
            }
        });
        connectionFactory.setHost(this.hostname);
        connectionFactory.setPort(this.port);
        connectionFactory.setVirtualHost(this.virtualHost);
        connectionFactory.setRequestedHeartbeat(this.heartbeatTimeout);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(this.connectionRecoveryInterval.toMillis());
        if (this.tls) {
            try {
                LOG.info("Enabling TLS for AMQP input {}.", this.sourceInput.toIdentifier());
                connectionFactory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                throw new IOException("Couldn't enable TLS for AMQP input.", e);
            }
        }
        if (!Strings.isNullOrEmpty(this.username) && this.password.isSet()) {
            connectionFactory.setUsername(this.username);
            connectionFactory.setPassword(this.encryptedValueService.decrypt(this.password));
        }
        this.connection = connectionFactory.newConnection();
        this.channel = this.connection.createChannel();
        if (null == this.channel) {
            LOG.error("No channel descriptor available!");
        }
        if (null != this.channel && this.prefetchCount > 0) {
            this.channel.basicQos(this.prefetchCount);
            LOG.debug("AMQP prefetch count overriden to <{}>.", Integer.valueOf(this.prefetchCount));
        }
        this.connection.addShutdownListener(shutdownSignalException -> {
            if (shutdownSignalException.isInitiatedByApplication()) {
                LOG.info("Shutting down AMPQ consumer.");
            } else {
                this.inputFailureRecorder.setFailing(getClass(), StringUtils.f("AMQP connection lost (reason: %s)! Reconnecting ...", shutdownSignalException.getReason().protocolMethodName()));
            }
        });
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            } catch (TimeoutException e) {
                LOG.error("Timeout when closing AMQP channel", e);
                this.channel.abort();
            }
        }
        if (this.connection != null && this.connection.isOpen()) {
            this.connection.close();
        } else if (this.connection != null) {
            this.connection.abort();
        }
        if (null != this.scheduledFuture) {
            this.scheduledFuture.cancel(true);
        }
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}
