package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import javax.inject.Named;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.graylog2.configuration.HttpConfiguration;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport.class */
public class KafkaTransport extends ThrottleableTransport {
    // ---- Configuration keys read from the input's Configuration ----

    /** Boolean switch: use the legacy ZooKeeper-based consumer (true) or the consumer API (false). */
    public static final String CK_LEGACY = "legacy_mode";
    /** Value forwarded to Kafka's "fetch.min.bytes". */
    public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
    /** Value forwarded to "fetch.max.wait.ms" (consumer API) or "fetch.wait.max.ms" (legacy). */
    public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
    /** ZooKeeper connect string; required in legacy mode. */
    public static final String CK_ZOOKEEPER = "zookeeper";
    /** Bootstrap server list; required in non-legacy mode. */
    public static final String CK_BOOTSTRAP = "bootstrap_server";
    /** Regular expression selecting the topics to consume. */
    public static final String CK_TOPIC_FILTER = "topic_filter";
    /** Number of consumer threads / streams; also sizes the executor pool. */
    public static final String CK_THREADS = "threads";
    /** Offset reset policy, one of the keys of {@link #OFFSET_RESET_VALUES}. */
    public static final String CK_OFFSET_RESET = "offset_reset";
    /** Kafka consumer group id. */
    public static final String CK_GROUP_ID = "group_id";
    /** Free-form text in java.util.Properties syntax, merged over the generated consumer properties. */
    public static final String CK_CUSTOM_PROPERTIES = "custom_properties";
    /** Group id used when {@link #CK_GROUP_ID} is not configured. */
    private static final String DEFAULT_GROUP_ID = "graylog2";

    // ---- Collaborators handed in through the constructor ----

    private final Configuration configuration;
    /** Registry returned by {@link #getMetricSet()}; holds the throughput gauges. */
    private final MetricRegistry localRegistry;
    /** Used as part of the Kafka "client.id". */
    private final NodeId nodeId;
    private final EventBus serverEventBus;
    private final ServerStatus serverStatus;
    /** Runs the once-per-second throughput roll-over and the poll watchdog. */
    private final ScheduledExecutorService scheduler;
    /** Same instance as {@link #localRegistry}; used to instrument the executor. */
    private final MetricRegistry metricRegistry;

    // ---- Throughput accounting ----

    /** Sum of the payload sizes of all messages handed to the input. */
    private final AtomicLong totalBytesRead;
    /** Bytes read during the previous one-second window (what the gauge reports). */
    private final AtomicLong lastSecBytesRead;
    /** Bytes read during the current window; swapped into {@link #lastSecBytesRead} by the scheduler. */
    private final AtomicLong lastSecBytesReadTmp;

    // ---- Consumer thread coordination ----

    /** Fixed-size pool running the consumer threads. */
    private final ExecutorService executor;
    /** Set once the transport should shut down; polled by every consumer thread. */
    private volatile boolean stopped;
    /** While true, consumer threads block on {@link #pausedLatch} before handling the next message. */
    private volatile boolean paused;
    /** Gate the consumer threads wait on while paused; counted down to resume them. */
    private volatile CountDownLatch pausedLatch;
    /** Counted down by each consumer thread when it exits; awaited in {@link #doStop()}. */
    private CountDownLatch stopLatch;
    /** Legacy (ZooKeeper-based) consumer connector; only assigned in legacy mode. */
    private ConsumerConnector cc;

