package com.netflix.conductor.core.events;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.dao.EventHandlerDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * {@link EventQueueManager} that keeps one {@link ObservableQueue} per event name found in the
 * registered event handlers, alongside a fixed set of default queues keyed by {@link Task.Status}.
 *
 * <p>Event names are re-read from the {@link EventHandlerDAO} on a schedule (see
 * {@link #refreshEventQueues()}); every queue created by a refresh is started and its messages are
 * forwarded to the {@link DefaultEventProcessor}.
 *
 * <p>The bean is registered unless {@code conductor.default-event-processor.enabled} is set to a
 * value other than {@code true}.
 */
@ConditionalOnProperty(
        name = "conductor.default-event-processor.enabled",
        havingValue = "true",
        matchIfMissing = true)
@Component
public class DefaultEventQueueManager extends LifecycleAwareComponent implements EventQueueManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueManager.class);

    private final EventHandlerDAO eventHandlerDAO;
    private final EventQueues eventQueues;
    private final DefaultEventProcessor defaultEventProcessor;

    /** Event name to the queue created for it; populated by {@link #refreshEventQueues()}. */
    private final Map<String, ObservableQueue> eventToQueueMap = new ConcurrentHashMap<>();

    /** Queues supplied at construction time, keyed by task status. */
    private final Map<Task.Status, ObservableQueue> defaultQueues;

    /**
     * Creates the manager.
     *
     * @param map default queues keyed by task status; started and stopped together with this
     *     component
     * @param eventHandlerDAO source of the registered event handlers
     * @param eventQueues factory/lookup used to obtain the queue for an event name
     * @param defaultEventProcessor receiver of every message observed on an event queue
     */
    public DefaultEventQueueManager(
            Map<Task.Status, ObservableQueue> map,
            EventHandlerDAO eventHandlerDAO,
            EventQueues eventQueues,
            DefaultEventProcessor defaultEventProcessor) {
        this.defaultQueues = map;
        this.eventHandlerDAO = eventHandlerDAO;
        this.eventQueues = eventQueues;
        this.defaultEventProcessor = defaultEventProcessor;
    }

    /**
     * Returns a snapshot of the known event queues.
     *
     * @return a new mutable map of event name to queue name
     */
    @Override
    public Map<String, String> getQueues() {
        Map<String, String> queueNames = new HashMap<>();
        eventToQueueMap.forEach((event, queue) -> queueNames.put(event, queue.getName()));
        return queueNames;
    }

    /**
     * Returns a snapshot of the current size of every known event queue.
     *
     * @return a new mutable map of event name to a single-entry map of queue name to queue size
     */
    @Override
    public Map<String, Map<String, Long>> getQueueSizes() {
        Map<String, Map<String, Long>> sizesByEvent = new HashMap<>();
        eventToQueueMap.forEach(
                (event, queue) -> {
                    Map<String, Long> sizeByQueueName = new HashMap<>();
                    sizeByQueueName.put(queue.getName(), queue.size());
                    sizesByEvent.put(event, sizeByQueueName);
                });
        return sizesByEvent;
    }

    /** Starts every known event queue, then every default queue. */
    @Override
    public void doStart() {
        eventToQueueMap.forEach(
                (event, queue) -> {
                    LOGGER.info("Start listening for events: {}", event);
                    queue.start();
                });
        defaultQueues.forEach(
                (status, queue) -> {
                    // Argument order follows the placeholders: queue name first, then status.
                    LOGGER.info(
                            "Start listening on default queue {} for status {}",
                            queue.getName(),
                            status);
                    queue.start();
                });
    }

    /** Stops every known event queue, then every default queue. */
    @Override
    public void doStop() {
        eventToQueueMap.forEach(
                (event, queue) -> {
                    LOGGER.info("Stop listening for events: {}", event);
                    queue.stop();
                });
        defaultQueues.forEach(
                (status, queue) -> {
                    // Argument order follows the placeholders: queue name first, then status.
                    LOGGER.info(
                            "Stop listening on default queue {} for status {}",
                            queue.getName(),
                            status);
                    queue.stop();
                });
    }

    /**
     * Reloads the event names from the registered event handlers and, for every event that has no
     * queue yet, obtains one, starts it and subscribes to its messages.
     *
     * <p>Runs with a fixed delay of 60 seconds between invocations. Any exception is reported via
     * {@link Monitors#error(String, String)} and logged; it is never propagated to the scheduler.
     */
    @Scheduled(fixedDelay = 60000)
    public void refreshEventQueues() {
        try {
            Set<String> events =
                    eventHandlerDAO.getAllEventHandlers().stream()
                            .map(handler -> handler.getEvent())
                            .collect(Collectors.toSet());

            // Collect only the queues created during this refresh so that queues which are
            // already known are not started or subscribed to a second time.
            LinkedList<ObservableQueue> createdQueues = new LinkedList<>();
            events.forEach(
                    event ->
                            eventToQueueMap.computeIfAbsent(
                                    event,
                                    newEvent -> {
                                        ObservableQueue queue = eventQueues.getQueue(newEvent);
                                        createdQueues.add(queue);
                                        return queue;
                                    }));

            // A null queue leaves the event unmapped (computeIfAbsent records nothing for a null
            // result), so it is simply skipped here and retried on the next refresh.
            createdQueues.stream()
                    .filter(Objects::nonNull)
                    .forEach(
                            queue -> {
                                queue.start();
                                listen(queue);
                            });
        } catch (Exception e) {
            Monitors.error(getClass().getSimpleName(), "refresh");
            LOGGER.error("refresh event queues failed", e);
        }
    }

    /**
     * Subscribes to the queue and hands every observed message to the event processor.
     *
     * @param observableQueue queue whose messages should be processed
     */
    private void listen(ObservableQueue observableQueue) {
        observableQueue
                .observe()
                .subscribe(message -> defaultEventProcessor.handle(observableQueue, message));
    }
}
