/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.common.event.task;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.inlong.manager.common.event.EventListenerManager;
import org.apache.inlong.manager.common.event.EventListenerNotifier;
import org.apache.inlong.manager.common.event.LogableEventListener;
import org.apache.inlong.manager.common.event.task.LogableTaskEventListener;
import org.apache.inlong.manager.common.event.task.TaskEvent;
import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.event.task.TaskEventListenerManager;
import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.model.definition.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskEventNotifier
implements EventListenerNotifier<TaskEvent> {
    private static final Logger log = LoggerFactory.getLogger(TaskEventNotifier.class);
    private final ExecutorService executorService = new ThreadPoolExecutor(20, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("async-task-event-notifier-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());
    private EventListenerManager<TaskEvent, TaskEventListener> eventListenerManager;

    public TaskEventNotifier(TaskEventListenerManager eventListenerManager) {
        this.eventListenerManager = eventListenerManager;
    }

    @Override
    public void notify(TaskEvent event, WorkflowContext sourceContext) {
        WorkflowContext context = sourceContext.clone();
        Task task = (Task)context.getCurrentElement();
        this.eventListenerManager.syncListeners(event).forEach(this.syncLogableNotify(context));
        task.syncListeners(event).forEach(this.syncLogableNotify(context));
        this.eventListenerManager.asyncListeners(event).forEach(this.asyncLogableNotify(context));
        task.asyncListeners(event).forEach(this.asyncLogableNotify(context));
    }

    @Override
    public void notify(String listenerName, boolean forceSync, WorkflowContext sourceContext) {
        WorkflowContext context = sourceContext.clone();
        Optional.ofNullable(this.eventListenerManager.listener(listenerName)).ifPresent(this.logableNotify(forceSync, context));
        Task task = (Task)context.getCurrentElement();
        Optional.ofNullable(task.listener(listenerName)).ifPresent(this.logableNotify(forceSync, context));
    }

    private Consumer<TaskEventListener> logableNotify(boolean forceSync, WorkflowContext context) {
        return listener -> {
            if (forceSync || !listener.async()) {
                this.syncLogableNotify(context).accept((TaskEventListener)listener);
                return;
            }
            this.asyncLogableNotify(context).accept((TaskEventListener)listener);
        };
    }

    private Consumer<TaskEventListener> asyncLogableNotify(WorkflowContext context) {
        return listener -> this.executorService.execute(() -> {
            try {
                this.logableEventListener((TaskEventListener)listener).listen(context);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    private Consumer<TaskEventListener> syncLogableNotify(WorkflowContext context) {
        return listener -> this.logableEventListener((TaskEventListener)listener).listen(context);
    }

    private LogableTaskEventListener logableEventListener(TaskEventListener listener) {
        if (listener instanceof LogableEventListener) {
            return (LogableTaskEventListener)listener;
        }
        return new LogableTaskEventListener(listener, this.eventListenerManager.eventLogStorage());
    }
}

