package com.netflix.conductor.core.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.core.utils.JsonUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.spotify.futures.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/conductor/core/events/SimpleEventProcessor.class */
/**
 * {@link EventProcessor} that subscribes to the queue of every registered event handler and, for
 * each received message, evaluates the handler conditions and runs the handler actions on a
 * fixed-size thread pool.
 *
 * <p>The set of listened queues is refreshed once at construction time and then every 60 seconds.
 * Event processing is disabled entirely when the configured thread count
 * ({@code workflow.event.processor.thread.count}, default 2) is not positive.
 */
public class SimpleEventProcessor implements EventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(SimpleEventProcessor.class);
    private static final String className = SimpleEventProcessor.class.getSimpleName();
    /** Number of attempts handed to {@link RetryUtil} when executing a single action. */
    private static final int RETRY_COUNT = 3;

    private final MetadataService metadataService;
    private final ExecutionService executionService;
    private final ActionProcessor actionProcessor;
    private final EventQueues eventQueues;
    /** Pool that runs handler actions; {@code null} when event processing is disabled. */
    private final ExecutorService executorService;
    /** Event name to the queue currently being listened to for that event. */
    private final Map<String, ObservableQueue> eventToQueueMap = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper;
    private final JsonUtils jsonUtils;
    private final boolean isEventMessageIndexingEnabled;

    /**
     * Wires the collaborators and, when the configured thread count is positive, starts listening
     * to event queues and schedules the periodic queue refresh.
     */
    @Inject
    public SimpleEventProcessor(ExecutionService executionService, MetadataService metadataService, ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, Configuration configuration, ObjectMapper objectMapper) {
        this.executionService = executionService;
        this.metadataService = metadataService;
        this.actionProcessor = actionProcessor;
        this.eventQueues = eventQueues;
        this.objectMapper = objectMapper;
        this.jsonUtils = jsonUtils;
        this.isEventMessageIndexingEnabled = configuration.isEventMessageIndexingEnabled();

        int executorThreadCount = configuration.getIntProperty("workflow.event.processor.thread.count", 2);
        if (executorThreadCount > 0) {
            this.executorService = Executors.newFixedThreadPool(executorThreadCount);
            refresh();
            // NOTE: the scheduler reference is not retained, so it is never shut down explicitly.
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60L, 60L, TimeUnit.SECONDS);
            logger.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
        } else {
            // No queue is ever subscribed in this mode, so the null executor is never used.
            this.executorService = null;
            logger.warn("Event processing is DISABLED. executorThreadCount set to {}", executorThreadCount);
        }
    }

    /**
     * @return a snapshot mapping each known event to the name of the queue it is read from
     */
    @Override // com.netflix.conductor.core.events.EventProcessor
    public Map<String, String> getQueues() {
        Map<String, String> queues = new HashMap<>();
        this.eventToQueueMap.forEach((event, queue) -> queues.put(event, queue.getName()));
        return queues;
    }

    /**
     * @return a snapshot mapping each known event to a single-entry map of queue name to the
     *     queue's current size
     */
    @Override // com.netflix.conductor.core.events.EventProcessor
    public Map<String, Map<String, Long>> getQueueSizes() {
        Map<String, Map<String, Long>> queueSizes = new HashMap<>();
        this.eventToQueueMap.forEach((event, queue) -> {
            Map<String, Long> size = new HashMap<>();
            size.put(queue.getName(), queue.size());
            queueSizes.put(event, size);
        });
        return queueSizes;
    }

    /**
     * Looks up the events of all registered handlers, resolves a queue for every event not seen
     * before and starts listening on each newly resolved queue. Failures are logged and counted,
     * never propagated, so a scheduled refresh keeps running after an error.
     */
    private void refresh() {
        try {
            Set<String> events = this.metadataService.getAllEventHandlers().stream()
                    .map(EventHandler::getEvent)
                    .collect(Collectors.toSet());

            List<ObservableQueue> createdQueues = new LinkedList<>();
            events.forEach(event -> this.eventToQueueMap.computeIfAbsent(event, key -> {
                ObservableQueue queue = this.eventQueues.getQueue(key);
                // May be null; computeIfAbsent then records no mapping and the lookup is retried
                // on the next refresh. Nulls are filtered out before subscribing.
                createdQueues.add(queue);
                return queue;
            }));

            createdQueues.stream().filter(Objects::nonNull).forEach(this::listen);
        } catch (Exception e) {
            Monitors.error(className, "refresh");
            logger.error("refresh event queues failed", e);
        }
    }

    /** Subscribes to the queue so that every observed message is passed to {@link #handle}. */
    private void listen(ObservableQueue observableQueue) {
        observableQueue.observe().subscribe(message -> handle(observableQueue, message));
    }

    /**
     * Processes one message: optionally indexes it, runs the matching event handlers and then
     * either acks the message (no transient failures) or republishes it when the queue asks for
     * that. Exceptions are logged and swallowed; the handled-messages metric is always recorded.
     */
    private void handle(ObservableQueue observableQueue, Message message) {
        try {
            if (this.isEventMessageIndexingEnabled) {
                this.executionService.addMessage(observableQueue.getName(), message);
            }
            String event = observableQueue.getType() + QueueUtils.DOMAIN_SEPARATOR + observableQueue.getName();
            logger.debug("Evaluating message: {} for event: {}", message.getId(), event);

            List<EventExecution> transientFailures = executeEvent(event, message);
            if (transientFailures.isEmpty()) {
                observableQueue.ack(Collections.singletonList(message));
                logger.debug("Message: {} acked on queue: {}", message.getId(), observableQueue.getName());
            } else if (observableQueue.rePublishIfNoAck()) {
                observableQueue.publish(Collections.singletonList(message));
                logger.debug("Message: {} published to queue: {}", message.getId(), observableQueue.getName());
            }
        } catch (Exception e) {
            logger.error("Error handling message: {} on queue:{}", message, observableQueue.getName(), e);
        } finally {
            Monitors.recordEventQueueMessagesHandled(observableQueue.getType(), observableQueue.getName());
        }
    }

    /**
     * Runs every active handler registered for {@code event} against the message.
     *
     * <p>A handler whose condition evaluates to false is recorded as {@code SKIPPED} and its
     * actions are not executed. For the remaining handlers this method blocks until all actions
     * finish; executions still {@code IN_PROGRESS} afterwards are removed from the store and
     * reported back as transient failures.
     *
     * @return the executions that failed transiently; empty when the message can be acked
     * @throws Exception if waiting for the actions fails or is interrupted
     */
    private List<EventExecution> executeEvent(String event, Message message) throws Exception {
        List<EventHandler> eventHandlers = this.metadataService.getEventHandlersForEvent(event, true);
        Object payloadObject = getPayloadObject(message.getPayload());

        List<EventExecution> transientFailures = new ArrayList<>();
        for (EventHandler eventHandler : eventHandlers) {
            String condition = eventHandler.getCondition();
            if (StringUtils.isNotEmpty(condition)) {
                logger.debug("Checking condition: {} for event: {}", condition, event);
                boolean success = ScriptEvaluator.evalBool(condition, this.jsonUtils.expand(payloadObject));
                if (!success) {
                    EventExecution skipped = new EventExecution(message.getId() + "_0", message.getId());
                    skipped.setCreated(System.currentTimeMillis());
                    skipped.setEvent(eventHandler.getEvent());
                    skipped.setName(eventHandler.getName());
                    skipped.setStatus(EventExecution.Status.SKIPPED);
                    skipped.getOutput().put("msg", message.getPayload());
                    skipped.getOutput().put("condition", condition);
                    this.executionService.addEventExecution(skipped);
                    logger.debug("Condition: {} not successful for event: {} with payload: {}", condition, eventHandler.getEvent(), message.getPayload());
                    // The handler is skipped: do not run its actions. Running them would also
                    // reuse the "<messageId>_0" id that was just stored for the SKIPPED record.
                    continue;
                }
            }

            executeActionsForEventHandler(eventHandler, message)
                    .whenComplete((executions, error) -> {
                        if (executions == null) {
                            // Exceptional completion: there is no result to reconcile, and get()
                            // below rethrows the failure to the caller.
                            return;
                        }
                        for (EventExecution execution : executions) {
                            if (execution.getStatus() != EventExecution.Status.IN_PROGRESS) {
                                this.executionService.updateEventExecution(execution);
                            } else {
                                // Still in progress means execute() hit a transient error.
                                this.executionService.removeEventExecution(execution);
                                transientFailures.add(execution);
                            }
                        }
                    })
                    .get();
        }
        return transientFailures;
    }

    /**
     * Registers one {@code IN_PROGRESS} execution per action of the handler and submits each
     * newly registered one to the executor. Actions whose execution record cannot be added are
     * treated as duplicate deliveries and are not run.
     *
     * @return a future completing with the executions of all submitted actions
     */
    private CompletableFuture<List<EventExecution>> executeActionsForEventHandler(EventHandler eventHandler, Message message) {
        List<CompletableFuture<EventExecution>> futures = new ArrayList<>();
        int actionIndex = 0;
        for (EventHandler.Action action : eventHandler.getActions()) {
            String id = message.getId() + "_" + actionIndex++;
            EventExecution eventExecution = new EventExecution(id, message.getId());
            eventExecution.setCreated(System.currentTimeMillis());
            eventExecution.setEvent(eventHandler.getEvent());
            eventExecution.setName(eventHandler.getName());
            eventExecution.setAction(action.getAction());
            eventExecution.setStatus(EventExecution.Status.IN_PROGRESS);
            if (this.executionService.addEventExecution(eventExecution)) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> execute(eventExecution, action, getPayloadObject(message.getPayload())),
                        this.executorService));
            } else {
                logger.warn("Duplicate delivery/execution of message: {}", message.getId());
            }
        }
        return CompletableFutures.allAsList(futures);
    }

    /**
     * Executes a single action with retries and records the outcome on the execution.
     *
     * <p>On success the action output is merged into the execution output and the status becomes
     * {@code COMPLETED}. On a non-transient failure the status becomes {@code FAILED} and the
     * exception message is stored under {@code "exception"}. On a transient failure the status is
     * left untouched.
     *
     * @return the same {@code eventExecution} instance, updated in place
     */
    @VisibleForTesting
    EventExecution execute(EventExecution eventExecution, EventHandler.Action action, Object obj) {
        try {
            String description = String.format("Executing action: %s for event: %s with messageId: %s with payload: %s", action.getAction(), eventExecution.getId(), eventExecution.getMessageId(), obj);
            logger.debug(description);
            Map<String, Object> output = new RetryUtil<Map<String, Object>>().retryOnException(
                    () -> this.actionProcessor.execute(action, obj, eventExecution.getEvent(), eventExecution.getMessageId()),
                    this::isTransientException,
                    null,
                    RETRY_COUNT,
                    description,
                    "executeEventAction");
            if (output != null) {
                eventExecution.getOutput().putAll(output);
            }
            eventExecution.setStatus(EventExecution.Status.COMPLETED);
        } catch (RuntimeException e) {
            logger.error("Error executing action: {} for event: {} with messageId: {}", action.getAction(), eventExecution.getEvent(), eventExecution.getMessageId(), e);
            // The transient check is made on the cause, not on the caught exception itself.
            if (!isTransientException(e.getCause())) {
                eventExecution.setStatus(EventExecution.Status.FAILED);
                eventExecution.getOutput().put("exception", e.getMessage());
            }
        }
        return eventExecution;
    }

    /**
     * Decides whether a failure is worth retrying. {@code null}, and anything that is neither an
     * {@link UnsupportedOperationException} nor an {@link ApplicationException}, counts as
     * transient; an {@link ApplicationException} is transient only with code
     * {@code BACKEND_ERROR}.
     */
    private boolean isTransientException(Throwable th) {
        if (th == null) {
            return true;
        }
        if (th instanceof UnsupportedOperationException) {
            return false;
        }
        if (th instanceof ApplicationException) {
            return ((ApplicationException) th).getCode() == ApplicationException.Code.BACKEND_ERROR;
        }
        return true;
    }

    /**
     * Parses the payload with the object mapper, falling back to the raw string when it cannot be
     * parsed.
     *
     * @return the parsed object, the original string on parse failure, or {@code null} for a
     *     {@code null} payload
     */
    private Object getPayloadObject(String str) {
        if (str == null) {
            return null;
        }
        try {
            return this.objectMapper.readValue(str, Object.class);
        } catch (Exception ignored) {
            // Deliberate best effort: an unparseable payload is passed on as plain text.
            return str;
        }
    }
}
