package org.graylog2.notifications;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog.events.processor.DBEventDefinitionService;
import org.graylog.events.processor.EventDefinitionDto;
import org.graylog.events.processor.EventProcessorEngine;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.systemnotification.SystemNotificationEventProcessorParameters;
import org.graylog.events.processor.systemnotification.SystemNotificationRenderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/graylog2/notifications/NotificationSystemEventPublisher.class */
public class NotificationSystemEventPublisher extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationSystemEventPublisher.class);
    private static final int MAX_QUEUED_NOTIFICATIONS = 10000;
    private final DBEventDefinitionService dbEventDefinitionService;
    private final SystemNotificationRenderService systemNotificationRenderService;
    private final EventProcessorEngine eventProcessorEngine;
    private final ScheduledExecutorService scheduler;
    private final Duration shutdownTimeout;
    private Thread executionThread;
    private final AtomicReference<ScheduledFuture<?>> shutdownTask = new AtomicReference<>();
    private final BlockingQueue<Notification> queuedNotifications = new LinkedBlockingQueue(10000);

    @Inject
    public NotificationSystemEventPublisher(DBEventDefinitionService dBEventDefinitionService, SystemNotificationRenderService systemNotificationRenderService, EventProcessorEngine eventProcessorEngine, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService, @Named("shutdown_timeout") int i) {
        this.dbEventDefinitionService = dBEventDefinitionService;
        this.systemNotificationRenderService = systemNotificationRenderService;
        this.eventProcessorEngine = eventProcessorEngine;
        this.scheduler = scheduledExecutorService;
        this.shutdownTimeout = Duration.ofMillis(i);
    }

    public boolean submit(Notification notification) {
        try {
            if (this.queuedNotifications.offer(notification)) {
                return true;
            }
            LOG.error("Unable to submit system notification for publishing as a system event. Current number of queued notifications: {}. Max queue size: {}", Integer.valueOf(this.queuedNotifications.size()), 10000);
            return false;
        } catch (Exception e) {
            LOG.error("Failed submitting notification for publishing as a system event", e);
            return false;
        }
    }

    protected void run() throws Exception {
        while (true) {
            if (!isRunning() && this.queuedNotifications.isEmpty()) {
                return;
            }
            ScheduledFuture<?> scheduledFuture = this.shutdownTask.get();
            if (scheduledFuture != null && scheduledFuture.isDone()) {
                return;
            }
            try {
                Notification poll = this.queuedNotifications.poll(1L, TimeUnit.SECONDS);
                if (poll != null) {
                    publish(poll);
                }
            } catch (Exception e) {
                LOG.error("", e);
            }
        }
    }

    protected void startUp() throws Exception {
        this.executionThread = Thread.currentThread();
    }

    protected void triggerShutdown() {
        this.shutdownTask.set(this.scheduler.schedule(() -> {
            LOG.error("Notification queue was not drained within {}. Forcefully terminating.", this.shutdownTimeout);
            this.executionThread.interrupt();
        }, this.shutdownTimeout.getSeconds(), TimeUnit.SECONDS));
    }

    protected void shutDown() throws Exception {
        if (this.shutdownTask.get() != null) {
            this.shutdownTask.get().cancel(true);
        }
    }

    private void publish(Notification notification) {
        EventDefinitionDto orElseThrow = this.dbEventDefinitionService.getSystemEventDefinitions().stream().findFirst().orElseThrow(() -> {
            return new IllegalStateException("System notification event definition not found");
        });
        try {
            SystemNotificationRenderService.RenderResponse render = this.systemNotificationRenderService.render(notification);
            notification.addDetail("message_details", render.description);
            try {
                this.eventProcessorEngine.execute(orElseThrow.id(), SystemNotificationEventProcessorParameters.builder().notificationType(notification.getType()).notificationMessage(render.title).notificationDetails(notification.getDetails()).timestamp(notification.getTimestamp()).build());
            } catch (EventProcessorException e) {
                LOG.error("Failed to publish system event for notification {}", notification.getType().toString(), e);
            }
        } catch (Exception e2) {
            LOG.warn("Cannot render notification for system event publishing.", e2);
        }
    }
}
