package org.graylog2.streams;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.TimeLimiter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.graylog2.Configuration;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.ValidationException;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.streams.matchers.StreamRuleMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/streams/StreamRouter.class */
/**
 * Routes a {@link Message} to every enabled {@link Stream} whose rules it satisfies.
 *
 * <p>Each stream is evaluated through a {@link TimeLimiter} that runs on a cached pool of
 * daemon threads named {@code stream-router-%d}. The evaluation is bounded by the configured
 * stream processing timeout. Every failed evaluation is counted per stream; once the configured
 * maximum number of faults is reached the stream is paused and an urgent notification is
 * published.
 *
 * <p>NOTE(review): the per-stream meter/timer caches are plain {@code HashMap}s. Confirm
 * whether {@link #route(Message)} can be invoked concurrently and whether rule matching (which
 * marks the exception meter) runs on the pool threads; if so these caches need a concurrent
 * map.
 */
public class StreamRouter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamRouter.class);

    protected final StreamService streamService;
    protected final StreamRuleService streamRuleService;
    private final MetricRegistry metricRegistry;
    private final Configuration configuration;
    private final NotificationService notificationService;

    // Per-stream metric caches, keyed by stream id.
    private final Map<String, Meter> streamIncomingMeters = Maps.newHashMap();
    private final Map<String, Timer> streamExecutionTimers = Maps.newHashMap();
    private final Map<String, Meter> streamExceptionMeters = Maps.newHashMap();
    private final Map<String, Meter> streamRuleTimeoutMeters = Maps.newHashMap();
    private final Map<String, Meter> streamFaultsExceededMeters = Maps.newHashMap();

    // Number of failed evaluations per stream id since the stream was last paused by this router.
    private final ConcurrentMap<String, AtomicInteger> faultCounter = Maps.newConcurrentMap();

    // Declaration order matters: the time limiter is built on top of the executor.
    private final ExecutorService executor = executorService();
    private final TimeLimiter timeLimiter = new SimpleTimeLimiter(this.executor);

    /**
     * Creates a router backed by the given services.
     *
     * @param streamService       used to load the enabled streams and to pause faulty ones
     * @param streamRuleService   used to load the rules of a stream
     * @param metricRegistry      registry in which the per-stream meters and timers are created
     * @param configuration       source of the stream processing timeout and maximum fault count
     * @param notificationService used to publish a notification when a stream gets paused
     */
    @Inject
    public StreamRouter(StreamService streamService, StreamRuleService streamRuleService, MetricRegistry metricRegistry, Configuration configuration, NotificationService notificationService) {
        this.streamService = streamService;
        this.streamRuleService = streamRuleService;
        this.metricRegistry = metricRegistry;
        this.configuration = configuration;
        this.notificationService = notificationService;
    }

    /** Builds the cached pool of daemon threads on which the time limiter runs its tasks. */
    private ExecutorService executorService() {
        return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("stream-router-%d").setDaemon(true).build());
    }

    /**
     * Returns the fault counter of a stream, creating it on first use.
     *
     * @param streamId id of the stream
     * @return the counter shared by all callers for that stream id
     */
    private AtomicInteger getFaultCount(String streamId) {
        AtomicInteger counter = this.faultCounter.get(streamId);
        if (counter == null) {
            final AtomicInteger fresh = new AtomicInteger();
            // putIfAbsent returns the winner of a concurrent insert, or null if ours was stored.
            final AtomicInteger raced = this.faultCounter.putIfAbsent(streamId, fresh);
            counter = raced != null ? raced : fresh;
        }
        return counter;
    }

    /**
     * Determines which enabled streams the given message belongs to.
     *
     * <p>Every stream is evaluated with the configured processing timeout while its execution
     * timer is running. Any exception thrown by the evaluation (including a timeout) is not
     * propagated; it is recorded as a fault of that stream, see
     * {@link #handleStreamFault(Stream, long, int)}.
     *
     * @param message the message to route
     * @return the streams that matched, in the order returned by {@link #getStreams()}
     */
    public List<Stream> route(final Message message) {
        final List<Stream> matches = Lists.newArrayList();
        final List<Stream> streams = getStreams();
        final long timeout = this.configuration.getStreamProcessingTimeout();
        final int maxFaultCount = this.configuration.getStreamProcessingMaxFaults();

        for (final Stream stream : streams) {
            final Timer executionTimer = getExecutionTimer(stream.getId());
            final Callable<Boolean> task = new Callable<Boolean>() {
                @Override
                public Boolean call() {
                    return doesStreamMatch(getRuleMatches(stream, message));
                }
            };

            // The timer context is closed (and the duration recorded) before the catch block runs.
            try (Timer.Context timerContext = executionTimer.time()) {
                final boolean matched = this.timeLimiter.callWithTimeout(task, timeout, TimeUnit.MILLISECONDS, true);
                if (matched) {
                    getIncomingMeter(stream.getId()).mark();
                    matches.add(stream);
                }
            } catch (Exception e) {
                // NOTE(review): this also catches InterruptedException and counts it as a stream
                // fault without restoring the interrupt flag - confirm that this is intended.
                handleStreamFault(stream, timeout, maxFaultCount);
            }
        }
        return matches;
    }

    /**
     * Records a failed evaluation of a stream and pauses the stream once it failed too often.
     *
     * <p>The fault counter and the rule timeout meter of the stream are always incremented. If
     * {@code maxFaultCount} is positive and the counter has reached it, the stream is paused,
     * the counter is reset and a notification is published; otherwise only a warning is logged.
     *
     * @param stream        the stream whose evaluation failed
     * @param timeout       the processing timeout in milliseconds, used for log messages
     * @param maxFaultCount number of faults after which the stream is paused; values
     *                      {@code <= 0} disable pausing
     */
    private void handleStreamFault(Stream stream, long timeout, int maxFaultCount) {
        final AtomicInteger faultCount = getFaultCount(stream.getId());
        final int streamFaultCount = faultCount.incrementAndGet();
        getStreamRuleTimeoutMeter(stream.getId()).mark();

        if (maxFaultCount <= 0 || streamFaultCount < maxFaultCount) {
            LOG.warn("Processing of stream <{}> failed to return within {}ms.", stream.getId(), timeout);
            return;
        }

        try {
            this.streamService.pause(stream);
            faultCount.set(0);
            getStreamFaultsExceededMeter(stream.getId()).mark();
            LOG.error("Processing of stream <" + stream.getId() + "> failed to return within " + timeout + "ms for more than " + maxFaultCount + " times. Disabling stream.");
            this.notificationService.publishIfFirst(this.notificationService.buildNow()
                    .addType(Notification.Type.STREAM_PROCESSING_DISABLED)
                    .addSeverity(Notification.Severity.URGENT)
                    .addDetail("stream_id", stream.getId())
                    .addDetail("fault_count", streamFaultCount));
        } catch (ValidationException e) {
            // The stream id fills the placeholder; the trailing exception is logged with its stack trace.
            LOG.error("Unable to pause stream: {}", stream.getId(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Loads the streams that take part in routing.
     *
     * @return all enabled streams as reported by the stream service
     */
    public List<Stream> getStreams() {
        return this.streamService.loadAllEnabled();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Loads the rules of a stream.
     *
     * @param stream the stream whose rules are requested
     * @return the rules of the stream, or an empty list if loading them raised a
     *         {@link NotFoundException} (which is logged)
     */
    public List<StreamRule> getStreamRules(Stream stream) {
        try {
            return this.streamRuleService.loadForStream(stream);
        } catch (NotFoundException e) {
            LOG.error("Caught exception while fetching stream rules", e);
            return Collections.emptyList();
        }
    }

    /**
     * Evaluates every rule of a stream against a message.
     *
     * <p>Rules whose type cannot be turned into a matcher are logged and left out of the result.
     *
     * @param stream  the stream whose rules are evaluated
     * @param message the message to test
     * @return the match result per evaluated rule
     */
    public Map<StreamRule, Boolean> getRuleMatches(Stream stream, Message message) {
        final Map<StreamRule, Boolean> ruleMatches = Maps.newHashMap();
        for (StreamRule rule : getStreamRules(stream)) {
            try {
                final StreamRuleMatcher matcher = StreamRuleMatcherFactory.build(rule.getType());
                ruleMatches.put(rule, matchStreamRule(message, matcher, rule));
            } catch (InvalidStreamRuleTypeException e) {
                LOG.warn("Invalid stream rule type. Skipping matching for this rule. " + e.getMessage(), e);
            }
        }
        return ruleMatches;
    }

    /**
     * Decides whether a stream matches based on its per-rule results.
     *
     * @param ruleMatches match result per rule
     * @return {@code true} only if there is at least one result and none of them is
     *         {@code false}
     */
    public boolean doesStreamMatch(Map<StreamRule, Boolean> ruleMatches) {
        return !ruleMatches.isEmpty() && !ruleMatches.values().contains(false);
    }

    /**
     * Applies a single matcher to a message.
     *
     * <p>An exception thrown by the matcher is logged at debug level, marks the exception meter
     * of the rule's stream and is treated as "no match".
     *
     * @param message           the message to test
     * @param streamRuleMatcher the matcher to apply
     * @param streamRule        the rule handed to the matcher
     * @return the matcher's verdict, or {@code false} if the matcher threw
     */
    public boolean matchStreamRule(Message message, StreamRuleMatcher streamRuleMatcher, StreamRule streamRule) {
        try {
            return streamRuleMatcher.match(message, streamRule);
        } catch (Exception e) {
            LOG.debug("Could not match stream rule <" + streamRule.getType() + "/" + streamRule.getValue() + ">: " + e.getMessage(), e);
            getExceptionMeter(streamRule.getStreamId()).mark();
            return false;
        }
    }

    /**
     * Returns the {@code incomingMessages} meter of a stream, registering it on first use.
     *
     * @param streamId id of the stream
     * @return the cached or newly registered meter
     */
    protected Meter getIncomingMeter(String streamId) {
        return cachedMeter(this.streamIncomingMeters, streamId, "incomingMessages");
    }

    /**
     * Returns the {@code executionTime} timer of a stream, registering it on first use.
     *
     * @param streamId id of the stream
     * @return the cached or newly registered timer
     */
    protected Timer getExecutionTimer(String streamId) {
        Timer timer = this.streamExecutionTimers.get(streamId);
        if (timer == null) {
            timer = this.metricRegistry.timer(MetricRegistry.name(Stream.class, streamId, "executionTime"));
            this.streamExecutionTimers.put(streamId, timer);
        }
        return timer;
    }

    /**
     * Returns the {@code matchingExceptions} meter of a stream, registering it on first use.
     *
     * @param streamId id of the stream
     * @return the cached or newly registered meter
     */
    protected Meter getExceptionMeter(String streamId) {
        return cachedMeter(this.streamExceptionMeters, streamId, "matchingExceptions");
    }

    /**
     * Returns the {@code ruleTimeouts} meter of a stream, registering it on first use.
     *
     * @param streamId id of the stream
     * @return the cached or newly registered meter
     */
    protected Meter getStreamRuleTimeoutMeter(String streamId) {
        return cachedMeter(this.streamRuleTimeoutMeters, streamId, "ruleTimeouts");
    }

    /**
     * Returns the {@code faultsExceeded} meter of a stream, registering it on first use.
     *
     * @param streamId id of the stream
     * @return the cached or newly registered meter
     */
    protected Meter getStreamFaultsExceededMeter(String streamId) {
        return cachedMeter(this.streamFaultsExceededMeters, streamId, "faultsExceeded");
    }

    /** Looks up a stream's meter in the given cache, registering and caching it if absent. */
    private Meter cachedMeter(Map<String, Meter> cache, String streamId, String metricName) {
        Meter meter = cache.get(streamId);
        if (meter == null) {
            meter = this.metricRegistry.meter(MetricRegistry.name(Stream.class, streamId, metricName));
            cache.put(streamId, meter);
        }
        return meter;
    }
}
