package org.graylog2.plugin.inputs.transports;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.transports.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/plugin/inputs/transports/ThrottleableTransport.class */
public abstract class ThrottleableTransport implements Transport {
    private static final Logger log = LoggerFactory.getLogger(ThrottleableTransport.class);
    public static final String CK_THROTTLING_ALLOWED = "throttling_allowed";
    private final boolean throttlingAllowed;
    private final EventBus eventBus;
    private long lastUncommitted;
    private final AtomicBoolean currentlyThrottled = new AtomicBoolean(false);
    private volatile CountDownLatch blockLatch = null;

    /* loaded from: input_file:org/graylog2/plugin/inputs/transports/ThrottleableTransport$Config.class */
    public static class Config implements Transport.Config {
        @Override // org.graylog2.plugin.inputs.transports.Transport.Config
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest configurationRequest = new ConfigurationRequest();
            configurationRequest.addField(new BooleanField(ThrottleableTransport.CK_THROTTLING_ALLOWED, "Allow throttling this input.", false, "If enabled, no new messages will be read from this input until Graylog catches up with its message load. This is typically useful for inputs reading from files or message queue systems like AMQP or Kafka. If you regularly poll an external system, e.g. via HTTP, you normally want to leave this disabled."));
            return configurationRequest;
        }
    }

    public ThrottleableTransport(EventBus eventBus, Configuration configuration) {
        this.eventBus = eventBus;
        this.throttlingAllowed = configuration.getBoolean(CK_THROTTLING_ALLOWED);
    }

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public void launch(MessageInput messageInput) throws MisfireException {
        doLaunch(messageInput);
        if (this.throttlingAllowed) {
            this.eventBus.register(this);
        }
    }

    protected abstract void doLaunch(MessageInput messageInput) throws MisfireException;

    @Override // org.graylog2.plugin.inputs.transports.Transport
    public void stop() {
        if (this.currentlyThrottled.get()) {
            this.blockLatch.countDown();
        }
        doStop();
        if (this.throttlingAllowed) {
            try {
                this.eventBus.unregister(this);
            } catch (IllegalArgumentException e) {
            }
        }
    }

    protected abstract void doStop();

    @Subscribe
    public void updateThrottleState(ThrottleState throttleState) {
        if (this.throttlingAllowed) {
            boolean determineIfThrottled = determineIfThrottled(throttleState);
            if (!this.currentlyThrottled.get()) {
                if (determineIfThrottled) {
                    this.currentlyThrottled.set(true);
                    this.blockLatch = new CountDownLatch(1);
                    return;
                }
                return;
            }
            if (determineIfThrottled) {
                return;
            }
            if (this.blockLatch == null) {
                log.error("Expected to see a transport throttle latch, but it is missing. This is a bug, continuing anyway.");
            } else {
                this.currentlyThrottled.set(false);
                this.blockLatch.countDown();
            }
        }
    }

    public boolean isThrottled() {
        return this.throttlingAllowed && this.currentlyThrottled.get();
    }

    protected boolean determineIfThrottled(ThrottleState throttleState) {
        long j = this.lastUncommitted;
        this.lastUncommitted = throttleState.uncommittedJournalEntries;
        String simpleName = getClass().getSimpleName();
        log.debug("Checking if transport {} should be throttled {}", simpleName, throttleState);
        if (throttleState.uncommittedJournalEntries == 0) {
            log.debug("[{}] [unthrottled] journal empty", simpleName);
            return false;
        }
        if (throttleState.uncommittedJournalEntries > 100000) {
            log.debug("[{}] [throttled] number of unread journal entries is larger than 100.000 entries: {}", simpleName, Long.valueOf(throttleState.uncommittedJournalEntries));
            return true;
        }
        if (throttleState.uncommittedJournalEntries - j > 20000) {
            log.debug("[{}] [throttled] number of unread journal entries is growing by more than 20.000 entries: {}", simpleName, Long.valueOf(throttleState.uncommittedJournalEntries - j));
            return true;
        }
        if (throttleState.processBufferCapacity == 0) {
            log.debug("[{}] [throttled] no capacity in process buffer", simpleName);
            return true;
        }
        if (throttleState.appendEventsPerSec == 0 && throttleState.readEventsPerSec == 0 && throttleState.processBufferCapacity > 0) {
            log.debug("[{}] [unthrottled] no incoming messages and nothing read from journal even if we could", simpleName);
            return false;
        }
        if ((throttleState.journalSize / throttleState.journalSizeLimit) * 100.0d > 90.0d) {
            log.debug("[{}] [throttled] journal more than 90% full", simpleName);
            return true;
        }
        if ((throttleState.readEventsPerSec / throttleState.appendEventsPerSec) * 100.0d < 50.0d) {
            log.debug("[{}] [throttled] write rate is more than twice as high than read rate", simpleName);
            return true;
        }
        log.debug("[{}] [unthrottled] fall through", simpleName);
        return false;
    }

    public void blockUntilUnthrottled() {
        if (this.blockLatch == null) {
            return;
        }
        try {
            this.blockLatch.await();
        } catch (InterruptedException e) {
        }
    }
}
