package org.apache.inlong.manager.common.event.process;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.inlong.manager.common.event.EventListenerManager;
import org.apache.inlong.manager.common.event.EventListenerNotifier;
import org.apache.inlong.manager.common.event.LogableEventListener;
import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.model.definition.Process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/common/event/process/ProcessEventNotifier.class */
/**
 * Dispatches process events to the listeners registered on the shared
 * {@code eventListenerManager} and on the {@link Process} carried by the workflow context.
 *
 * <p>Synchronous listeners are invoked on the calling thread; asynchronous listeners are
 * submitted to an internal fixed-size thread pool. Every listener is invoked through a
 * {@link LogableProcessEventListener}, wrapping it first when it is not one already.
 */
public class ProcessEventNotifier implements EventListenerNotifier<ProcessEvent> {
    private static final Logger log = LoggerFactory.getLogger(ProcessEventNotifier.class);

    /** Number of worker threads (core == max) used for asynchronous listeners. */
    private static final int ASYNC_POOL_SIZE = 20;

    /** Pool that runs asynchronous listeners; threads are named {@code async-process-event-notifier-N}. */
    private final ExecutorService executorService = new ThreadPoolExecutor(
            ASYNC_POOL_SIZE,
            ASYNC_POOL_SIZE,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder().setNameFormat("async-process-event-notifier-%s").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Source of the globally registered listeners and of the event log storage used for wrapping. */
    private final EventListenerManager<ProcessEvent, ProcessEventListener> eventListenerManager;

    /**
     * Creates a notifier backed by the given listener manager.
     *
     * @param processEventListenerManager manager supplying the registered process event listeners
     */
    public ProcessEventNotifier(ProcessEventListenerManager processEventListenerManager) {
        this.eventListenerManager = processEventListenerManager;
    }

    /**
     * Notifies every listener registered for {@code processEvent}.
     *
     * <p>Order of dispatch: synchronous listeners of the manager, synchronous listeners of the
     * process, then asynchronous listeners of the manager and of the process. All listeners
     * receive the same cloned context rather than the caller's instance.
     *
     * @param processEvent    the event that occurred
     * @param workflowContext the current workflow context; it is cloned before dispatch
     */
    @Override // org.apache.inlong.manager.common.event.EventListenerNotifier
    public void notify(ProcessEvent processEvent, WorkflowContext workflowContext) {
        WorkflowContext contextCopy = workflowContext.m32clone();
        Process process = contextCopy.getProcess();

        // Synchronous listeners run first, on the calling thread.
        this.eventListenerManager.syncListeners(processEvent).forEach(syncLogableNotify(contextCopy));
        process.syncListeners(processEvent).forEach(syncLogableNotify(contextCopy));

        // Asynchronous listeners are handed to the thread pool afterwards.
        this.eventListenerManager.asyncListeners(processEvent).forEach(asyncLogableNotify(contextCopy));
        process.asyncListeners(processEvent).forEach(asyncLogableNotify(contextCopy));
    }

    /**
     * Notifies the listener looked up by name, first on the manager and then on the process.
     * A lookup that yields {@code null} is skipped.
     *
     * @param str             the listener name passed to the {@code listener(...)} lookups
     * @param z               when {@code true}, the listener is invoked synchronously even if it
     *                        reports itself as asynchronous
     * @param workflowContext the current workflow context; it is cloned before dispatch
     */
    @Override // org.apache.inlong.manager.common.event.EventListenerNotifier
    public void notify(String str, boolean z, WorkflowContext workflowContext) {
        WorkflowContext contextCopy = workflowContext.m32clone();
        Process process = contextCopy.getProcess();
        Optional.ofNullable(this.eventListenerManager.listener(str)).ifPresent(logableNotify(z, contextCopy));
        Optional.ofNullable(process.listener(str)).ifPresent(logableNotify(z, contextCopy));
    }

    /**
     * Builds a consumer that invokes a listener synchronously when {@code z} is set or the
     * listener is not asynchronous, and asynchronously otherwise.
     */
    private Consumer<ProcessEventListener> logableNotify(boolean z, WorkflowContext workflowContext) {
        return processEventListener -> {
            boolean runSync = z || !processEventListener.async();
            Consumer<ProcessEventListener> dispatcher =
                    runSync ? syncLogableNotify(workflowContext) : asyncLogableNotify(workflowContext);
            dispatcher.accept(processEventListener);
        };
    }

    /** Builds a consumer that submits the (wrapped) listener invocation to the thread pool. */
    private Consumer<ProcessEventListener> asyncLogableNotify(WorkflowContext workflowContext) {
        return processEventListener -> this.executorService.execute(() -> {
            try {
                logableEventListener(processEventListener).listen(workflowContext);
            } catch (RuntimeException e) {
                // Tasks started with execute() have no Future to report through; without this
                // catch a failure would only reach the worker thread's uncaught-exception handler.
                log.error("async process event listener {} failed",
                        processEventListener.getClass().getName(), e);
            }
        });
    }

    /** Builds a consumer that invokes the (wrapped) listener on the calling thread. */
    private Consumer<ProcessEventListener> syncLogableNotify(WorkflowContext workflowContext) {
        return processEventListener -> logableEventListener(processEventListener).listen(workflowContext);
    }

    /**
     * Returns {@code processEventListener} itself when it already is a
     * {@link LogableProcessEventListener}; otherwise wraps it in a new one that uses the
     * manager's event log storage.
     */
    private LogableProcessEventListener logableEventListener(ProcessEventListener processEventListener) {
        // Test against the exact type being cast to: checking a broader type and then
        // down-casting could throw ClassCastException for a listener that is not a
        // LogableProcessEventListener.
        if (processEventListener instanceof LogableProcessEventListener) {
            return (LogableProcessEventListener) processEventListener;
        }
        return new LogableProcessEventListener(processEventListener, this.eventListenerManager.eventLogStorage());
    }
}
