/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.dispatch.executor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.AbstractExecutorManager;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class NettyExecutorManager
extends AbstractExecutorManager<Boolean> {
    private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
    @Autowired
    private ServerNodeManager serverNodeManager;
    @Autowired
    private TaskKillResponseProcessor taskKillResponseProcessor;
    @Autowired
    private TaskRecallProcessor taskRecallProcessor;
    private final NettyRemotingClient nettyRemotingClient;

    public NettyExecutorManager() {
        NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    @PostConstruct
    public void init() {
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, (NettyRequestProcessor)this.taskKillResponseProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT, (NettyRequestProcessor)this.taskRecallProcessor);
    }

    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {
        Set<String> allNodes = this.getAllNodes(context);
        HashSet<String> failNodeSet = new HashSet<String>();
        Command command = context.getCommand();
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                this.doExecute(host, command);
                success = true;
                context.setHost(host);
                context.getTaskInstance().setHost(host.getAddress());
            }
            catch (ExecuteException ex) {
                this.logger.error("Execute command {} error", (Object)command, (Object)ex);
                try {
                    failNodeSet.add(host.getAddress());
                    HashSet<String> tmpAllIps = new HashSet<String>(allNodes);
                    Collection remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of((String)((String)remained.iterator().next()));
                        this.logger.error("retry execute command : {} host : {}", (Object)command, (Object)host);
                        continue;
                    }
                    throw new ExecuteException("fail after try all nodes");
                }
                catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }
        return success;
    }

    @Override
    public void executeDirectly(ExecutionContext context) throws ExecuteException {
        Host host = context.getHost();
        this.doExecute(host, context.getCommand());
    }

    public void doExecute(Host host, Command command) throws ExecuteException {
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                this.nettyRemotingClient.send(host, command);
                success = true;
            }
            catch (Exception ex) {
                this.logger.error("Send command to {} error, command: {}", new Object[]{host, command, ex});
                --retryCount;
                ThreadUtils.sleep((long)1000L);
            }
        } while (retryCount >= 0 && !success);
        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

    private Set<String> getAllNodes(ExecutionContext context) {
        Set<String> nodes = Collections.emptySet();
        ExecutorType executorType = context.getExecutorType();
        switch (executorType) {
            case WORKER: {
                nodes = this.serverNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
                break;
            }
            case CLIENT: {
                break;
            }
            default: {
                throw new IllegalArgumentException("invalid executor type : " + (Object)((Object)executorType));
            }
        }
        return nodes;
    }
}

