/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.dispatch.executor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.PostConstruct;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.AbstractExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link AbstractExecutorManager} implementation that delivers a context's {@link Message}
 * to a {@link Host} through a {@link NettyRemotingClient}.
 *
 * <p>{@link #execute(ExecutionContext)} fails over across the candidate nodes resolved for the
 * context, while {@link #executeDirectly(ExecutionContext)} only targets the context's own host.
 */
@Service
public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyExecutorManager.class);

    /** Number of re-sends after the first failed send to a single host. */
    private static final int SEND_RETRY_TIMES = 3;

    /** Pause, in milliseconds, between two consecutive send attempts to the same host. */
    private static final long SEND_RETRY_INTERVAL_MILLIS = 1000L;

    /** Message used whenever no further candidate host can be tried. */
    private static final String ALL_NODES_FAILED = "fail after try all nodes";

    @Autowired
    private ServerNodeManager serverNodeManager;
    @Autowired
    private List<NettyRequestProcessor> nettyRequestProcessors;
    private final NettyRemotingClient nettyRemotingClient;

    /**
     * Creates the manager together with its own {@link NettyRemotingClient}, built from a
     * default-constructed {@link NettyClientConfig}.
     */
    public NettyExecutorManager() {
        NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    /**
     * Registers every injected {@link NettyRequestProcessor} on the remoting client once
     * dependency injection has completed.
     */
    @PostConstruct
    public void init() {
        for (NettyRequestProcessor nettyRequestProcessor : this.nettyRequestProcessors) {
            this.nettyRemotingClient.registerProcessor(nettyRequestProcessor);
        }
    }

    /**
     * Sends the context's message, starting with the context's current host and failing over to
     * the other candidate nodes until one send succeeds.
     *
     * <p>On success the host that accepted the message is stored on the context and its address
     * is stored on the context's task instance. The method never returns normally without a
     * successful send: the context host is always tried at least once, and exhausting the
     * candidates raises an {@link ExecuteException}.
     *
     * @param context execution context providing the message, the initial host and the executor type
     * @throws ExecuteException if the message could not be sent to any candidate host
     */
    @Override
    public void execute(ExecutionContext context) throws ExecuteException {
        // Private snapshot of the candidate addresses; hosts that failed are removed from it.
        Set<String> candidates = new HashSet<>(this.getAllNodes(context));
        Message message = context.getMessage();
        Host host = context.getHost();
        // The context host is not necessarily one of the candidates, so allow one attempt more
        // than there are candidates. This also guarantees one attempt when there are none.
        int maxAttempts = candidates.size() + 1;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            try {
                this.doExecute(host, message);
                context.setHost(host);
                context.getTaskInstance().setHost(host.getAddress());
                return;
            } catch (ExecuteException ex) {
                log.error("Execute command {} error", message, ex);
                host = this.nextHost(candidates, host, ex);
                log.error("retry execute command : {} host : {}", message, host);
            }
        }
        // Safety net: the attempt budget ran out without a success, so report the failure
        // instead of returning as if the message had been sent.
        throw new ExecuteException(ALL_NODES_FAILED);
    }

    /**
     * Sends the context's message to the context's host only, without failing over to other nodes.
     *
     * @param context execution context providing the host and the message
     * @throws ExecuteException if the message could not be sent to that host
     */
    @Override
    public void executeDirectly(ExecutionContext context) throws ExecuteException {
        Host host = context.getHost();
        this.doExecute(host, context.getMessage());
    }

    /**
     * Sends {@code message} to {@code host}, re-sending up to {@link #SEND_RETRY_TIMES} times
     * after a failed send and pausing {@link #SEND_RETRY_INTERVAL_MILLIS} between attempts.
     *
     * @param host target host
     * @param message message to send
     * @throws ExecuteException if every attempt failed; the last failure is attached as the cause
     */
    public void doExecute(Host host, Message message) throws ExecuteException {
        Exception lastError = null;
        for (int attempt = 0; attempt <= SEND_RETRY_TIMES; ++attempt) {
            try {
                this.nettyRemotingClient.send(host, message);
                return;
            } catch (Exception ex) {
                lastError = ex;
                log.error("Send command to {} error, command: {}", host, message, ex);
                // Waiting only makes sense when another attempt follows.
                if (attempt < SEND_RETRY_TIMES) {
                    ThreadUtils.sleep(SEND_RETRY_INTERVAL_MILLIS);
                }
            }
        }
        throw withCause(
                new ExecuteException(String.format("send command : %s to %s error", message, host)),
                lastError);
    }

    /**
     * Removes the failed host from the candidates and picks the next host to try.
     *
     * @param candidates addresses not yet known to have failed; modified in place
     * @param failedHost host whose send attempt just failed
     * @param failure the failure reported for {@code failedHost}
     * @return the next host to try
     * @throws ExecuteException if no candidate is left or the next candidate cannot be resolved
     */
    private Host nextHost(Set<String> candidates, Host failedHost, ExecuteException failure)
            throws ExecuteException {
        Host next = null;
        try {
            candidates.remove(failedHost.getAddress());
            if (!candidates.isEmpty()) {
                next = Host.of(candidates.iterator().next());
            }
        } catch (RuntimeException e) {
            // Keep the checked-exception contract of execute() while preserving the root cause.
            throw withCause(new ExecuteException(ALL_NODES_FAILED), e);
        }
        if (next == null) {
            throw withCause(new ExecuteException(ALL_NODES_FAILED), failure);
        }
        return next;
    }

    /**
     * Attaches {@code cause} to a freshly created exception and returns that exception.
     * {@link Throwable#initCause(Throwable)} is used so that only the message-only
     * {@link ExecuteException} constructor is required.
     */
    private static ExecuteException withCause(ExecuteException exception, Throwable cause) {
        exception.initCause(cause);
        return exception;
    }

    /**
     * Resolves the candidate node addresses for the context's executor type.
     *
     * @param context execution context providing the executor type and the worker group
     * @return the worker group's nodes for {@code WORKER}; an empty set for {@code CLIENT};
     *         never {@code null}
     * @throws WorkerGroupNotFoundException propagated from the worker group lookup
     * @throws IllegalArgumentException if the executor type is neither {@code WORKER} nor {@code CLIENT}
     */
    private Set<String> getAllNodes(ExecutionContext context) throws WorkerGroupNotFoundException {
        Set<String> nodes = Collections.emptySet();
        ExecutorType executorType = context.getExecutorType();
        switch (executorType) {
            case WORKER: {
                nodes = this.serverNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
                break;
            }
            case CLIENT: {
                break;
            }
            default: {
                throw new IllegalArgumentException("invalid executor type : " + executorType);
            }
        }
        // Defensive: callers size and copy the result, so never hand back null.
        return nodes == null ? Collections.<String>emptySet() : nodes;
    }
}

