/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.registry;

import com.google.common.collect.Sets;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class WorkerRegistry {
    private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;
    @Autowired
    private WorkerConfig workerConfig;
    private ScheduledExecutorService heartBeatExecutor;
    private String startTime;
    private Set<String> workerGroups;

    @PostConstruct
    public void init() {
        this.workerGroups = this.workerConfig.getWorkerGroups();
        this.startTime = DateUtils.dateToString((Date)new Date());
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("HeartBeatExecutor"));
    }

    public void registry() {
        final String address = OSUtils.getHost();
        Set<String> workerZkPaths = this.getWorkerZkPaths();
        int workerHeartbeatInterval = this.workerConfig.getWorkerHeartbeatInterval();
        for (final String workerZKPath : workerZkPaths) {
            this.zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
            this.zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener((Object)new ConnectionStateListener(){

                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    if (newState == ConnectionState.LOST) {
                        WorkerRegistry.this.logger.error("worker : {} connection lost from zookeeper", (Object)address);
                    } else if (newState == ConnectionState.RECONNECTED) {
                        WorkerRegistry.this.logger.info("worker : {} reconnected to zookeeper", (Object)address);
                        WorkerRegistry.this.zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
                    } else if (newState == ConnectionState.SUSPENDED) {
                        WorkerRegistry.this.logger.warn("worker : {} connection SUSPENDED ", (Object)address);
                    }
                }
            });
            this.logger.info("worker node : {} registry to ZK {} successfully", (Object)address, (Object)workerZKPath);
        }
        HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, this.workerConfig.getWorkerReservedMemory(), this.workerConfig.getWorkerMaxCpuloadAvg(), workerZkPaths, this.zookeeperRegistryCenter);
        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
        this.logger.info("worker node : {} heartbeat interval {} s", (Object)address, (Object)workerHeartbeatInterval);
    }

    public void unRegistry() {
        String address = this.getLocalAddress();
        Set<String> workerZkPaths = this.getWorkerZkPaths();
        for (String workerZkPath : workerZkPaths) {
            this.zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
            this.logger.info("worker node : {} unRegistry from ZK {}.", (Object)address, (Object)workerZkPath);
        }
        this.heartBeatExecutor.shutdownNow();
    }

    private Set<String> getWorkerZkPaths() {
        HashSet workerZkPaths = Sets.newHashSet();
        String address = this.getLocalAddress();
        String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
        for (String workGroup : this.workerGroups) {
            StringBuilder workerZkPathBuilder = new StringBuilder(100);
            workerZkPathBuilder.append(workerZkPathPrefix).append("/");
            if (StringUtils.isEmpty((CharSequence)workGroup)) {
                workGroup = "default";
            }
            workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append("/");
            workerZkPathBuilder.append(address);
            workerZkPaths.add(workerZkPathBuilder.toString());
        }
        return workerZkPaths;
    }

    private String getLocalAddress() {
        return OSUtils.getAddr((int)this.workerConfig.getListenPort());
    }
}

