package com.netflix.conductor.core.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/netflix/conductor/core/events/EventProcessor.class */
public class EventProcessor {
    private static Logger logger = LoggerFactory.getLogger(EventProcessor.class);
    private MetadataService metadataService;
    private ExecutionService executionService;
    private ActionProcessor actionProcessor;
    private Map<String, ObservableQueue> queuesMap = new ConcurrentHashMap();
    private ExecutorService executors;
    private ObjectMapper objectMapper;

    @Inject
    public EventProcessor(ExecutionService executionService, MetadataService metadataService, ActionProcessor actionProcessor, Configuration configuration, ObjectMapper objectMapper) {
        this.executionService = executionService;
        this.metadataService = metadataService;
        this.actionProcessor = actionProcessor;
        this.objectMapper = objectMapper;
        int intProperty = configuration.getIntProperty("workflow.event.processor.thread.count", 2);
        if (intProperty <= 0) {
            logger.warn("Event processing is DISABLED.  executorThreadCount set to {}", Integer.valueOf(intProperty));
            return;
        }
        this.executors = Executors.newFixedThreadPool(intProperty);
        refresh();
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            refresh();
        }, 60L, 60L, TimeUnit.SECONDS);
    }

    public Map<String, String> getQueues() {
        HashMap hashMap = new HashMap();
        this.queuesMap.entrySet().stream().forEach(entry -> {
        });
        return hashMap;
    }

    public Map<String, Map<String, Long>> getQueueSizes() {
        HashMap hashMap = new HashMap();
        this.queuesMap.entrySet().stream().forEach(entry -> {
            HashMap hashMap2 = new HashMap();
            hashMap2.put(((ObservableQueue) entry.getValue()).getName(), Long.valueOf(((ObservableQueue) entry.getValue()).size()));
            hashMap.put(entry.getKey(), hashMap2);
        });
        return hashMap;
    }

    private void refresh() {
        Set set = (Set) this.metadataService.getEventHandlers().stream().map(eventHandler -> {
            return eventHandler.getEvent();
        }).collect(Collectors.toSet());
        LinkedList linkedList = new LinkedList();
        set.stream().forEach(str -> {
            this.queuesMap.computeIfAbsent(str, str -> {
                ObservableQueue queue = EventQueues.getQueue(str, false);
                linkedList.add(queue);
                return queue;
            });
        });
        if (linkedList.isEmpty()) {
            return;
        }
        linkedList.stream().filter(observableQueue -> {
            return observableQueue != null;
        }).forEach(observableQueue2 -> {
            listen(observableQueue2);
        });
    }

    private void listen(ObservableQueue observableQueue) {
        observableQueue.observe().subscribe(message -> {
            handle(observableQueue, message);
        });
    }

    private void handle(ObservableQueue observableQueue, Message message) {
        try {
            LinkedList linkedList = new LinkedList();
            String payload = message.getPayload();
            Object obj = null;
            if (payload != null) {
                try {
                    obj = this.objectMapper.readValue(payload, Object.class);
                } catch (Exception e) {
                    obj = payload;
                }
            }
            this.executionService.addMessage(observableQueue.getName(), message);
            for (EventHandler eventHandler : this.metadataService.getEventHandlersForEvent(observableQueue.getType() + QueueUtils.DOMAIN_SEPARATOR + observableQueue.getName(), true)) {
                String condition = eventHandler.getCondition();
                logger.debug("condition: {}", condition);
                if (StringUtils.isEmpty(condition) || ScriptEvaluator.evalBool(condition, obj).booleanValue()) {
                    int i = 0;
                    for (EventHandler.Action action : eventHandler.getActions()) {
                        int i2 = i;
                        i++;
                        String str = message.getId() + "_" + i2;
                        EventExecution eventExecution = new EventExecution(str, message.getId());
                        eventExecution.setCreated(System.currentTimeMillis());
                        eventExecution.setEvent(eventHandler.getEvent());
                        eventExecution.setName(eventHandler.getName());
                        eventExecution.setAction(action.getAction());
                        eventExecution.setStatus(EventExecution.Status.IN_PROGRESS);
                        if (this.executionService.addEventExecution(eventExecution)) {
                            linkedList.add(execute(eventExecution, action, payload));
                        } else {
                            logger.warn("Duplicate delivery/execution? {}", str);
                        }
                    }
                } else {
                    logger.info("handler {} condition {} did not match payload {}", new Object[]{eventHandler.getName(), condition, obj});
                    EventExecution eventExecution2 = new EventExecution(message.getId() + "_0", message.getId());
                    eventExecution2.setCreated(System.currentTimeMillis());
                    eventExecution2.setEvent(eventHandler.getEvent());
                    eventExecution2.setName(eventHandler.getName());
                    eventExecution2.setStatus(EventExecution.Status.SKIPPED);
                    eventExecution2.getOutput().put("msg", payload);
                    eventExecution2.getOutput().put("condition", condition);
                    this.executionService.addEventExecution(eventExecution2);
                }
            }
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                try {
                    ((Future) it.next()).get();
                } catch (Exception e2) {
                    logger.error(e2.getMessage(), e2);
                }
            }
            observableQueue.ack(Arrays.asList(message));
        } catch (Exception e3) {
            logger.error(e3.getMessage(), e3);
        }
    }

    private Future<Void> execute(EventExecution eventExecution, EventHandler.Action action, String str) {
        return this.executors.submit(() -> {
            try {
                logger.debug("Executing {} with payload {}", action.getAction(), str);
                Map<String, Object> execute = this.actionProcessor.execute(action, str, eventExecution.getEvent(), eventExecution.getMessageId());
                if (execute != null) {
                    eventExecution.getOutput().putAll(execute);
                }
                eventExecution.setStatus(EventExecution.Status.COMPLETED);
                this.executionService.updateEventExecution(eventExecution);
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                eventExecution.setStatus(EventExecution.Status.FAILED);
                eventExecution.getOutput().put("exception", e.getMessage());
                return null;
            }
        });
    }
}
