package org.graylog.failure;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.graylog2.Configuration;
import org.graylog2.plugin.Message;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/graylog/failure/FailureHandlingService.class */
public class FailureHandlingService extends AbstractExecutionThreadService {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final List<FailureHandler> fallbackFailureHandlerAsList;
    private final Set<FailureHandler> failureHandlers;
    private final FailureSubmissionQueue failureSubmissionQueue;
    private final Configuration configuration;
    private final MessageQueueAcknowledger acknowledger;
    private Thread executionThread;

    @Inject
    public FailureHandlingService(@Named("fallbackFailureHandler") FailureHandler failureHandler, Set<FailureHandler> set, FailureSubmissionQueue failureSubmissionQueue, Configuration configuration, MessageQueueAcknowledger messageQueueAcknowledger) {
        this.fallbackFailureHandlerAsList = Lists.newArrayList(new FailureHandler[]{failureHandler});
        this.failureHandlers = set;
        this.failureSubmissionQueue = failureSubmissionQueue;
        this.configuration = configuration;
        this.acknowledger = messageQueueAcknowledger;
    }

    protected void startUp() throws Exception {
        this.executionThread = Thread.currentThread();
        this.logger.debug("Starting up the service.");
    }

    protected void shutDown() throws Exception {
        long milliseconds = this.configuration.getFailureHandlingShutdownAwait().toMilliseconds();
        int i = 0;
        FailureBatch consumeBlockingWithTimeout = this.failureSubmissionQueue.consumeBlockingWithTimeout(milliseconds);
        while (true) {
            FailureBatch failureBatch = consumeBlockingWithTimeout;
            if (failureBatch == null) {
                this.logger.info("Shutting down the service. Processed {} remaining failure batches.", Integer.valueOf(i));
                this.failureSubmissionQueue.logStats("FailureHandlerService#shutDown");
                return;
            } else {
                handle(failureBatch);
                i++;
                consumeBlockingWithTimeout = this.failureSubmissionQueue.consumeBlockingWithTimeout(milliseconds);
            }
        }
    }

    protected void triggerShutdown() {
        this.logger.debug("Requested to shut down.");
        this.executionThread.interrupt();
        this.failureSubmissionQueue.logStats("FailureHandlerService#triggerShutdown");
    }

    protected void run() throws Exception {
        if (isRunning()) {
            this.logger.debug("The service is up and running!");
        }
        while (isRunning()) {
            try {
                handle(this.failureSubmissionQueue.consumeBlocking());
            } catch (InterruptedException e) {
                this.logger.info("The service's thread has been interrupted. The queue currently contains {} failure batches.", Integer.valueOf(this.failureSubmissionQueue.queueSize()));
            } catch (Exception e2) {
                this.logger.error("Error occurred while handling failures!", e2);
            }
        }
        this.logger.debug("The service has been interrupted.");
    }

    private void handle(FailureBatch failureBatch) {
        suitableHandlers(failureBatch).forEach(failureHandler -> {
            try {
                failureHandler.handle(failureBatch);
            } catch (Exception e) {
                this.logger.error("Error occurred while handling failures by {}", failureHandler.getClass().getName());
            }
        });
        Stream<R> map = failureBatch.getFailures().stream().filter((v0) -> {
            return v0.requiresAcknowledgement();
        }).map((v0) -> {
            return v0.failedMessage();
        });
        Class<Message> cls = Message.class;
        Objects.requireNonNull(Message.class);
        Stream filter = map.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<Message> cls2 = Message.class;
        Objects.requireNonNull(Message.class);
        List<Message> list = (List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        this.acknowledger.acknowledge(list);
    }

    private List<FailureHandler> suitableHandlers(FailureBatch failureBatch) {
        List<FailureHandler> list = (List) suitableHandlers(this.failureHandlers, failureBatch).filter((v0) -> {
            return v0.isEnabled();
        }).collect(Collectors.toList());
        return list.isEmpty() ? (List) suitableHandlers(this.fallbackFailureHandlerAsList, failureBatch).collect(Collectors.toList()) : list;
    }

    private Stream<FailureHandler> suitableHandlers(Collection<FailureHandler> collection, FailureBatch failureBatch) {
        return collection.stream().filter(failureHandler -> {
            return failureHandler.supports(failureBatch);
        });
    }
}
