package com.netflix.conductor.core.events.queue;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.service.ExecutionService;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Listens on one {@link ObservableQueue} per {@link Task.Status} and applies each incoming message
 * to a task of the workflow the message refers to.
 *
 * <p>A message payload is a JSON document containing an {@code externalId} field whose text value
 * is itself a JSON document with a {@code workflowId} and, optionally, a {@code taskId} or a
 * {@code taskRefName}. The matching non-terminal task is moved to the status associated with the
 * queue the message arrived on, and the whole payload is merged into the task's output data.
 */
@Component
public class DefaultEventQueueProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class);

    /** Target type used when converting a parsed payload into a task output map. */
    private static final TypeReference<Map<String, Object>> _mapType =
            new TypeReference<Map<String, Object>>() {};

    private final Map<Task.Status, ObservableQueue> queues;
    private final ExecutionService executionService;
    private final ObjectMapper objectMapper;

    /**
     * Stores the collaborators and immediately subscribes to every queue in {@code map}.
     *
     * @param map queues to monitor, keyed by the status a message on that queue assigns to a task
     * @param executionService used to load workflows and persist task updates
     * @param objectMapper used to parse and produce message payloads
     */
    public DefaultEventQueueProcessor(Map<Task.Status, ObservableQueue> map, ExecutionService executionService, ObjectMapper objectMapper) {
        this.queues = map;
        this.executionService = executionService;
        this.objectMapper = objectMapper;
        map.forEach(this::startMonitor);
    }

    /**
     * Subscribes to {@code queue}; every message is handled by {@link #processMessage} and stream
     * errors are logged.
     */
    private void startMonitor(Task.Status status, ObservableQueue queue) {
        queue.observe().subscribe(
                message -> processMessage(status, queue, message),
                error -> LOGGER.error(error.getMessage(), error));
        LOGGER.info("QueueListener::STARTED...listening for {}", queue.getName());
    }

    /**
     * Handles a single message: resolves the target task, sets its status to {@code status},
     * merges the payload into its output and acknowledges the message.
     *
     * <p>Messages that can never succeed (missing ids, no matching task, unparsable JSON, unknown
     * workflow) are acknowledged after logging. Any other failure is logged and the message is
     * left un-acknowledged.
     */
    private void processMessage(Task.Status status, ObservableQueue queue, Message message) {
        try {
            LOGGER.debug("Got message {}", message.getPayload());
            String payload = message.getPayload();
            JsonNode payloadJson = objectMapper.readTree(payload);

            String externalId = getValue("externalId", payloadJson);
            if (StringUtils.isEmpty(externalId)) {
                LOGGER.error("No external Id found in the payload {}", payload);
                queue.ack(Collections.singletonList(message));
                return;
            }

            // The externalId value is a JSON document serialized as a string; parse it again.
            JsonNode externalIdJson = objectMapper.readTree(externalId);
            String workflowId = getValue("workflowId", externalIdJson);
            String taskRefName = getValue("taskRefName", externalIdJson);
            String taskId = getValue("taskId", externalIdJson);
            if (StringUtils.isEmpty(workflowId)) {
                LOGGER.error("No workflow id found in the message. {}", payload);
                queue.ack(Collections.singletonList(message));
                return;
            }

            Workflow workflow = executionService.getExecutionStatus(workflowId, true);
            Optional<Task> target = findTargetTask(workflow, taskId, taskRefName, payload);
            if (!target.isPresent()) {
                LOGGER.error("No matching tasks found to be marked as completed for workflow {}, taskRefName {}, taskId {}", workflowId, taskRefName, taskId);
                queue.ack(Collections.singletonList(message));
                return;
            }

            Task task = target.get();
            task.setStatus(status);
            Map<String, Object> payloadAsMap = objectMapper.convertValue(payloadJson, _mapType);
            task.getOutputData().putAll(payloadAsMap);
            executionService.updateTask(task);

            // ack returns the ids that could NOT be acknowledged.
            List<String> failedAcks = queue.ack(Collections.singletonList(message));
            if (!failedAcks.isEmpty()) {
                LOGGER.error("Not able to ack the messages {}", failedAcks);
            }
        } catch (JsonParseException e) {
            LOGGER.error("Bad message? : {} ", message, e);
            queue.ack(Collections.singletonList(message));
        } catch (ApplicationException e) {
            if (e.getCode().equals(ApplicationException.Code.NOT_FOUND)) {
                LOGGER.error("Workflow ID specified is not valid for this environment");
                queue.ack(Collections.singletonList(message));
            }
            // Logged for every ApplicationException, including NOT_FOUND.
            LOGGER.error("Error processing message: {}", message, e);
        } catch (Exception e) {
            LOGGER.error("Error processing message: {}", message, e);
        }
    }

    /**
     * Picks the first non-terminal task of {@code workflow} matching the message.
     *
     * <p>Precedence: a non-empty {@code taskId} wins; otherwise a non-empty {@code taskRefName} is
     * used; with neither, the first non-terminal task of type {@code WAIT} is chosen.
     * Comparisons are made from the known non-null side so a task with a missing id, type or
     * reference name simply does not match instead of throwing.
     */
    private Optional<Task> findTargetTask(Workflow workflow, String taskId, String taskRefName, String payload) {
        if (StringUtils.isNotEmpty(taskId)) {
            return workflow.getTasks().stream()
                    .filter(task -> !task.getStatus().isTerminal() && taskId.equals(task.getTaskId()))
                    .findFirst();
        }
        if (StringUtils.isEmpty(taskRefName)) {
            LOGGER.error("No taskRefName found in the message. If there is only one WAIT task, will mark it as completed. {}", payload);
            return workflow.getTasks().stream()
                    .filter(task -> !task.getStatus().isTerminal() && "WAIT".equals(task.getTaskType()))
                    .findFirst();
        }
        return workflow.getTasks().stream()
                .filter(task -> !task.getStatus().isTerminal() && taskRefName.equals(task.getReferenceTaskName()))
                .findFirst();
    }

    /**
     * Returns the text value of the first field named {@code fieldName} found anywhere in
     * {@code node}, or {@code null} when the node is null, the field is absent, or its value is
     * not textual.
     */
    private String getValue(String fieldName, JsonNode node) {
        if (node == null) {
            // Guard: some Jackson versions appear to return null from readTree on empty content.
            return null;
        }
        JsonNode found = node.findValue(fieldName);
        return found == null ? null : found.textValue();
    }

    /**
     * @return the current size of every monitored queue, keyed by queue name
     */
    public Map<String, Long> size() {
        Map<String, Long> sizes = new HashMap<>();
        queues.forEach((status, queue) -> sizes.put(queue.getName(), Long.valueOf(queue.size())));
        return sizes;
    }

    /**
     * @return the URI of every monitored queue, keyed by the status the queue handles
     */
    public Map<Task.Status, String> queues() {
        Map<Task.Status, String> uris = new HashMap<>();
        queues.forEach((status, queue) -> uris.put(status, queue.getURI()));
        return uris;
    }

    /**
     * Publishes a message that targets a task by its reference name.
     *
     * @param str workflow id
     * @param str2 task reference name
     * @param map values to merge into the task output
     * @param status status to assign; selects the queue the message is published to
     * @throws IllegalArgumentException if no queue is registered for {@code status}
     * @throws Exception if the message cannot be serialized
     */
    public void updateByTaskRefName(String str, String str2, Map<String, Object> map, Task.Status status) throws Exception {
        Map<String, Object> externalIdMap = new HashMap<>();
        externalIdMap.put("workflowId", str);
        externalIdMap.put("taskRefName", str2);
        update(externalIdMap, map, status);
    }

    /**
     * Publishes a message that targets a task by its id.
     *
     * @param str workflow id
     * @param str2 task id
     * @param map values to merge into the task output
     * @param status status to assign; selects the queue the message is published to
     * @throws IllegalArgumentException if no queue is registered for {@code status}
     * @throws Exception if the message cannot be serialized
     */
    public void updateByTaskId(String str, String str2, Map<String, Object> map, Task.Status status) throws Exception {
        Map<String, Object> externalIdMap = new HashMap<>();
        externalIdMap.put("workflowId", str);
        externalIdMap.put("taskId", str2);
        update(externalIdMap, map, status);
    }

    /**
     * Builds the payload ({@code externalId} as a serialized JSON string plus the entries of
     * {@code output}) and publishes it, under a random message id, to the queue for
     * {@code status}.
     */
    private void update(Map<String, Object> externalIdMap, Map<String, Object> output, Task.Status status) throws Exception {
        Map<String, Object> payload = new HashMap<>();
        payload.put("externalId", objectMapper.writeValueAsString(externalIdMap));
        if (output != null) {
            payload.putAll(output);
        }
        Message message = new Message(UUID.randomUUID().toString(), objectMapper.writeValueAsString(payload), null);
        ObservableQueue queue = queues.get(status);
        if (queue == null) {
            // Plain concatenation keeps this an IllegalArgumentException even when status is null.
            throw new IllegalArgumentException("There is no queue for handling " + status + " status");
        }
        queue.publish(Collections.singletonList(message));
    }
}
