package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Named;
import org.graylog2.configuration.HttpConfiguration;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/AmqpTransport.class */
public class AmqpTransport extends ThrottleableTransport {
    public static final String CK_HOSTNAME = "broker_hostname";
    public static final String CK_PORT = "broker_port";
    public static final String CK_VHOST = "broker_vhost";
    public static final String CK_USERNAME = "broker_username";
    public static final String CK_PASSWORD = "broker_password";
    public static final String CK_PREFETCH = "prefetch";
    public static final String CK_EXCHANGE = "exchange";
    public static final String CK_EXCHANGE_BIND = "exchange_bind";
    public static final String CK_QUEUE = "queue";
    public static final String CK_ROUTING_KEY = "routing_key";
    public static final String CK_PARALLEL_QUEUES = "parallel_queues";
    public static final String CK_TLS = "tls";
    public static final String CK_REQUEUE_INVALID_MESSAGES = "requeue_invalid_messages";
    public static final String CK_HEARTBEAT_TIMEOUT = "heartbeat";
    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransport.class);
    private final Configuration configuration;
    private final EventBus eventBus;
    private final MetricRegistry localRegistry;
    private final ScheduledExecutorService scheduler;
    private AmqpConsumer consumer;

    @ConfigClass
    /* loaded from: input_file:org/graylog2/inputs/transports/AmqpTransport$Config.class */
    public static class Config extends ThrottleableTransport.Config {
        @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport.Config, org.graylog2.plugin.inputs.transports.Transport.Config
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest requestedConfiguration = super.getRequestedConfiguration();
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_HOSTNAME, "Broker hostname", HttpConfiguration.PATH_WEB, "Hostname of the AMQP broker to use", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(AmqpTransport.CK_PORT, "Broker port", 5672, "Port of the AMQP broker to use", ConfigurationField.Optional.OPTIONAL, NumberField.Attribute.IS_PORT_NUMBER));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_VHOST, "Broker virtual host", "/", "Virtual host of the AMQP broker to use", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_USERNAME, "Username", HttpConfiguration.PATH_WEB, "Username to connect to AMQP broker", ConfigurationField.Optional.OPTIONAL));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_PASSWORD, "Password", HttpConfiguration.PATH_WEB, "Password to connect to AMQP broker", ConfigurationField.Optional.OPTIONAL, TextField.Attribute.IS_PASSWORD));
            requestedConfiguration.addField(new NumberField(AmqpTransport.CK_PREFETCH, "Prefetch count", 100, "For advanced usage: AMQP prefetch count. Default is 100.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_QUEUE, "Queue", defaultQueueName(), "Name of queue that is created.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_EXCHANGE, "Exchange", defaultExchangeName(), "Name of exchange to bind to.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new BooleanField(AmqpTransport.CK_EXCHANGE_BIND, "Bind to exchange", false, "Binds the queue to the configured exchange. The exchange must already exist."));
            requestedConfiguration.addField(new TextField(AmqpTransport.CK_ROUTING_KEY, "Routing key", defaultRoutingKey(), "Routing key to listen for.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(AmqpTransport.CK_PARALLEL_QUEUES, "Number of Queues", 1, "Number of parallel Queues", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(AmqpTransport.CK_HEARTBEAT_TIMEOUT, "Heartbeat timeout", 60, "Heartbeat interval in seconds (use 0 to disable heartbeat)", ConfigurationField.Optional.OPTIONAL));
            requestedConfiguration.addField(new BooleanField(AmqpTransport.CK_TLS, "Enable TLS?", false, "Enable transport encryption via TLS. (requires valid TLS port setting)"));
            requestedConfiguration.addField(new BooleanField(AmqpTransport.CK_REQUEUE_INVALID_MESSAGES, "Re-queue invalid messages?", true, "Invalid messages will be discarded if disabled."));
            return requestedConfiguration;
        }

        protected String defaultRoutingKey() {
            return "#";
        }

        protected String defaultExchangeName() {
            return "log-messages";
        }

        protected String defaultQueueName() {
            return "log-messages";
        }
    }

    @FactoryClass
    /* loaded from: input_file:org/graylog2/inputs/transports/AmqpTransport$Factory.class */
    public interface Factory extends Transport.Factory<AmqpTransport> {
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        AmqpTransport create(Configuration configuration);

        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        Config getConfig();
    }

    @AssistedInject
    public AmqpTransport(@Assisted Configuration configuration, EventBus eventBus, LocalMetricRegistry localMetricRegistry, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService) {
        super(eventBus, configuration);
        this.configuration = configuration;
        this.eventBus = eventBus;
        this.localRegistry = localMetricRegistry;
        this.scheduler = scheduledExecutorService;
        localMetricRegistry.register(ThroughputCounter.READ_BYTES_1_SEC, new Gauge<Long>() { // from class: org.graylog2.inputs.transports.AmqpTransport.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m497getValue() {
                return Long.valueOf(AmqpTransport.this.consumer.getLastSecBytesRead().get());
            }
        });
        localMetricRegistry.register(ThroughputCounter.WRITTEN_BYTES_1_SEC, new Gauge<Long>() { // from class: org.graylog2.inputs.transports.AmqpTransport.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m498getValue() {
                return 0L;
            }
        });
        localMetricRegistry.register(ThroughputCounter.READ_BYTES_TOTAL, new Gauge<Long>() { // from class: org.graylog2.inputs.transports.AmqpTransport.3
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m499getValue() {
                return Long.valueOf(AmqpTransport.this.consumer.getTotalBytesRead().get());
            }
        });
        localMetricRegistry.register(ThroughputCounter.WRITTEN_BYTES_TOTAL, new Gauge<Long>() { // from class: org.graylog2.inputs.transports.AmqpTransport.4
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m500getValue() {
                return 0L;
            }
        });
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:3:0x0013. Please report as an issue. */
    @Subscribe
    public void lifecycleChanged(Lifecycle lifecycle) {
        try {
            LOG.debug("Lifecycle changed to {}", lifecycle);
            switch (lifecycle) {
                case PAUSED:
                case FAILED:
                case HALTING:
                    try {
                        if (this.consumer != null) {
                            this.consumer.stop();
                        }
                    } catch (IOException e) {
                        LOG.warn("Unable to stop consumer", e);
                    }
                    return;
                default:
                    if (this.consumer.isConnected()) {
                        LOG.debug("Consumer is already connected, not running it a second time.");
                    } else {
                        try {
                            this.consumer.run();
                        } catch (IOException e2) {
                            LOG.warn("Unable to resume consumer", e2);
                        }
                    }
                    return;
            }
        } catch (Exception e3) {
            LOG.warn("This should not throw any exceptions", e3);
        }
    }

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public void setMessageAggregator(CodecAggregator codecAggregator) {
    }

    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doLaunch(MessageInput messageInput) throws MisfireException {
        int i = 60;
        if (this.configuration.intIsSet(CK_HEARTBEAT_TIMEOUT)) {
            i = this.configuration.getInt(CK_HEARTBEAT_TIMEOUT);
            if (i < 0) {
                LOG.warn("AMQP heartbeat interval must not be negative ({}), using default timeout ({}).", Integer.valueOf(i), 60);
                i = 60;
            }
        }
        this.consumer = new AmqpConsumer(this.configuration.getString(CK_HOSTNAME), this.configuration.getInt(CK_PORT), this.configuration.getString(CK_VHOST), this.configuration.getString(CK_USERNAME), this.configuration.getString(CK_PASSWORD), this.configuration.getInt(CK_PREFETCH), this.configuration.getString(CK_QUEUE), this.configuration.getString(CK_EXCHANGE), this.configuration.getBoolean(CK_EXCHANGE_BIND), this.configuration.getString(CK_ROUTING_KEY), this.configuration.getInt(CK_PARALLEL_QUEUES), this.configuration.getBoolean(CK_TLS), this.configuration.getBoolean(CK_REQUEUE_INVALID_MESSAGES), i, messageInput, this.scheduler, this);
        this.eventBus.register(this);
        try {
            this.consumer.run();
        } catch (IOException e) {
            this.eventBus.unregister(this);
            throw new MisfireException("Could not launch AMQP consumer.", e);
        }
    }

    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doStop() {
        if (this.consumer != null) {
            try {
                this.consumer.stop();
            } catch (IOException e) {
                LOG.error("Could not stop AMQP consumer.", e);
            }
        }
        this.eventBus.unregister(this);
    }

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }
}