    // NOTE: DEFAULT_OFFSET_RESET must stay declared before OFFSET_RESET_VALUES, which references it
    // during static initialisation.
    private static final String DEFAULT_OFFSET_RESET = "largest";
    /** Offset reset choices (value -> human readable description) offered in the dropdown. */
    private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(DEFAULT_OFFSET_RESET, "Automatically reset the offset to the latest offset", "smallest", "Automatically reset the offset to the earliest offset");
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);

    /**
     * Describes the configuration fields this transport asks the user for, in addition to the
     * fields contributed by {@link ThrottleableTransport.Config}.
     */
    @ConfigClass
    public static class Config extends ThrottleableTransport.Config {

        /**
         * Builds the configuration request: the parent's fields followed by the Kafka specific ones
         * (consumer mode, connection endpoints, topic filter, fetch tuning, thread count, offset
         * reset policy, group id and free-form custom properties).
         *
         * @return the request object obtained from the superclass, with the Kafka fields appended
         */
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            final ConfigurationRequest request = super.getRequestedConfiguration();

            // Consumer flavour and the endpoint each flavour needs.
            request.addField(new BooleanField(
                    KafkaTransport.CK_LEGACY,
                    "Legacy mode",
                    true,
                    "Use old ZooKeeper-based consumer API. (Used before Graylog 3.3)",
                    10));
            request.addField(new TextField(
                    KafkaTransport.CK_BOOTSTRAP,
                    "Bootstrap Servers",
                    "127.0.0.1:9092",
                    "Comma separated list of one or more Kafka brokers. (Format: \"host1:port1,host2:port2\").Not used in legacy mode.",
                    ConfigurationField.Optional.OPTIONAL,
                    11,
                    new TextField.Attribute[0]));
            request.addField(new TextField(
                    KafkaTransport.CK_ZOOKEEPER,
                    "ZooKeeper address (legacy mode only)",
                    "127.0.0.1:2181",
                    "Host and port of the ZooKeeper that is managing your Kafka cluster. Not used in consumer API (non-legacy) mode.",
                    ConfigurationField.Optional.OPTIONAL,
                    12,
                    new TextField.Attribute[0]));

            // What to consume and how aggressively to fetch it.
            request.addField(new TextField(
                    KafkaTransport.CK_TOPIC_FILTER,
                    "Topic filter regex",
                    "^your-topic$",
                    "Every topic that matches this regular expression will be consumed.",
                    ConfigurationField.Optional.NOT_OPTIONAL));
            request.addField(new NumberField(
                    KafkaTransport.CK_FETCH_MIN_BYTES,
                    "Fetch minimum bytes",
                    5,
                    "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.",
                    ConfigurationField.Optional.NOT_OPTIONAL));
            request.addField(new NumberField(
                    KafkaTransport.CK_FETCH_WAIT_MAX,
                    "Fetch maximum wait time (ms)",
                    100,
                    "Wait for this time or the configured minimum size of a message batch before fetching.",
                    ConfigurationField.Optional.NOT_OPTIONAL));
            request.addField(new NumberField(
                    KafkaTransport.CK_THREADS,
                    "Processor threads",
                    2,
                    "Number of processor threads to spawn. Use one thread per Kafka topic partition.",
                    ConfigurationField.Optional.NOT_OPTIONAL));

            // Consumer group behaviour and escape hatch for arbitrary Kafka settings.
            request.addField(new DropdownField(
                    KafkaTransport.CK_OFFSET_RESET,
                    "Auto offset reset",
                    KafkaTransport.DEFAULT_OFFSET_RESET,
                    KafkaTransport.OFFSET_RESET_VALUES,
                    "What to do when there is no initial offset in Kafka or if an offset is out of range",
                    ConfigurationField.Optional.OPTIONAL));
            request.addField(new TextField(
                    KafkaTransport.CK_GROUP_ID,
                    "Consumer group id",
                    KafkaTransport.DEFAULT_GROUP_ID,
                    "Name of the consumer group the Kafka input belongs to",
                    ConfigurationField.Optional.OPTIONAL));
            request.addField(new TextField(
                    KafkaTransport.CK_CUSTOM_PROPERTIES,
                    "Custom Kafka properties",
                    HttpConfiguration.PATH_WEB,
                    "A newline separated list of Kafka properties. (e.g.: \"ssl.keystore.location=/etc/graylog/server/kafka.keystore.jks\").",
                    ConfigurationField.Optional.OPTIONAL,
                    200,
                    TextField.Attribute.TEXTAREA));

            return request;
        }
    }

    /* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport$ConsumerRunnable.class */
    private class ConsumerRunnable implements Runnable {
        private final Properties props;
        private final MessageInput input;
        private final KafkaConsumer<byte[], byte[]> consumer;

        public ConsumerRunnable(Properties properties, MessageInput messageInput, int i) {
            this.input = messageInput;
            Properties properties2 = (Properties) properties.clone();
            properties2.put("client.id", "gl2-" + KafkaTransport.this.nodeId + "-" + messageInput.getId() + "-" + i);
            this.props = properties2;
            this.consumer = new KafkaConsumer<>(properties);
            this.consumer.subscribe(Pattern.compile(KafkaTransport.this.configuration.getString(KafkaTransport.CK_TOPIC_FILTER)), new NoOpConsumerRebalanceListener());
        }

        private void consumeRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (KafkaTransport.this.paused) {
                    KafkaTransport.LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                    Uninterruptibles.awaitUninterruptibly(KafkaTransport.this.pausedLatch);
                }
                if (KafkaTransport.this.stopped) {
                    return;
                }
                if (KafkaTransport.this.isThrottled()) {
                    KafkaTransport.this.blockUntilUnthrottled();
                }
                byte[] bArr = (byte[]) consumerRecord.value();
                if (bArr != null) {
                    KafkaTransport.this.totalBytesRead.addAndGet(bArr.length);
                    KafkaTransport.this.lastSecBytesReadTmp.addAndGet(bArr.length);
                    this.input.processRawMessage(new RawMessage(bArr));
                }
            }
        }

        private Optional<ConsumerRecords<byte[], byte[]>> tryPoll() {
            try {
                ScheduledExecutorService scheduledExecutorService = KafkaTransport.this.scheduler;
                KafkaConsumer<byte[], byte[]> kafkaConsumer = this.consumer;
                Objects.requireNonNull(kafkaConsumer);
                ScheduledFuture<?> schedule = scheduledExecutorService.schedule(kafkaConsumer::wakeup, 2000L, TimeUnit.MILLISECONDS);
                ConsumerRecords poll = this.consumer.poll(1000L);
                schedule.cancel(true);
                return Optional.of(poll);
            } catch (InvalidOffsetException | AuthorizationException e) {
                KafkaTransport.LOG.error("Exception in poll.", e);
                return Optional.empty();
            } catch (WakeupException e2) {
                KafkaTransport.LOG.error("WakeupException in poll. Kafka server is not responding.");
                return Optional.empty();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!KafkaTransport.this.stopped) {
                try {
                    Optional<ConsumerRecords<byte[], byte[]>> tryPoll = tryPoll();
                    if (tryPoll.isPresent()) {
                        try {
                            consumeRecords(tryPoll.get());
                        } catch (Exception e) {
                            KafkaTransport.LOG.error("Exception in consumer thread. Stopping input", e);
                            KafkaTransport.this.stopped = true;
                        }
                    } else {
                        KafkaTransport.LOG.error("Caught recoverable exception. Retrying");
                        Thread.sleep(2000L);
                    }
                } catch (KafkaException | InterruptedException e2) {
                    KafkaTransport.LOG.error("Caught unrecoverable exception in poll. Stopping input", e2);
                    KafkaTransport.this.stopped = true;
                }
            }
            this.consumer.commitAsync();
            KafkaTransport.this.stopLatch.countDown();
            this.consumer.close();
        }
    }

    /**
     * Factory contract for this transport, narrowing the return types of
     * {@link Transport.Factory} to {@link KafkaTransport} and {@link Config}.
     * NOTE(review): presumably implemented by Guice assisted injection (the transport's
     * constructor is annotated with {@code @AssistedInject}) - confirm against the module bindings.
     */
    @FactoryClass
    /* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransport$Factory.class */
    public interface Factory extends Transport.Factory<KafkaTransport> {
        /**
         * Creates a transport instance for the given input configuration.
         *
         * @param configuration the configuration of the input this transport belongs to
         * @return a new transport
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        KafkaTransport create(Configuration configuration);

        /**
         * @return the configuration descriptor of this transport
         */
        @Override // org.graylog2.plugin.inputs.transports.Transport.Factory
        Config getConfig();
    }

    /**
     * Creates the transport, sizes its consumer thread pool from {@link #CK_THREADS} and registers
     * the four throughput gauges in the local metric registry. The transport starts in the paused
     * state; it is resumed through {@link #lifecycleStateChange(Lifecycle)}.
     *
     * @param configuration            configuration of the input this transport belongs to
     * @param localMetricRegistry      registry that receives the gauges and the executor metrics
     * @param nodeId                   id of this node, used in the Kafka client id
     * @param eventBus                 server event bus delivering lifecycle changes
     * @param serverStatus             used to wait for the server to reach the running state
     * @param scheduledExecutorService shared scheduler for periodic and delayed tasks
     */
    @AssistedInject
    public KafkaTransport(@Assisted Configuration configuration, LocalMetricRegistry localMetricRegistry, NodeId nodeId, EventBus eventBus, ServerStatus serverStatus, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService) {
        super(eventBus, configuration);
        this.totalBytesRead = new AtomicLong(0L);
        this.lastSecBytesRead = new AtomicLong(0L);
        this.lastSecBytesReadTmp = new AtomicLong(0L);
        this.stopped = false;
        this.paused = true;
        this.pausedLatch = new CountDownLatch(1);
        this.configuration = configuration;
        this.localRegistry = localMetricRegistry;
        this.nodeId = nodeId;
        this.serverEventBus = eventBus;
        this.serverStatus = serverStatus;
        this.scheduler = scheduledExecutorService;
        // Must be assigned before executorService(...) is called, which reads this field.
        this.metricRegistry = localMetricRegistry;
        this.executor = executorService(configuration.getInt(CK_THREADS));

        // The gauges have to implement Gauge#getValue(); a differently named method would leave the
        // interface method unimplemented.
        localMetricRegistry.register(ThroughputCounter.READ_BYTES_1_SEC, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return KafkaTransport.this.lastSecBytesRead.get();
            }
        });
        // This transport only reads, so the "written" gauges are constant zero.
        localMetricRegistry.register(ThroughputCounter.WRITTEN_BYTES_1_SEC, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return 0L;
            }
        });
        localMetricRegistry.register(ThroughputCounter.READ_BYTES_TOTAL, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return KafkaTransport.this.totalBytesRead.get();
            }
        });
        localMetricRegistry.register(ThroughputCounter.WRITTEN_BYTES_TOTAL, new Gauge<Long>() {
            @Override
            public Long getValue() {
                return 0L;
            }
        });
    }

    /**
     * Reacts to server lifecycle changes: {@code PAUSED}, {@code FAILED} and {@code HALTING} pause
     * message consumption, every other state resumes it.
     *
     * @param lifecycle the new lifecycle state
     */
    @Subscribe
    public void lifecycleStateChange(Lifecycle lifecycle) {
        LOG.debug("Lifecycle changed to {}", lifecycle);
        switch (lifecycle) {
            case PAUSED:
            case FAILED:
            case HALTING:
                // Only arm a fresh latch once the current one has been released. Replacing a
                // still-closed latch (e.g. PAUSED followed by HALTING) would strand every consumer
                // thread already waiting on the old instance, because only the newest latch is
                // ever counted down.
                if (this.pausedLatch.getCount() == 0) {
                    this.pausedLatch = new CountDownLatch(1);
                }
                this.paused = true;
                return;
            default:
                this.paused = false;
                this.pausedLatch.countDown();
                return;
        }
    }

    /**
     * Intentionally a no-op: the supplied aggregator is ignored by this transport.
     *
     * @param codecAggregator ignored
     */
    @Override // org.graylog2.plugin.inputs.transports.Transport
    public void setMessageAggregator(CodecAggregator codecAggregator) {
    }

    /**
     * Validates the endpoint setting required by the selected consumer mode, hooks the transport
     * into the server lifecycle, starts the consumer threads and schedules the once-per-second
     * roll-over of the throughput counter.
     *
     * @param messageInput the input that will receive the consumed messages
     * @throws IllegalArgumentException if the ZooKeeper address (legacy mode) or the bootstrap
     *                                  server list (consumer API mode) is missing or empty
     */
    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doLaunch(MessageInput messageInput) {
        final boolean legacyMode = this.configuration.getBoolean(CK_LEGACY, true);

        // Each mode needs exactly one endpoint setting; check only the relevant one.
        final String endpointKey = legacyMode ? CK_ZOOKEEPER : CK_BOOTSTRAP;
        if (Strings.isNullOrEmpty(this.configuration.getString(endpointKey))) {
            final String problem = legacyMode
                    ? "ZooKeeper configuration setting cannot be empty"
                    : "Bootstrap server configuration setting cannot be empty";
            throw new IllegalArgumentException(problem);
        }

        // Un-pause once the server is running, then keep following later lifecycle events.
        this.serverStatus.awaitRunning(() -> lifecycleStateChange(Lifecycle.RUNNING));
        this.serverEventBus.register(this);

        if (legacyMode) {
            doLaunchLegacy(messageInput);
        } else {
            doLaunchConsumer(messageInput);
        }

        // Publish the bytes counted during the last second and start a new window.
        this.scheduler.scheduleAtFixedRate(
                () -> this.lastSecBytesRead.set(this.lastSecBytesReadTmp.getAndSet(0L)),
                1L,
                1L,
                TimeUnit.SECONDS);
    }

    /**
     * Starts consumption through the Kafka consumer API: assembles the consumer properties from
     * the input configuration (custom properties are applied last and may override them) and
     * submits one {@link ConsumerRunnable} per configured thread.
     *
     * @param messageInput the input that will receive the consumed messages
     */
    private void doLaunchConsumer(MessageInput messageInput) {
        final Properties consumerProps = new Properties();
        consumerProps.put("group.id", this.configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
        consumerProps.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        consumerProps.put("fetch.max.wait.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        consumerProps.put("bootstrap.servers", this.configuration.getString(CK_BOOTSTRAP));

        // The setting stores the legacy names; translate them to the consumer API vocabulary.
        final String configuredReset = this.configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET);
        final String offsetReset;
        if (configuredReset.equals(DEFAULT_OFFSET_RESET)) {
            offsetReset = "latest";
        } else {
            offsetReset = "earliest";
        }
        consumerProps.put("auto.offset.reset", offsetReset);

        consumerProps.put("auto.commit.interval.ms", "1000");
        final String deserializer = ByteArrayDeserializer.class.getName();
        consumerProps.put("key.deserializer", deserializer);
        consumerProps.put("value.deserializer", deserializer);
        insertCustomProperties(consumerProps);

        final int threadCount = this.configuration.getInt(CK_THREADS);
        // The latch has to exist before the first consumer thread is started.
        this.stopLatch = new CountDownLatch(threadCount);
        for (int threadId = 0; threadId < threadCount; threadId++) {
            this.executor.submit(new ConsumerRunnable(consumerProps, messageInput, threadId));
        }
    }

    /**
     * Starts consumption through the legacy ZooKeeper-based high level consumer: creates the
     * consumer connector, opens the message streams matching the topic filter and submits one
     * worker per stream.
     *
     * @param messageInput the input that will receive the consumed messages
     */
    private void doLaunchLegacy(final MessageInput messageInput) {
        final Properties properties = new Properties();
        properties.put("group.id", this.configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
        properties.put("client.id", "gl2-" + this.nodeId + "-" + messageInput.getId());
        properties.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        properties.put("fetch.wait.max.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        properties.put("zookeeper.connect", this.configuration.getString(CK_ZOOKEEPER));
        properties.put("auto.offset.reset", this.configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));
        properties.put("auto.commit.interval.ms", "1000");
        // Makes the stream iterator throw ConsumerTimeoutException when idle, so the workers get a
        // chance to notice that the transport was stopped.
        properties.put("consumer.timeout.ms", "1000");
        insertCustomProperties(properties);

        final int streamCount = this.configuration.getInt(CK_THREADS);
        // Keep local references for the workers: doStop() clears the fields, possibly while a
        // worker is still on its way out.
        final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        this.cc = connector;
        final List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreamsByFilter(new Whitelist(this.configuration.getString(CK_TOPIC_FILTER)), streamCount);
        final CountDownLatch latch = new CountDownLatch(streams.size());
        this.stopLatch = latch;

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            this.executor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
                    try {
                        boolean retry;
                        do {
                            retry = false;
                            try {
                                // hasNext() must be inside the try block: it is the call that
                                // raises ConsumerTimeoutException when nothing arrived in time.
                                // An explicit hasNext()/next() loop is used so a message is only
                                // taken from the stream after the pause/stop/throttle checks.
                                while (messages.hasNext()) {
                                    if (KafkaTransport.this.paused) {
                                        KafkaTransport.LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                                        Uninterruptibles.awaitUninterruptibly(KafkaTransport.this.pausedLatch);
                                    }
                                    if (KafkaTransport.this.stopped) {
                                        break;
                                    }
                                    if (KafkaTransport.this.isThrottled()) {
                                        KafkaTransport.this.blockUntilUnthrottled();
                                    }
                                    final byte[] payload = messages.next().message();
                                    if (payload != null) {
                                        KafkaTransport.this.totalBytesRead.addAndGet(payload.length);
                                        KafkaTransport.this.lastSecBytesReadTmp.addAndGet(payload.length);
                                        messageInput.processRawMessage(new RawMessage(payload));
                                    }
                                }
                            } catch (ConsumerTimeoutException e) {
                                // Nothing to consume right now; go round again unless stopped.
                                retry = true;
                            } catch (Exception e2) {
                                // retry stays false, so the worker really ends as the message says.
                                KafkaTransport.LOG.error("Kafka consumer error, stopping consumer thread.", e2);
                            }
                        } while (retry && !KafkaTransport.this.stopped);
                    } finally {
                        // Always report completion, even if committing the offsets fails.
                        try {
                            connector.commitOffsets();
                        } finally {
                            latch.countDown();
                        }
                    }
                }
            });
        }
    }

    /**
     * Parses the {@link #CK_CUSTOM_PROPERTIES} setting as {@link Properties} text and copies the
     * entries into {@code properties}, overriding keys that are already present.
     * A read failure is logged and leaves {@code properties} unchanged.
     *
     * @param properties the consumer properties to extend
     */
    private void insertCustomProperties(Properties properties) {
        final String customText = this.configuration.getString(CK_CUSTOM_PROPERTIES, HttpConfiguration.PATH_WEB);
        try {
            final Properties customProperties = new Properties();
            // Load through a Reader: Properties.load(InputStream) decodes ISO-8859-1, which would
            // corrupt any non-ASCII character of the (Unicode) configuration text.
            customProperties.load(new StringReader(customText));
            properties.putAll(customProperties);
        } catch (IOException e) {
            LOG.error("Failed to read custom properties", e);
        }
    }

    /**
     * Builds the instrumented, fixed-size thread pool that runs the consumer threads. Threads are
     * named {@code kafka-transport-N}; the pool's metrics are registered under this class' name
     * plus {@code executor-service}.
     *
     * @param i number of threads in the pool
     * @return the instrumented executor
     */
    private ExecutorService executorService(int i) {
        final ExecutorService pool = Executors.newFixedThreadPool(
                i,
                new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build());
        final MetricRegistry registry = this.metricRegistry;
        final String metricName = MetricRegistry.name(getClass(), "executor-service");
        return new InstrumentedExecutorService(pool, registry, metricName);
    }

    /**
     * Stops the transport: flags the consumer threads to stop, releases them if they are paused,
     * waits up to five seconds for them to finish, shuts down the legacy connector (if any) and
     * finally the executor. The caller's interrupt status is preserved.
     */
    @Override // org.graylog2.plugin.inputs.transports.ThrottleableTransport
    public void doStop() {
        this.stopped = true;
        this.serverEventBus.unregister(this);

        boolean interrupted = false;
        final CountDownLatch latch = this.stopLatch;
        if (latch != null) {
            try {
                // Paused consumer threads can only observe the stop flag once they are released.
                final CountDownLatch pauseGate = this.pausedLatch;
                if (pauseGate != null && pauseGate.getCount() > 0) {
                    pauseGate.countDown();
                }
                if (latch.await(5L, TimeUnit.SECONDS)) {
                    this.stopLatch = null;
                } else {
                    // Keep the latch in place: threads that finish late still count it down and
                    // would otherwise fail with a NullPointerException.
                    LOG.info("Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. This is usually harmless when stopping the input.");
                }
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.debug("Interrupted while waiting to stop input.");
            }
        }

        if (this.cc != null) {
            this.cc.shutdown();
            this.cc = null;
        }

        this.executor.shutdown();
        try {
            this.executor.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            interrupted = true;
            LOG.error("Interrupted in transport executor shutdown.");
        }

        // Re-assert the interrupt only after the cleanup above has run to completion.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the local metric registry that holds this transport's throughput gauges
     */
    @Override // org.graylog2.plugin.inputs.transports.Transport
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }
}
