package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Named;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport.class */
public class KafkaTransport extends ThrottleableTransport {
    public static final String GROUP_ID = "graylog2";
    public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
    public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
    public static final String CK_ZOOKEEPER = "zookeeper";
    public static final String CK_TOPIC_FILTER = "topic_filter";
    public static final String CK_THREADS = "threads";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);
    private final Configuration configuration;
    private final MetricRegistry localRegistry;
    private final NodeId nodeId;
    private final EventBus serverEventBus;
    private final ServerStatus serverStatus;
    private final ScheduledExecutorService scheduler;
    private final MetricRegistry metricRegistry;
    private final AtomicLong totalBytesRead;
    private final AtomicLong lastSecBytesRead;
    private final AtomicLong lastSecBytesReadTmp;
    private volatile boolean stopped;
    private volatile boolean paused;
    private volatile CountDownLatch pausedLatch;
    private CountDownLatch stopLatch;
    private ConsumerConnector cc;

    @ConfigClass
    /* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport$Config.class */
    public static class Config extends ThrottleableTransport.Config {
        @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport.Config, org.graylog2.plugin.inputs.transports.Transport.Config
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest requestedConfiguration = super.getRequestedConfiguration();
            requestedConfiguration.addField(new TextField(KafkaTransport.CK_ZOOKEEPER, "ZooKeeper address", "127.0.0.1:2181", "Host and port of the ZooKeeper that is managing your Kafka cluster.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new TextField(KafkaTransport.CK_TOPIC_FILTER, "Topic filter regex", "^your-topic$", "Every topic that matches this regular expression will be consumed.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(KafkaTransport.CK_FETCH_MIN_BYTES, "Fetch minimum bytes", 5, "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(KafkaTransport.CK_FETCH_WAIT_MAX, "Fetch maximum wait time (ms)", 100, "Wait for this time or the configured minimum size of a message batch before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
            requestedConfiguration.addField(new NumberField(KafkaTransport.CK_THREADS, "Processor threads", 2, "Number of processor threads to spawn. Use one thread per Kafka topic partition.", ConfigurationField.Optional.NOT_OPTIONAL));
            return requestedConfiguration;
        }
    }

    @FactoryClass
    /* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport$Factory.class */
    public interface Factory extends Transport.Factory<KafkaTransport> {
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        KafkaTransport create(Configuration configuration);

        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        Config getConfig();
    }

    @AssistedInject
    public KafkaTransport(@Assisted Configuration configuration, LocalMetricRegistry localMetricRegistry, NodeId nodeId, EventBus eventBus, ServerStatus serverStatus, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService) {
        super(eventBus, configuration);
        this.totalBytesRead = new AtomicLong(0L);
        this.lastSecBytesRead = new AtomicLong(0L);
        this.lastSecBytesReadTmp = new AtomicLong(0L);
        this.stopped = false;
        this.paused = true;
        this.pausedLatch = new CountDownLatch(1);
        this.configuration = configuration;
        this.localRegistry = localMetricRegistry;
        this.nodeId = nodeId;
        this.serverEventBus = eventBus;
        this.serverStatus = serverStatus;
        this.scheduler = scheduledExecutorService;
        this.metricRegistry = localMetricRegistry;
        localMetricRegistry.register("read_bytes_1sec", new Gauge<Long>() { // from class: org.graylog2.inputs.transports.KafkaTransport.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m149getValue() {
                return Long.valueOf(KafkaTransport.this.lastSecBytesRead.get());
            }
        });
        localMetricRegistry.register("written_bytes_1sec", new Gauge<Long>() { // from class: org.graylog2.inputs.transports.KafkaTransport.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m150getValue() {
                return 0L;
            }
        });
        localMetricRegistry.register("read_bytes_total", new Gauge<Long>() { // from class: org.graylog2.inputs.transports.KafkaTransport.3
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m151getValue() {
                return Long.valueOf(KafkaTransport.this.totalBytesRead.get());
            }
        });
        localMetricRegistry.register("written_bytes_total", new Gauge<Long>() { // from class: org.graylog2.inputs.transports.KafkaTransport.4
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m152getValue() {
                return 0L;
            }
        });
    }

    @Subscribe
    public void lifecycleStateChange(Lifecycle lifecycle) {
        LOG.debug("Lifecycle changed to {}", lifecycle);
        switch (lifecycle) {
            case PAUSED:
            case FAILED:
            case HALTING:
                this.pausedLatch = new CountDownLatch(1);
                this.paused = true;
                return;
            default:
                this.paused = false;
                this.pausedLatch.countDown();
                return;
        }
    }

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public void setMessageAggregator(CodecAggregator codecAggregator) {
    }

    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doLaunch(final MessageInput messageInput) throws MisfireException {
        this.serverStatus.awaitRunning(new Runnable() { // from class: org.graylog2.inputs.transports.KafkaTransport.5
            @Override // java.lang.Runnable
            public void run() {
                KafkaTransport.this.lifecycleStateChange(Lifecycle.RUNNING);
            }
        });
        this.serverEventBus.register(this);
        Properties properties = new Properties();
        properties.put("group.id", GROUP_ID);
        properties.put("client.id", "gl2-" + this.nodeId + "-" + messageInput.getId());
        properties.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        properties.put("fetch.wait.max.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        properties.put("zookeeper.connect", this.configuration.getString(CK_ZOOKEEPER));
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("consumer.timeout.ms", "1000");
        int i = this.configuration.getInt(CK_THREADS);
        this.cc = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        List<KafkaStream> createMessageStreamsByFilter = this.cc.createMessageStreamsByFilter(new Whitelist(this.configuration.getString(CK_TOPIC_FILTER)), i);
        ExecutorService executorService = executorService(i);
        this.stopLatch = new CountDownLatch(createMessageStreamsByFilter.size());
        for (final KafkaStream kafkaStream : createMessageStreamsByFilter) {
            executorService.submit(new Runnable() { // from class: org.graylog2.inputs.transports.KafkaTransport.6
                @Override // java.lang.Runnable
                public void run() {
                    ConsumerIterator it = kafkaStream.iterator();
                    do {
                        boolean z = false;
                        while (it.hasNext()) {
                            try {
                                if (KafkaTransport.this.paused) {
                                    KafkaTransport.LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                                    Uninterruptibles.awaitUninterruptibly(KafkaTransport.this.pausedLatch);
                                }
                                if (KafkaTransport.this.stopped) {
                                    break;
                                }
                                if (KafkaTransport.this.isThrottled()) {
                                    KafkaTransport.this.blockUntilUnthrottled();
                                }
                                byte[] bArr = (byte[]) it.next().message();
                                KafkaTransport.this.totalBytesRead.addAndGet(bArr.length);
                                KafkaTransport.this.lastSecBytesReadTmp.addAndGet(bArr.length);
                                messageInput.processRawMessage(new RawMessage(bArr));
                            } catch (ConsumerTimeoutException e) {
                                z = true;
                            } catch (Exception e2) {
                                KafkaTransport.LOG.error("Kafka consumer error, stopping consumer thread.", e2);
                            }
                        }
                        if (!z) {
                            break;
                        }
                    } while (!KafkaTransport.this.stopped);
                    KafkaTransport.this.cc.commitOffsets();
                    KafkaTransport.this.stopLatch.countDown();
                }
            });
        }
        this.scheduler.scheduleAtFixedRate(new Runnable() { // from class: org.graylog2.inputs.transports.KafkaTransport.7
            @Override // java.lang.Runnable
            public void run() {
                KafkaTransport.this.lastSecBytesRead.set(KafkaTransport.this.lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    private ExecutorService executorService(int i) {
        return new InstrumentedExecutorService(Executors.newFixedThreadPool(i, new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build()), this.metricRegistry, MetricRegistry.name(getClass(), new String[]{"executor-service"}));
    }

    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doStop() {
        this.stopped = true;
        this.serverEventBus.unregister(this);
        if (this.stopLatch != null) {
            try {
                if (this.pausedLatch != null && this.pausedLatch.getCount() > 0) {
                    this.pausedLatch.countDown();
                }
                boolean await = this.stopLatch.await(5L, TimeUnit.SECONDS);
                this.stopLatch = null;
                if (!await) {
                    LOG.info("Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. This is usually harmless when stopping the input.");
                }
            } catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting to stop input.");
            }
        }
        if (this.cc != null) {
            this.cc.shutdown();
            this.cc = null;
        }
    }

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }
}
