package com.netflix.conductor.core.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.utils.JsonUtils;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.spotify.futures.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 * Processes messages received from an {@link ObservableQueue}: looks up the event handlers
 * registered for the event, evaluates each handler's optional condition against the message
 * payload, runs the handler's actions on a dedicated fixed-size thread pool, and then acks or
 * re-publishes the message depending on the outcome.
 *
 * <p>The bean is only created when {@code conductor.default-event-processor.enabled} is
 * {@code true} or not set at all.
 */
@ConditionalOnProperty(name = {"conductor.default-event-processor.enabled"}, havingValue = "true", matchIfMissing = true)
@Component
public class DefaultEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventProcessor.class);

    /** Retry count handed to {@link RetryUtil} when executing a single action. */
    private static final int RETRY_COUNT = 3;

    private final MetadataService metadataService;
    private final ExecutionService executionService;
    private final ActionProcessor actionProcessor;
    /** Pool on which the individual event actions are executed. */
    private final ExecutorService eventActionExecutorService;
    private final ObjectMapper objectMapper;
    private final JsonUtils jsonUtils;
    /** When {@code true}, every handled message is first passed to {@code ExecutionService.addMessage}. */
    private final boolean isEventMessageIndexingEnabled;

    /**
     * Creates the processor and its action thread pool.
     *
     * @throws IllegalStateException if the configured event processor thread count is zero or
     *     negative
     */
    public DefaultEventProcessor(ExecutionService executionService, MetadataService metadataService, ActionProcessor actionProcessor, JsonUtils jsonUtils, ConductorProperties conductorProperties, ObjectMapper objectMapper) {
        this.executionService = executionService;
        this.metadataService = metadataService;
        this.actionProcessor = actionProcessor;
        this.objectMapper = objectMapper;
        this.jsonUtils = jsonUtils;

        int threadCount = conductorProperties.getEventProcessorThreadCount();
        if (threadCount <= 0) {
            throw new IllegalStateException("Cannot set event processor thread count to <=0. To disable event processing, set conductor.default-event-processor.enabled=false.");
        }
        this.eventActionExecutorService = Executors.newFixedThreadPool(
                threadCount,
                new ThreadFactoryBuilder().setNameFormat("event-action-executor-thread-%d").build());
        this.isEventMessageIndexingEnabled = conductorProperties.isEventMessageIndexingEnabled();
        LOGGER.info("Event Processing is ENABLED");
    }

    /**
     * Handles a single message from the given queue.
     *
     * <p>The message is acked when {@link #executeEvent} reports no transient failures. Otherwise
     * it is re-published if the queue's {@code rePublishIfNoAck()} returns {@code true}. Any
     * {@link Exception} raised while handling is logged and counted as an error instead of being
     * propagated; the "messages handled" metric is recorded in every case.
     *
     * @param observableQueue queue the message was received from
     * @param message the message to process
     */
    public void handle(ObservableQueue observableQueue, Message message) {
        try {
            if (this.isEventMessageIndexingEnabled) {
                this.executionService.addMessage(observableQueue.getName(), message);
            }
            String event = observableQueue.getType() + ":" + observableQueue.getName();
            LOGGER.debug("Evaluating message: {} for event: {}", message.getId(), event);

            List<EventExecution> transientFailures = executeEvent(event, message);
            if (transientFailures.isEmpty()) {
                observableQueue.ack(Collections.singletonList(message));
                LOGGER.debug("Message: {} acked on queue: {}", message.getId(), observableQueue.getName());
            } else if (observableQueue.rePublishIfNoAck()) {
                // Hand the message back to the queue so it can be attempted again.
                observableQueue.publish(Collections.singletonList(message));
                LOGGER.debug("Message: {} published to queue: {}", message.getId(), observableQueue.getName());
            }
        } catch (Exception e) {
            LOGGER.error("Error handling message: {} on queue:{}", message, observableQueue.getName(), e);
            Monitors.recordEventQueueMessagesError(observableQueue.getType(), observableQueue.getName());
        } finally {
            // Counted on success, on handled exceptions and on propagating errors alike.
            Monitors.recordEventQueueMessagesHandled(observableQueue.getType(), observableQueue.getName());
        }
    }

    /**
     * Runs every event handler returned by the metadata service for the event against the message.
     *
     * <p>A handler whose non-empty condition evaluates to {@code false} is recorded as
     * {@code SKIPPED} and none of its actions are executed. For the remaining handlers, all
     * actions are executed and awaited; executions still {@code IN_PROGRESS} afterwards are
     * collected as transient failures, the others are persisted via
     * {@code updateEventExecution}.
     *
     * @param str the event name, as built by {@link #handle} ({@code <queue type>:<queue name>})
     * @param message the message being processed
     * @return the executions considered transient failures, after {@link #processTransientFailures}
     * @throws Exception if condition evaluation fails or waiting for the actions fails
     */
    protected List<EventExecution> executeEvent(String str, Message message) throws Exception {
        List<EventHandler> eventHandlers = this.metadataService.getEventHandlersForEvent(str, true);
        Object payloadObject = getPayloadObject(message.getPayload());

        List<EventExecution> transientFailures = new ArrayList<>();
        for (EventHandler eventHandler : eventHandlers) {
            String condition = eventHandler.getCondition();
            if (StringUtils.isNotEmpty(condition)) {
                LOGGER.debug("Checking condition: {} for event: {}", condition, str);
                boolean conditionMet = ScriptEvaluator.evalBool(condition, this.jsonUtils.expand(payloadObject)).booleanValue();
                if (!conditionMet) {
                    EventExecution skipped = new EventExecution(message.getId() + "_0", message.getId());
                    skipped.setCreated(System.currentTimeMillis());
                    skipped.setEvent(eventHandler.getEvent());
                    skipped.setName(eventHandler.getName());
                    skipped.setStatus(EventExecution.Status.SKIPPED);
                    skipped.getOutput().put("msg", message.getPayload());
                    skipped.getOutput().put("condition", condition);
                    this.executionService.addEventExecution(skipped);
                    LOGGER.debug("Condition: {} not successful for event: {} with payload: {}", condition, eventHandler.getEvent(), message.getPayload());
                    // The condition did not match: this handler's actions must not run.
                    continue;
                }
            }

            executeActionsForEventHandler(eventHandler, message)
                    .whenComplete((executions, error) -> {
                        if (error != null) {
                            // No per-action results exist on failure; get() below rethrows the error.
                            return;
                        }
                        for (EventExecution execution : executions) {
                            if (execution.getStatus() == EventExecution.Status.IN_PROGRESS) {
                                // execute() leaves the status untouched for transient errors.
                                transientFailures.add(execution);
                            } else {
                                this.executionService.updateEventExecution(execution);
                            }
                        }
                    })
                    .get();
        }
        return processTransientFailures(transientFailures);
    }

    /**
     * Removes each of the given executions via {@code ExecutionService.removeEventExecution}.
     *
     * @param list the executions that failed transiently
     * @return the same list that was passed in
     */
    protected List<EventExecution> processTransientFailures(List<EventExecution> list) {
        list.forEach(this.executionService::removeEventExecution);
        return list;
    }

    /**
     * Submits every action of the handler to the action thread pool.
     *
     * <p>An {@code IN_PROGRESS} execution with id {@code <messageId>_<action index>} is registered
     * for each action first. If {@code addEventExecution} returns {@code false}, the action is
     * treated as a duplicate delivery and is not executed.
     *
     * @param eventHandler the handler whose actions are to be executed
     * @param message the message that triggered the handler
     * @return a future that yields the executions of all submitted actions
     */
    protected CompletableFuture<List<EventExecution>> executeActionsForEventHandler(EventHandler eventHandler, Message message) {
        List<CompletableFuture<EventExecution>> futures = new ArrayList<>();
        int actionIndex = 0;
        for (EventHandler.Action action : eventHandler.getActions()) {
            // The index advances for every action, including ones skipped as duplicates.
            String executionId = message.getId() + "_" + actionIndex++;
            EventExecution eventExecution = new EventExecution(executionId, message.getId());
            eventExecution.setCreated(System.currentTimeMillis());
            eventExecution.setEvent(eventHandler.getEvent());
            eventExecution.setName(eventHandler.getName());
            eventExecution.setAction(action.getAction());
            eventExecution.setStatus(EventExecution.Status.IN_PROGRESS);

            if (this.executionService.addEventExecution(eventExecution)) {
                // The payload is parsed inside the task, so each action gets its own payload object.
                futures.add(CompletableFuture.supplyAsync(
                        () -> execute(eventExecution, action, getPayloadObject(message.getPayload())),
                        this.eventActionExecutorService));
            } else {
                LOGGER.warn("Duplicate delivery/execution of message: {}", message.getId());
            }
        }
        return CompletableFutures.allAsList(futures);
    }

    /**
     * Executes one action through the {@link ActionProcessor}, retrying via {@link RetryUtil}
     * while {@link #isTransientException} holds.
     *
     * <p>On success the action output is merged into the execution output and the status becomes
     * {@code COMPLETED}. On a {@link RuntimeException} whose cause is not transient the status
     * becomes {@code FAILED} and the exception message is stored under the {@code "exception"}
     * output key. If the cause is transient (or absent) the status is left unchanged.
     *
     * @param eventExecution the execution record to update
     * @param action the action to run
     * @param obj the parsed message payload passed to the action processor
     * @return the same {@code eventExecution} instance, updated in place
     */
    protected EventExecution execute(EventExecution eventExecution, EventHandler.Action action, Object obj) {
        try {
            String executionName = String.format("Executing action: %s for event: %s with messageId: %s with payload: %s", action.getAction(), eventExecution.getId(), eventExecution.getMessageId(), obj);
            LOGGER.debug(executionName);

            Map<String, Object> output = new RetryUtil<Map<String, Object>>().retryOnException(
                    () -> this.actionProcessor.execute(action, obj, eventExecution.getEvent(), eventExecution.getMessageId()),
                    this::isTransientException,
                    (Predicate<Map<String, Object>>) null,
                    RETRY_COUNT,
                    executionName,
                    "executeEventAction");
            if (output != null) {
                eventExecution.getOutput().putAll(output);
            }
            eventExecution.setStatus(EventExecution.Status.COMPLETED);
            Monitors.recordEventExecutionSuccess(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name());
        } catch (RuntimeException e) {
            LOGGER.error("Error executing action: {} for event: {} with messageId: {}", action.getAction(), eventExecution.getEvent(), eventExecution.getMessageId(), e);
            // Classification is done on the cause of the caught exception, not the exception itself.
            if (!isTransientException(e.getCause())) {
                eventExecution.setStatus(EventExecution.Status.FAILED);
                eventExecution.getOutput().put("exception", e.getMessage());
                Monitors.recordEventExecutionError(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name(), e.getClass().getSimpleName());
            }
        }
        return eventExecution;
    }

    /**
     * Decides whether a failure is worth retrying.
     *
     * @param th the failure to classify; may be {@code null}
     * @return {@code false} for an {@link UnsupportedOperationException} and for an
     *     {@link ApplicationException} whose code is not {@code BACKEND_ERROR}; {@code true} for
     *     everything else, including {@code null}
     */
    protected boolean isTransientException(Throwable th) {
        if (th == null) {
            return true;
        }
        if (th instanceof UnsupportedOperationException) {
            return false;
        }
        if (th instanceof ApplicationException) {
            return ((ApplicationException) th).getCode() == ApplicationException.Code.BACKEND_ERROR;
        }
        return true;
    }

    /**
     * Parses the payload with the configured {@link ObjectMapper}.
     *
     * @param str the raw payload; may be {@code null}
     * @return the parsed object, the raw string if parsing throws, or {@code null} for a
     *     {@code null} payload
     */
    private Object getPayloadObject(String str) {
        if (str == null) {
            return null;
        }
        try {
            return this.objectMapper.readValue(str, Object.class);
        } catch (Exception ignored) {
            // Deliberate fallback: a payload that cannot be parsed is passed on as a plain string.
            return str;
        }
    }
}
