/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.plugins.pipelineprocessor.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
import org.graylog.plugins.pipelineprocessor.ast.Stage;
import org.graylog.plugins.pipelineprocessor.ast.statements.Statement;
import org.graylog.plugins.pipelineprocessor.codegen.GeneratedRule;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigDto;
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
import org.graylog.plugins.pipelineprocessor.processors.StageIterator;
import org.graylog.plugins.pipelineprocessor.processors.listeners.InterpreterListener;
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
import org.graylog.plugins.pipelineprocessor.processors.listeners.RuleMetricsListener;
import org.graylog2.metrics.CacheStatsSet;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageCollection;
import org.graylog2.plugin.Messages;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.shared.metrics.MetricUtils;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineInterpreter
implements MessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(PipelineInterpreter.class);
    // Used to acknowledge messages that a pipeline marked as filtered out.
    private final MessageQueueAcknowledger messageQueueAcknowledger;
    // Marked once per filtered-out message; registered under the ProcessBufferProcessor metric name.
    private final Meter filteredOutMessages;
    // Times every call to process(Messages).
    private final Timer executionTime;
    // Kept so a RuleMetricsListener can be created when rule metrics are enabled.
    private final MetricRegistry metricRegistry;
    // Supplies the State (pipelines, stream connections, metrics config) used for each batch.
    private final ConfigurationStateUpdater stateUpdater;

    @Inject
    public PipelineInterpreter(MessageQueueAcknowledger messageQueueAcknowledger, MetricRegistry metricRegistry, ConfigurationStateUpdater stateUpdater) {
        this.messageQueueAcknowledger = messageQueueAcknowledger;
        this.filteredOutMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"filteredOutMessages"}));
        this.executionTime = metricRegistry.timer(MetricRegistry.name(PipelineInterpreter.class, (String[])new String[]{"executionTime"}));
        this.metricRegistry = metricRegistry;
        this.stateUpdater = stateUpdater;
    }

    /**
     * Runs the given messages through the pipelines of the latest configuration state.
     * The whole call is measured by the {@code executionTime} timer.
     *
     * @param messages the messages to process
     * @return the messages that have been fully processed
     */
    @Override
    public Messages process(Messages messages) {
        try (Timer.Context ignored = this.executionTime.time()) {
            final State latestState = this.stateUpdater.getLatestState();
            // Only pay for rule metrics collection when the state has it enabled.
            final InterpreterListener listener = latestState.enableRuleMetrics()
                    ? new RuleMetricsListener(this.metricRegistry)
                    : new NoopInterpreterListener();
            return this.process(messages, listener, latestState);
        }
    }

    /**
     * Runs the messages through the pipelines connected to their streams, re-running a message
     * whenever processing assigned it to streams it was not on before.
     *
     * @param messages            the messages to process
     * @param interpreterListener listener notified about the processing steps
     * @param state               the pipeline state (pipelines and stream connections) to use
     * @return a collection of all messages for which no further pipeline run is pending
     */
    public Messages process(Messages messages, InterpreterListener interpreterListener, State state) {
        interpreterListener.startProcessing();

        // (message id, stream id) pairs whose pipelines have already been run.
        final Set<Tuple2<String, String>> processingBlacklist = new HashSet<>();
        final List<Message> toProcess = Lists.newArrayList(messages);
        final List<Message> fullyProcessed = Lists.newArrayListWithExpectedSize(toProcess.size());

        while (!toProcess.isEmpty()) {
            // toProcess is cleared right after building currentSet and refilled while iterating it;
            // presumably MessageCollection copies its input - confirm before changing this order.
            final MessageCollection currentSet = new MessageCollection(toProcess);
            toProcess.clear();

            for (Message message : currentSet) {
                final String msgId = message.getId();
                // Snapshot of the streams before running pipelines, to detect newly added streams.
                final Set<String> initialStreamIds = message.getStreams().stream()
                        .map(Persisted::getId)
                        .collect(Collectors.toSet());

                final ImmutableSet<Pipeline> pipelinesToRun = this.selectPipelines(
                        interpreterListener, processingBlacklist, message, initialStreamIds, state.getStreamPipelineConnections());

                // Messages created by rules are queued for their own processing round.
                toProcess.addAll(this.processForResolvedPipelines(message, msgId, pipelinesToRun, interpreterListener, state));

                final boolean addedStreams = this.updateStreamBlacklist(processingBlacklist, message, initialStreamIds);
                this.potentiallyDropFilteredMessage(message);

                if (addedStreams && !message.getFilterOut()) {
                    log.debug("[{}] new streams assigned, running again for those streams", msgId);
                    toProcess.add(message);
                } else {
                    log.debug("[{}] no new streams matches or dropped message, not running again", msgId);
                    fullyProcessed.add(message);
                }
            }
        }

        interpreterListener.finishProcessing();
        return new MessageCollection(fullyProcessed);
    }

    /**
     * Counts and acknowledges the message if it has been marked as filtered out;
     * does nothing otherwise.
     *
     * @param message the message to check
     */
    private void potentiallyDropFilteredMessage(Message message) {
        if (!message.getFilterOut()) {
            return;
        }
        log.debug("[{}] marked message to be discarded. Dropping message.", message.getId());
        this.filteredOutMessages.mark();
        this.messageQueueAcknowledger.acknowledge(message);
    }

    /**
     * Records which of the message's current streams were already present before the pipelines
     * ran, and reports whether any stream was added.
     *
     * <p>Side effect: every current stream id found in {@code initialStreamIds} is removed from
     * that set.
     *
     * @param processingBlacklist receives a (message id, stream id) pair for each stream the
     *                            message was already on
     * @param message             the message after pipeline execution
     * @param initialStreamIds    ids of the streams the message was on before pipeline execution
     * @return {@code true} if the message is now on at least one stream that was not in
     *         {@code initialStreamIds}
     */
    private boolean updateStreamBlacklist(Set<Tuple2<String, String>> processingBlacklist, Message message, Set<String> initialStreamIds) {
        boolean addedStreams = false;
        for (Stream stream : message.getStreams()) {
            final String streamId = stream.getId();
            if (initialStreamIds.remove(streamId)) {
                // Built from the String ids directly so the tuple is a Tuple2<String, String>.
                processingBlacklist.add(Tuple.tuple(message.getId(), streamId));
            } else {
                addedStreams = true;
            }
        }
        return addedStreams;
    }

    /**
     * Determines which pipelines have to run for the message.
     *
     * <p>A stream contributes its pipelines only if the (message id, stream id) pair is not on
     * the blacklist and the stream has at least one pipeline connection.
     *
     * @param interpreterListener listener told which pipelines and streams were selected
     * @param processingBlacklist (message id, stream id) pairs that must not be processed again
     * @param message             the message being processed
     * @param initialStreamIds    ids of the streams the message is currently on
     * @param streamConnection    pipelines connected to each stream id
     * @return the pipelines to run for the message
     */
    private ImmutableSet<Pipeline> selectPipelines(InterpreterListener interpreterListener, Set<Tuple2<String, String>> processingBlacklist, Message message, Set<String> initialStreamIds, ImmutableSetMultimap<String, Pipeline> streamConnection) {
        final String msgId = message.getId();

        final Set<String> streamsIds = initialStreamIds.stream()
                .filter(streamId -> !processingBlacklist.contains(Tuple.tuple(msgId, streamId)))
                .filter(streamConnection::containsKey)
                .collect(Collectors.toSet());

        final ImmutableSet<Pipeline> pipelinesToRun = streamsIds.stream()
                .flatMap(streamId -> streamConnection.get(streamId).stream())
                .collect(ImmutableSet.toImmutableSet());

        interpreterListener.processStreams(message, pipelinesToRun, streamsIds);
        log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
        return pipelinesToRun;
    }

    /**
     * Runs the message through the pipelines with the given ids. Ids with no entry in the
     * state's current pipelines are ignored.
     *
     * @param message             the message to process
     * @param pipelineIds         ids of the pipelines to run
     * @param interpreterListener listener notified about the processing steps
     * @param state               the pipeline state used to resolve the ids
     * @return the messages created while running the pipelines
     */
    public List<Message> processForPipelines(Message message, Set<String> pipelineIds, InterpreterListener interpreterListener, State state) {
        final ImmutableMap<String, Pipeline> currentPipelines = state.getCurrentPipelines();
        final ImmutableSet<Pipeline> pipelinesToRun = pipelineIds.stream()
                .map(currentPipelines::get)
                .filter(Objects::nonNull)
                .collect(ImmutableSet.toImmutableSet());
        return this.processForResolvedPipelines(message, message.getId(), pipelinesToRun, interpreterListener, state);
    }

    /**
     * Marks the pipelines as executed and evaluates their stages in the order produced by the
     * state's {@link StageIterator}.
     *
     * @param message             the message to process
     * @param msgId               the message id, used for logging
     * @param pipelines           the pipelines to run
     * @param interpreterListener listener notified about the processing steps
     * @param state               the pipeline state providing the stage iterator
     * @return the messages created by the evaluated stages
     */
    private List<Message> processForResolvedPipelines(Message message, String msgId, Set<Pipeline> pipelines, InterpreterListener interpreterListener, State state) {
        final List<Message> result = new ArrayList<>();
        pipelines.forEach(Pipeline::markExecution);

        // Pipelines whose earlier stage did not satisfy its match condition; evaluateStage adds to it.
        final Set<Pipeline> pipelinesToSkip = new HashSet<>();

        final StageIterator stages = state.getStageIterator(pipelines);
        while (stages.hasNext()) {
            final List<Stage> stageSet = stages.next();
            for (Stage stage : stageSet) {
                this.evaluateStage(stage, message, msgId, result, pipelinesToSkip, interpreterListener);
            }
        }
        return result;
    }

    /**
     * Evaluates one stage for the message: checks every rule condition, runs the actions of the
     * matching rules, and decides whether the pipeline may continue with its next stage.
     *
     * <p>A "match all" stage continues only if every rule matched; a "match either" stage
     * continues if at least one rule matched or the stage has no rules. Running the actions
     * stops at the first rule whose actions fail.
     *
     * @param stage               the stage to evaluate
     * @param message             the message being processed
     * @param msgId               the message id, used for logging
     * @param result              receives the messages created while evaluating the stage
     * @param pipelinesToSkip     pipelines that must not run further stages; the stage's pipeline
     *                            is added when the match condition is not satisfied
     * @param interpreterListener listener notified about the processing steps
     */
    private void evaluateStage(Stage stage, Message message, String msgId, List<Message> result, Set<Pipeline> pipelinesToSkip, InterpreterListener interpreterListener) {
        final Pipeline pipeline = stage.getPipeline();
        if (pipelinesToSkip.contains(pipeline)) {
            log.debug("[{}] previous stage result prevents further processing of pipeline `{}`", msgId, pipeline.name());
            return;
        }

        stage.markExecution();
        interpreterListener.enterStage(stage);

        final boolean matchAll = stage.matchAll();
        final String matchMode = matchAll ? "all" : "either";
        log.debug("[{}] evaluating rule conditions in stage {}: match {}", msgId, stage.stage(), matchMode);

        // One context per stage, shared by all of its rule conditions and actions.
        final EvaluationContext context = new EvaluationContext(message);
        final List<Rule> stageRules = stage.getRules();
        final List<Rule> rulesToRun = new ArrayList<>(stageRules.size());

        // A stage without rules counts as a match for "either".
        boolean anyRulesMatched = stageRules.isEmpty();
        boolean allRulesMatched = true;
        for (Rule rule : stageRules) {
            try {
                final boolean ruleCondition = this.evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener);
                anyRulesMatched |= ruleCondition;
                allRulesMatched &= ruleCondition;
            } catch (Exception e) {
                // Log which rule failed, then let the exception propagate unchanged.
                log.warn("Error evaluating condition for rule <{}/{}> with message: {} (Error: {})", rule.name(), rule.id(), message, e.getMessage());
                throw e;
            }
        }

        for (Rule rule : rulesToRun) {
            if (!this.executeRuleActions(rule, message, msgId, pipeline, context, interpreterListener)) {
                final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
                log.warn("Error evaluating action for rule <{}/{}> with message: {} (Error: {})", rule.name(), rule.id(), message, lastError);
                break;
            }
        }

        final boolean proceed = matchAll ? allRulesMatched : anyRulesMatched;
        if (proceed) {
            interpreterListener.continuePipelineExecution(pipeline, stage);
            log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage", msgId, stage.stage(), pipeline.name(), matchMode);
        } else {
            interpreterListener.stopPipelineExecution(pipeline, stage);
            log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage", msgId, stage.stage(), pipeline.name(), matchMode);
            pipelinesToSkip.add(pipeline);
        }

        // Hand the messages created in this stage to the caller and reset the context's list.
        Iterables.addAll(result, context.createdMessages());
        context.clearCreatedMessages();
        interpreterListener.exitStage(stage);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private boolean executeRuleActions(Rule rule, Message message, String msgId, Pipeline pipeline, EvaluationContext context, InterpreterListener interpreterListener) {
        rule.markExecution();
        interpreterListener.executeRule(rule, pipeline);
        try {
            log.debug("[{}] rule `{}` matched running actions", (Object)msgId, (Object)rule.name());
            GeneratedRule generatedRule = rule.generatedRule();
            if (generatedRule != null) {
                try {
                    generatedRule.then(context);
                    boolean bl = true;
                    return bl;
                }
                catch (Exception ignored) {
                    EvaluationContext.EvalError lastError = (EvaluationContext.EvalError)Iterables.getLast(context.evaluationErrors());
                    this.appendProcessingError(rule, message, lastError.toString());
                    log.debug("Encountered evaluation error, skipping rest of the rule: {}", (Object)lastError);
                    rule.markFailure();
                    boolean bl = false;
                    interpreterListener.finishExecuteRule(rule, pipeline);
                    return bl;
                }
            }
            if (ConfigurationStateUpdater.isAllowCodeGeneration()) {
                throw new IllegalStateException("Should have generated code and not interpreted the tree");
            }
            for (Statement statement : rule.then()) {
                if (this.evaluateStatement(message, interpreterListener, pipeline, context, rule, statement)) continue;
                boolean bl = false;
                return bl;
            }
            boolean bl = true;
            return bl;
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            interpreterListener.finishExecuteRule(rule, pipeline);
        }
    }

    /**
     * Evaluates a single action statement of a rule.
     *
     * <p>If the context holds evaluation errors afterwards, the last error is appended to the
     * message's processing error field, the listener is told that the rule failed, and the rule
     * is marked as failed.
     *
     * @param message             the message being processed
     * @param interpreterListener listener notified when the rule fails
     * @param pipeline            the pipeline the rule belongs to
     * @param context             the evaluation context of the current stage
     * @param rule                the rule the statement belongs to
     * @param statement           the statement to evaluate
     * @return {@code true} if the context has no evaluation errors after the statement ran
     */
    private boolean evaluateStatement(Message message, InterpreterListener interpreterListener, Pipeline pipeline, EvaluationContext context, Rule rule, Statement statement) {
        statement.evaluate(context);
        if (!context.hasEvaluationErrors()) {
            return true;
        }

        final EvaluationContext.EvalError mostRecentError = Iterables.getLast(context.evaluationErrors());
        this.appendProcessingError(rule, message, mostRecentError.toString());
        interpreterListener.failExecuteRule(rule, pipeline);
        log.debug("Encountered evaluation error, skipping rest of the rule: {}", mostRecentError);
        rule.markFailure();
        return false;
    }

    /**
     * Evaluates the condition of a rule, using its generated code when available and the
     * interpreted condition otherwise.
     *
     * <p>A rule whose condition is true is scheduled by adding it to {@code rulesToRun}, unless
     * the context holds evaluation errors; in that case the last error is appended to the
     * message and the rule is treated as not matching.
     *
     * @param rule                the rule to evaluate
     * @param message             the message being processed
     * @param msgId               the message id, used for logging
     * @param pipeline            the pipeline the rule belongs to
     * @param context             the evaluation context of the current stage
     * @param rulesToRun          receives the rule if its actions should run
     * @param interpreterListener listener notified about the outcome
     * @return {@code true} if the rule matched without evaluation errors
     */
    private boolean evaluateRuleCondition(Rule rule, Message message, String msgId, Pipeline pipeline, EvaluationContext context, List<Rule> rulesToRun, InterpreterListener interpreterListener) {
        interpreterListener.evaluateRule(rule, pipeline);

        final GeneratedRule compiledRule = rule.generatedRule();
        final boolean conditionHolds;
        if (compiledRule != null) {
            conditionHolds = compiledRule.when(context);
        } else {
            conditionHolds = rule.when().evaluateBool(context);
        }

        if (!conditionHolds) {
            rule.markNonMatch();
            interpreterListener.dissatisfyRule(rule, pipeline);
            log.debug("[{}] rule `{}` does not match", msgId, rule.name());
            return false;
        }

        // The match is counted before the error check below.
        rule.markMatch();
        if (context.hasEvaluationErrors()) {
            final EvaluationContext.EvalError mostRecentError = Iterables.getLast(context.evaluationErrors());
            this.appendProcessingError(rule, message, mostRecentError.toString());
            interpreterListener.failEvaluateRule(rule, pipeline);
            log.debug("Encountered evaluation error during condition, skipping rule actions: {}", mostRecentError);
            return false;
        }

        interpreterListener.satisfyRule(rule, pipeline);
        log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
        rulesToRun.add(rule);
        return true;
    }

    /**
     * Records an error for the rule in the message's {@code gl2_processing_error} field.
     * If the field already exists, the new entry is appended after a comma.
     *
     * @param rule        the rule that caused the error
     * @param message     the message to annotate
     * @param errorString description of the error
     */
    private void appendProcessingError(Rule rule, Message message, String errorString) {
        final String entry = "For rule '" + rule.name() + "': " + errorString;
        final String fieldValue;
        if (message.hasField("gl2_processing_error")) {
            fieldValue = message.getFieldAs(String.class, "gl2_processing_error") + "," + entry;
        } else {
            fieldValue = entry;
        }
        message.addField("gl2_processing_error", fieldValue);
    }

    /**
     * Snapshot of the pipeline configuration used while processing messages: the current
     * pipelines, the stream-to-pipeline connections, and the rule metrics configuration.
     * It also holds a cache of stage iterator configurations keyed by pipeline set.
     */
    public static class State {
        private static final Logger LOG = LoggerFactory.getLogger(State.class);
        private final ImmutableMap<String, Pipeline> currentPipelines;
        private final ImmutableSetMultimap<String, Pipeline> streamPipelineConnections;
        private final LoadingCache<Set<Pipeline>, StageIterator.Configuration> cache;
        private final boolean cachedIterators;
        private final RuleMetricsConfigDto ruleMetricsConfig;

        /**
         * Creates the state and (re-)registers the statistics of its stage iterator cache.
         *
         * @param currentPipelines          the current pipelines by id
         * @param streamPipelineConnections pipelines connected to each stream id
         * @param ruleMetricsConfig         the rule metrics configuration
         * @param metricRegistry            registry for the cache statistics
         * @param processorCount            used as the concurrency level of the cache
         * @param cachedIterators           whether stage iterators are built from cached
         *                                  configurations
         */
        @AssistedInject
        public State(@Assisted ImmutableMap<String, Pipeline> currentPipelines, @Assisted ImmutableSetMultimap<String, Pipeline> streamPipelineConnections, @Assisted RuleMetricsConfigDto ruleMetricsConfig, MetricRegistry metricRegistry, @Named(value="processbuffer_processors") int processorCount, @Named(value="cached_stageiterators") boolean cachedIterators) {
            this.currentPipelines = currentPipelines;
            this.streamPipelineConnections = streamPipelineConnections;
            this.cachedIterators = cachedIterators;
            this.ruleMetricsConfig = ruleMetricsConfig;
            this.cache = CacheBuilder.newBuilder()
                    .concurrencyLevel(processorCount)
                    .recordStats()
                    .build(new CacheLoader<Set<Pipeline>, StageIterator.Configuration>() {
                        @Override
                        public StageIterator.Configuration load(@Nonnull Set<Pipeline> pipelines) throws Exception {
                            return new StageIterator.Configuration(pipelines);
                        }
                    });

            // Remove any metrics already registered under this prefix before registering the new cache's stats.
            final String cacheMetricPrefix = MetricRegistry.name(PipelineInterpreter.class, "stage-cache");
            metricRegistry.removeMatching((name, metric) -> name.startsWith(cacheMetricPrefix));
            MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(cacheMetricPrefix, this.cache));
        }

        /**
         * @return the current pipelines by id
         */
        public ImmutableMap<String, Pipeline> getCurrentPipelines() {
            return this.currentPipelines;
        }

        /**
         * @return the pipelines connected to each stream id
         */
        public ImmutableSetMultimap<String, Pipeline> getStreamPipelineConnections() {
            return this.streamPipelineConnections;
        }

        /**
         * @return whether the rule metrics configuration has metrics enabled
         */
        public boolean enableRuleMetrics() {
            return this.ruleMetricsConfig.metricsEnabled();
        }

        /**
         * Creates a stage iterator for the given pipelines, built from a cached configuration
         * when caching is enabled. If the cache lookup fails, the error is logged and an
         * uncached iterator is returned instead.
         *
         * @param pipelines the pipelines whose stages should be iterated
         * @return a new stage iterator
         */
        public StageIterator getStageIterator(Set<Pipeline> pipelines) {
            if (!this.cachedIterators) {
                return new StageIterator(pipelines);
            }
            try {
                return new StageIterator(this.cache.get(pipelines));
            } catch (ExecutionException e) {
                LOG.error("Unable to get StageIterator from cache, this should not happen.", ExceptionUtils.getRootCause(e));
                return new StageIterator(pipelines);
            }
        }

        /**
         * Factory for {@link State} instances; its parameters correspond to the
         * {@code @Assisted} constructor parameters.
         */
        public interface Factory {
            State newState(ImmutableMap<String, Pipeline> currentPipelines, ImmutableSetMultimap<String, Pipeline> streamPipelineConnections, RuleMetricsConfigDto ruleMetricsConfig);
        }
    }

    /**
     * Describes this message processor by display name and class name.
     */
    public static class Descriptor implements MessageProcessor.Descriptor {
        private static final String PROCESSOR_NAME = "Pipeline Processor";

        /**
         * @return the display name of the processor
         */
        @Override
        public String name() {
            return PROCESSOR_NAME;
        }

        /**
         * @return the canonical class name of {@link PipelineInterpreter}
         */
        @Override
        public String className() {
            final Class<PipelineInterpreter> processorClass = PipelineInterpreter.class;
            return processorClass.getCanonicalName();
        }
    }
}

