/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes the queued {@link TaskEvent}s that belong to a single process instance.
 *
 * <p>Events are appended through {@link #addEvent(TaskEvent)} and consumed from the head of the
 * queue by {@link #run()}, which dispatches each event to the {@link TaskEventHandler} registered
 * for the event's {@link TaskEventType}.
 *
 * <p>NOTE(review): {@link #run()} peeks the head and removes it only after handling, which
 * presumes that a single thread executes {@code run()} for a given instance at a time — confirm
 * against the caller that schedules this runnable.
 */
public class TaskExecuteRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteRunnable.class);

    /** Id of the process instance whose events this runnable accepts. */
    private final int processInstanceId;

    /** Pending events, consumed head-first by {@link #run()}. */
    private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();

    /** Lookup table from event type to the handler that processes events of that type. */
    private final Map<TaskEventType, TaskEventHandler> taskEventHandlerMap;

    /**
     * Creates a runnable bound to one process instance.
     *
     * @param processInstanceId   id of the process instance whose events are accepted
     * @param taskEventHandlerMap handlers keyed by the event type they process
     */
    public TaskExecuteRunnable(int processInstanceId, Map<TaskEventType, TaskEventHandler> taskEventHandlerMap) {
        this.processInstanceId = processInstanceId;
        this.taskEventHandlerMap = taskEventHandlerMap;
    }

    /**
     * Handles queued events head-first until the queue is empty.
     *
     * <p>Outcome per event:
     * <ul>
     *   <li>handled successfully: the event is removed;</li>
     *   <li>{@link TaskEventHandleException}: the event stays at the head of the queue, so the
     *       next loop iteration picks it up again;</li>
     *   <li>{@link TaskEventHandleError} or any other {@link Exception} (including a missing
     *       handler for the event type): the event is removed.</li>
     * </ul>
     *
     * <p>NOTE(review): the retry after a {@link TaskEventHandleException} happens immediately,
     * without any delay; a handler that keeps failing keeps this loop spinning. Confirm whether
     * that is intended before changing it.
     */
    @Override
    public void run() {
        TaskEvent event;
        // peek() returning null doubles as the emptiness check, avoiding a separate isEmpty() call.
        while ((event = this.events.peek()) != null) {
            try {
                LoggerUtils.setWorkflowAndTaskInstanceIDMDC((Integer) event.getProcessInstanceId(), (Integer) event.getTaskInstanceId());
                logger.info("Handle task event begin: {}", event);
                TaskEventHandler handler = this.taskEventHandlerMap.get(event.getEvent());
                if (handler == null) {
                    // Surface a descriptive cause instead of an anonymous NullPointerException;
                    // the generic catch below logs it and drops the event, as before.
                    throw new IllegalStateException("No TaskEventHandler registered for event type: " + event.getEvent());
                }
                handler.handleTaskEvent(event);
                this.events.remove(event);
                logger.info("Handle task event finished: {}", event);
            } catch (TaskEventHandleException taskEventHandleException) {
                // The event is deliberately left in the queue so it is handled again.
                logger.error("Handle task event failed, this event will be retry later, event: {}", event, taskEventHandleException);
            } catch (TaskEventHandleError taskEventHandleError) {
                logger.error("Handle task event error, this event will be removed, event: {}", event, taskEventHandleError);
                this.events.remove(event);
            } catch (Exception unknownException) {
                logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}", event, unknownException);
                this.events.remove(event);
            } finally {
                // Always clear the MDC entries set at the top of the try block.
                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
            }
        }
    }

    /**
     * Returns the process instance id rendered as a string.
     *
     * @return the decimal string form of the process instance id
     */
    public String getKey() {
        return String.valueOf(this.processInstanceId);
    }

    /**
     * Returns the number of events currently queued.
     *
     * @return the current queue size
     */
    public int eventSize() {
        return this.events.size();
    }

    /**
     * Tells whether there are no queued events.
     *
     * @return {@code true} if the queue holds no events
     */
    public boolean isEmpty() {
        return this.events.isEmpty();
    }

    /**
     * Returns the id of the process instance this runnable is bound to.
     *
     * @return the process instance id
     */
    public Integer getProcessInstanceId() {
        return this.processInstanceId;
    }

    /**
     * Queues an event if it belongs to this runnable's process instance.
     *
     * @param event the event to queue
     * @return {@code false} if the event's process instance id differs from this runnable's
     *         (the event is logged and discarded); otherwise the result of adding it to the queue
     */
    public boolean addEvent(TaskEvent event) {
        if (event.getProcessInstanceId() != this.processInstanceId) {
            logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
                    event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
            return false;
        }
        return this.events.add(event);
    }
}

