package org.apache.dolphinscheduler.server.master.dispatch.executor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.class */
public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {

    @Autowired
    private ServerNodeManager serverNodeManager;
    private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
    private final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());

    @PostConstruct
    public void init() {
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    }

    @Override // org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager
    public Boolean execute(ExecutionContext executionContext) throws ExecuteException {
        String str;
        Set<String> allNodes = getAllNodes(executionContext);
        HashSet hashSet = new HashSet();
        Command command = executionContext.getCommand();
        Host host = executionContext.getHost();
        boolean z = false;
        while (!z) {
            try {
                doExecute(host, command);
                z = true;
                executionContext.setHost(host);
            } catch (ExecuteException e) {
                this.logger.error(String.format("execute command : %s error", command), e);
                try {
                    hashSet.add(host.getAddress());
                    Collection subtract = CollectionUtils.subtract(new HashSet(allNodes), hashSet);
                    if (subtract == null || subtract.size() <= 0) {
                        throw new ExecuteException(str);
                    }
                    host = Host.of((String) subtract.iterator().next());
                    this.logger.error("retry execute command : {} host : {}", command, host);
                } finally {
                    ExecuteException executeException = new ExecuteException("fail after try all nodes");
                }
            }
        }
        return Boolean.valueOf(z);
    }

    @Override // org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager
    public void executeDirectly(ExecutionContext executionContext) throws ExecuteException {
        doExecute(executionContext.getHost(), executionContext.getCommand());
    }

    public void doExecute(Host host, Command command) throws ExecuteException {
        int i = 3;
        boolean z = false;
        do {
            try {
                this.nettyRemotingClient.send(host, command);
                z = true;
            } catch (Exception e) {
                this.logger.error(String.format("send command : %s to %s error", command, host), e);
                i--;
                ThreadUtils.sleep(100L);
            }
            if (i < 0) {
                break;
            }
        } while (!z);
        if (!z) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

    private Set<String> getAllNodes(ExecutionContext executionContext) {
        Set<String> emptySet = Collections.emptySet();
        ExecutorType executorType = executionContext.getExecutorType();
        switch (executorType) {
            case WORKER:
                emptySet = this.serverNodeManager.getWorkerGroupNodes(executionContext.getWorkerGroup());
                break;
            case CLIENT:
                break;
            default:
                throw new IllegalArgumentException("invalid executor type : " + executorType);
        }
        return emptySet;
    }

    public NettyRemotingClient getNettyRemotingClient() {
        return this.nettyRemotingClient;
    }
}
