/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.plugin.inputs.transports;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.transports.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for transports that can pause reading while the server is overloaded.
 *
 * <p>When the {@code throttling_allowed} configuration flag is set, the transport registers
 * itself on the supplied {@link EventBus} after launch and reacts to {@link ThrottleState}
 * events. Each event is evaluated by {@link #determineIfThrottled(ThrottleState)}; while the
 * transport is throttled, callers of {@link #blockUntilUnthrottled()} are parked on a latch
 * that is released once a later event reports that throttling is no longer necessary, or when
 * the transport is stopped.
 *
 * <p>Subclasses implement {@link #doLaunch(MessageInput)} and {@link #doStop()} and are
 * expected to call {@link #isThrottled()} / {@link #blockUntilUnthrottled()} from their
 * reading loop.
 */
public abstract class ThrottleableTransport implements Transport {
    private static final Logger log = LoggerFactory.getLogger(ThrottleableTransport.class);

    /** Configuration key of the boolean flag that enables throttling for an input. */
    public static final String CK_THROTTLING_ALLOWED = "throttling_allowed";

    /** Throttle unconditionally once more than this many journal entries are uncommitted. */
    private static final long MAX_UNCOMMITTED_ENTRIES = 100000L;

    /** Throttle if the uncommitted entry count grew by more than this between two updates. */
    private static final long MAX_UNCOMMITTED_GROWTH = 20000L;

    private final boolean throttlingAllowed;
    private final AtomicBoolean currentlyThrottled = new AtomicBoolean(false);
    private final EventBus eventBus;

    /**
     * Latch readers wait on while throttled; {@code null} until the first throttle happens.
     * It is always published BEFORE {@link #currentlyThrottled} is set to {@code true}, so a
     * thread that observes the flag as {@code true} also observes a non-null latch.
     */
    private volatile CountDownLatch blockLatch = null;

    /** Uncommitted journal entry count seen in the previous throttle state update. */
    private long lastUncommitted;

    /**
     * @param eventBus      bus on which {@link ThrottleState} updates are received
     * @param configuration input configuration; {@link #CK_THROTTLING_ALLOWED} is read from it
     */
    public ThrottleableTransport(EventBus eventBus, Configuration configuration) {
        this.eventBus = eventBus;
        this.throttlingAllowed = configuration.getBoolean(CK_THROTTLING_ALLOWED);
    }

    /**
     * Launches the concrete transport and, if throttling is enabled, subscribes to throttle
     * state updates afterwards. Nothing is registered when {@link #doLaunch} throws.
     *
     * @param input the message input this transport belongs to
     * @throws MisfireException if the concrete transport fails to launch
     */
    @Override
    public void launch(MessageInput input) throws MisfireException {
        doLaunch(input);
        if (throttlingAllowed) {
            eventBus.register(this);
        }
    }

    /**
     * Starts the concrete transport.
     *
     * @param input the message input this transport belongs to
     * @throws MisfireException if the transport cannot be started
     */
    protected abstract void doLaunch(MessageInput input) throws MisfireException;

    /**
     * Releases any reader blocked by throttling, stops the concrete transport and
     * unsubscribes from throttle state updates.
     */
    @Override
    public void stop() {
        if (currentlyThrottled.get()) {
            // Read the volatile field once; it may still be null if a throttle update is
            // racing with this call, in which case there is nobody to release yet.
            final CountDownLatch latch = blockLatch;
            if (latch != null) {
                latch.countDown();
            }
        }
        try {
            doStop();
        } finally {
            // Unsubscribe even when doStop() fails, so a stopped transport does not stay
            // referenced by the event bus.
            if (throttlingAllowed) {
                try {
                    eventBus.unregister(this);
                } catch (IllegalArgumentException ignored) {
                    // EventBus.unregister throws if this object was not registered, e.g.
                    // stop() called twice or launch() failed before registering. Nothing to undo.
                }
            }
        }
    }

    /** Stops the concrete transport. */
    protected abstract void doStop();

    /**
     * Event bus callback: re-evaluates the throttle decision for the given state and flips
     * the transport between throttled and unthrottled accordingly.
     *
     * @param throttleState the latest throttle-relevant metrics
     */
    @Subscribe
    public void updateThrottleState(ThrottleState throttleState) {
        if (!throttlingAllowed) {
            return;
        }
        final boolean throttled = determineIfThrottled(throttleState);
        if (currentlyThrottled.get()) {
            if (throttled) {
                // Still overloaded, keep readers blocked.
                return;
            }
            final CountDownLatch latch = blockLatch;
            if (latch == null) {
                log.error("Expected to see a transport throttle latch, but it is missing. This is a bug, continuing anyway.");
                return;
            }
            currentlyThrottled.set(false);
            latch.countDown();
        } else if (throttled) {
            // Publish the latch before raising the flag so that stop() and readers never
            // observe "throttled" together with a missing latch.
            blockLatch = new CountDownLatch(1);
            currentlyThrottled.set(true);
        }
    }

    /**
     * @return {@code true} if throttling is enabled for this transport and it is currently
     *         throttled
     */
    public boolean isThrottled() {
        return throttlingAllowed && currentlyThrottled.get();
    }

    /**
     * Decides whether the transport should be throttled for the given state. The checks run
     * in order and the first one that matches wins:
     * <ol>
     *   <li>no uncommitted journal entries: not throttled</li>
     *   <li>more than 100,000 uncommitted entries: throttled</li>
     *   <li>uncommitted entries grew by more than 20,000 since the previous call: throttled</li>
     *   <li>process buffer capacity is zero: throttled</li>
     *   <li>nothing appended, nothing read, and buffer capacity available: not throttled</li>
     *   <li>journal size above 90% of its limit: throttled</li>
     *   <li>read rate below 50% of append rate: throttled</li>
     *   <li>otherwise: not throttled</li>
     * </ol>
     * As a side effect the uncommitted entry count is remembered for the growth check of the
     * next call.
     *
     * @param state the latest throttle-relevant metrics
     * @return {@code true} if the transport should stop reading for now
     */
    protected boolean determineIfThrottled(ThrottleState state) {
        final long prevUncommitted = lastUncommitted;
        lastUncommitted = state.uncommittedJournalEntries;
        final String transportName = getClass().getSimpleName();
        log.debug("Checking if transport {} should be throttled {}", transportName, state);

        if (state.uncommittedJournalEntries == 0L) {
            log.debug("[{}] [unthrottled] journal empty", transportName);
            return false;
        }
        if (state.uncommittedJournalEntries > MAX_UNCOMMITTED_ENTRIES) {
            log.debug("[{}] [throttled] number of unread journal entries is larger than 100.000 entries: {}", transportName, state.uncommittedJournalEntries);
            return true;
        }
        final long uncommittedGrowth = state.uncommittedJournalEntries - prevUncommitted;
        if (uncommittedGrowth > MAX_UNCOMMITTED_GROWTH) {
            log.debug("[{}] [throttled] number of unread journal entries is growing by more than 20.000 entries: {}", transportName, uncommittedGrowth);
            return true;
        }
        if (state.processBufferCapacity == 0L) {
            log.debug("[{}] [throttled] no capacity in process buffer", transportName);
            return true;
        }
        if (state.appendEventsPerSec == 0L && state.readEventsPerSec == 0L && state.processBufferCapacity > 0L) {
            log.debug("[{}] [unthrottled] no incoming messages and nothing read from journal even if we could", transportName);
            return false;
        }
        // Floating-point division: a zero divisor yields Infinity/NaN instead of throwing.
        if ((double) state.journalSize / (double) state.journalSizeLimit * 100.0 > 90.0) {
            log.debug("[{}] [throttled] journal more than 90% full", transportName);
            return true;
        }
        if ((double) state.readEventsPerSec / (double) state.appendEventsPerSec * 100.0 < 50.0) {
            log.debug("[{}] [throttled] write rate is more than twice as high than read rate", transportName);
            return true;
        }
        log.debug("[{}] [unthrottled] fall through", transportName);
        return false;
    }

    /**
     * Blocks the calling thread until the current throttle latch is released. Returns
     * immediately if the transport has never been throttled. If the thread is interrupted
     * while waiting, the method returns early with the thread's interrupt status set.
     */
    public void blockUntilUnthrottled() {
        // Single volatile read so the null check and the await use the same latch.
        final CountDownLatch latch = blockLatch;
        if (latch == null) {
            return;
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Configuration descriptor that contributes the {@code throttling_allowed} boolean field
     * to an input's requested configuration.
     */
    public static class Config implements Transport.Config {
        /**
         * @return a configuration request containing only the "allow throttling" field
         */
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            final ConfigurationRequest request = new ConfigurationRequest();
            request.addField(new BooleanField(ThrottleableTransport.CK_THROTTLING_ALLOWED, "Allow throttling this input.", false, "If enabled, no new messages will be read from this input until Graylog catches up with its message load. This is typically useful for inputs reading from files or message queue systems like AMQP or Kafka. If you regularly poll an external system, e.g. via HTTP, you normally want to leave this disabled."));
            return request;
        }
    }
}

