/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.notifications;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog.events.processor.DBEventDefinitionService;
import org.graylog.events.processor.EventDefinitionDto;
import org.graylog.events.processor.EventProcessorEngine;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.systemnotification.SystemNotificationEventProcessorParameters;
import org.graylog.events.processor.systemnotification.SystemNotificationRenderService;
import org.graylog2.notifications.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class NotificationSystemEventPublisher
extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationSystemEventPublisher.class);
    private static final int MAX_QUEUED_NOTIFICATIONS = 10000;
    private final DBEventDefinitionService dbEventDefinitionService;
    private final SystemNotificationRenderService systemNotificationRenderService;
    private final EventProcessorEngine eventProcessorEngine;
    private final ScheduledExecutorService scheduler;
    private final Duration shutdownTimeout;
    private final BlockingQueue<Notification> queuedNotifications;
    private final AtomicReference<ScheduledFuture<?>> shutdownTask = new AtomicReference();
    private Thread executionThread;

    @Inject
    public NotificationSystemEventPublisher(DBEventDefinitionService dbEventDefinitionService, SystemNotificationRenderService systemNotificationRenderService, EventProcessorEngine eventProcessorEngine, @Named(value="daemonScheduler") ScheduledExecutorService scheduler, @Named(value="shutdown_timeout") int shutDownTimeoutMs) {
        this.dbEventDefinitionService = dbEventDefinitionService;
        this.systemNotificationRenderService = systemNotificationRenderService;
        this.eventProcessorEngine = eventProcessorEngine;
        this.scheduler = scheduler;
        this.queuedNotifications = new LinkedBlockingQueue<Notification>(10000);
        this.shutdownTimeout = Duration.ofMillis(shutDownTimeoutMs);
    }

    public boolean submit(Notification notification) {
        try {
            if (!this.queuedNotifications.offer(notification)) {
                LOG.error("Unable to submit system notification for publishing as a system event. Current number of queued notifications: {}. Max queue size: {}", (Object)this.queuedNotifications.size(), (Object)10000);
                return false;
            }
        }
        catch (Exception e) {
            LOG.error("Failed submitting notification for publishing as a system event", (Throwable)e);
            return false;
        }
        return true;
    }

    protected void run() throws Exception {
        while (this.isRunning() || !this.queuedNotifications.isEmpty()) {
            ScheduledFuture<?> task = this.shutdownTask.get();
            if (task != null && task.isDone()) {
                return;
            }
            try {
                Notification notification = this.queuedNotifications.poll(1L, TimeUnit.SECONDS);
                if (notification == null) continue;
                this.publish(notification);
            }
            catch (Exception e) {
                LOG.error("", (Throwable)e);
            }
        }
    }

    protected void startUp() throws Exception {
        this.executionThread = Thread.currentThread();
    }

    protected void triggerShutdown() {
        this.shutdownTask.set(this.scheduler.schedule(() -> {
            LOG.error("Notification queue was not drained within {}. Forcefully terminating.", (Object)this.shutdownTimeout);
            this.executionThread.interrupt();
        }, this.shutdownTimeout.getSeconds(), TimeUnit.SECONDS));
    }

    protected void shutDown() throws Exception {
        if (this.shutdownTask.get() != null) {
            this.shutdownTask.get().cancel(true);
        }
    }

    private void publish(Notification notification) {
        SystemNotificationRenderService.RenderResponse renderResponse;
        EventDefinitionDto systemEventDefinition = (EventDefinitionDto)this.dbEventDefinitionService.getSystemEventDefinitions().stream().findFirst().orElseThrow(() -> new IllegalStateException("System notification event definition not found"));
        try {
            renderResponse = this.systemNotificationRenderService.render(notification);
        }
        catch (Exception e) {
            LOG.warn("Cannot render notification for system event publishing.", (Throwable)e);
            return;
        }
        notification.addDetail("message_details", renderResponse.description);
        SystemNotificationEventProcessorParameters parameters = SystemNotificationEventProcessorParameters.builder().notificationType(notification.getType()).notificationMessage(renderResponse.title).notificationDetails(notification.getDetails()).timestamp(notification.getTimestamp()).build();
        try {
            this.eventProcessorEngine.execute(systemEventDefinition.id(), parameters);
        }
        catch (EventProcessorException e) {
            LOG.error("Failed to publish system event for notification {}", (Object)notification.getType().toString(), (Object)e);
        }
    }
}

