/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.schedule;

import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic task executed by a nimbus process.
 *
 * <p>On construction the nimbus registers itself (slave entry and detail
 * entry) in the cluster state and makes an attempt to become leader. While
 * running, each iteration either tries to take leadership (no leader
 * registered), halts the process (this nimbus believes it is leader but the
 * cluster state names somebody else), marks this nimbus as leader, or - as a
 * follower - synchronises the local topology code directory with the
 * assignments found in the cluster state.
 */
public class FollowerRunnable
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(FollowerRunnable.class);
    private final NimbusData data;
    /** Pause between two iterations of {@link #run()}, passed to {@code Thread.sleep}. */
    private final int sleepTime;
    /** {@code true} while the loop in {@link #run()} should keep going; cleared by {@link #clean()}. */
    private volatile boolean state = true;
    /**
     * Callback handed to {@code StormClusterState.assignments(...)}; re-runs
     * {@link #check()} when this nimbus is not the leader.
     * NOTE(review): it is assigned at the very end of the constructor, so the
     * calls made from within the constructor pass {@code null} - confirm this
     * is intended.
     */
    private RunnableCallback callback;
    /** {@code host:port} (or {@code ip:port}) under which this nimbus is registered. */
    private final String hostPort;
    public static final String NIMBUS_DIFFER_COUNT_ZK = "nimbus.differ.count.zk";
    /** Seconds a nimbus with missing topology code waits before competing for leadership. */
    public static final Integer SLAVE_NIMBUS_WAIT_TIME = 120;

    /**
     * Registers this nimbus in the cluster state and tries to become leader.
     *
     * @param data      shared nimbus state (configuration, cluster state access, leader flag)
     * @param sleepTime pause between two iterations of {@link #run()}
     * @throws RuntimeException if the local address resolves to localhost / 127.0.0.1,
     *                          if registration fails, or if the leader election attempt fails;
     *                          the underlying exception is attached as the cause
     */
    public FollowerRunnable(final NimbusData data, int sleepTime) {
        this.data = data;
        this.sleepTime = sleepTime;

        boolean isLocalAddress;
        if (!ConfigExtension.isNimbusUseIp(data.getConf())) {
            String hostname = NetWorkUtils.hostname();
            this.hostPort = hostname + ":" + String.valueOf(Utils.getInt(data.getConf().get("nimbus.thrift.port")));
            isLocalAddress = "localhost".equals(hostname);
        } else {
            String ip = NetWorkUtils.ip();
            this.hostPort = ip + ":" + String.valueOf(Utils.getInt(data.getConf().get("nimbus.thrift.port")));
            isLocalAddress = "127.0.0.1".equals(ip);
        }

        // A loopback address is useless to other nodes, so refuse to register it.
        if (isLocalAddress) {
            Exception cause = new Exception("the hostname which Nimbus get is localhost");
            LOG.error("get nimbus host error!", cause);
            throw new RuntimeException(cause);
        }

        try {
            data.getStormClusterState().update_nimbus_slave(this.hostPort, data.uptime());
            data.getStormClusterState().update_nimbus_detail(this.hostPort, null);
        }
        catch (Exception e) {
            LOG.error("register nimbus host fail!", e);
            throw new RuntimeException(e);
        }

        // NOTE(review): leadership is attempted here when no leader exists and then
        // again unconditionally below; kept as in the original - confirm whether
        // the first attempt is still needed.
        try {
            StormClusterState zkClusterState = data.getStormClusterState();
            if (!zkClusterState.leader_existed()) {
                this.tryToBeLeader(data.getConf());
            }
        }
        catch (Exception e) {
            LOG.error("register detail of nimbus fail!", e);
            throw new RuntimeException(e);
        }

        try {
            this.tryToBeLeader(data.getConf());
        }
        catch (Exception e1) {
            // Best effort: take the registration back before giving up.
            try {
                data.getStormClusterState().unregister_nimbus_host(this.hostPort);
                data.getStormClusterState().unregister_nimbus_detail(this.hostPort);
            }
            catch (Exception e2) {
                LOG.info("due to task errors, so remove register nimbus infomation", e2);
            }
            LOG.error("try to be leader error.", e1);
            throw new RuntimeException(e1);
        }

        this.callback = new RunnableCallback(){

            @Override
            public void run() {
                if (!data.isLeader()) {
                    FollowerRunnable.this.check();
                }
            }
        };
    }

    /**
     * Tells whether the given leader entry designates this nimbus.
     *
     * @param zkMaster leader entry as read from the cluster state, expected as {@code host:port}
     * @return {@code true} if {@code zkMaster} equals this nimbus' {@code hostPort}
     *         (case-insensitively) or if its host part matches the local IP according to
     *         {@code NetWorkUtils.equals}; {@code false} for blank or host-less input
     */
    public boolean isLeader(String zkMaster) {
        if (StringUtils.isBlank(zkMaster)) {
            return false;
        }
        if (this.hostPort.equalsIgnoreCase(zkMaster)) {
            return true;
        }
        String[] part = zkMaster.split(":");
        // split() drops trailing empty strings, so input such as ":" yields no parts at all.
        if (part.length == 0) {
            return false;
        }
        return NetWorkUtils.equals(part[0], NetWorkUtils.ip());
    }

    /**
     * Main loop; runs until {@link #clean()} is called or the process is halted.
     */
    @Override
    public void run() {
        LOG.info("Follower Thread starts!");
        while (this.state) {
            StormClusterState zkClusterState = this.data.getStormClusterState();
            try {
                Thread.sleep(this.sleepTime);
                if (!zkClusterState.leader_existed()) {
                    this.tryToBeLeader(this.data.getConf());
                    continue;
                }
                String master = zkClusterState.get_leader_host();
                boolean isZkLeader = this.isLeader(master);
                if (this.data.isLeader() && !isZkLeader) {
                    // This nimbus thinks it leads, but the cluster state names somebody else.
                    LOG.info("New ZK master is " + master);
                    JStormUtils.halt_process(1, "Lose ZK master node, halt process");
                    return;
                }
                if (isZkLeader) {
                    // The leader must not be listed among the followers any more.
                    zkClusterState.unregister_nimbus_host(this.hostPort);
                    zkClusterState.unregister_nimbus_detail(this.hostPort);
                    this.data.setLeader(true);
                    continue;
                }
                // Plain follower: sync topology code and refresh the registration.
                this.check();
                zkClusterState.update_nimbus_slave(this.hostPort, this.data.uptime());
                this.update_nimbus_detail();
            }
            catch (InterruptedException ignored) {
                // Deliberately not re-asserting the interrupt flag: the loop condition
                // re-checks 'state', and a pending interrupt would make every following
                // Thread.sleep() return immediately.
            }
            catch (Exception e) {
                if (!this.state) {
                    continue;
                }
                LOG.error("Unknow exception ", e);
            }
        }
        LOG.info("Follower Thread has closed!");
    }

    /**
     * Asks the loop in {@link #run()} to stop after its current iteration.
     */
    public void clean() {
        this.state = false;
    }

    /**
     * Brings the local topology code directory in line with the assignments in the
     * cluster state: local topologies without an assignment are deleted, assigned
     * topologies that are missing locally or reported as changed are downloaded.
     * Failures are logged and end the current pass; they are never propagated.
     */
    private synchronized void check() {
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            String masterStormdistRoot = StormConfig.masterStormdistRoot(this.data.getConf());
            List<String> localIds = PathUtils.read_dir_contents(masterStormdistRoot);
            List<String> assignedIds = clusterState.assignments(this.callback);

            Map<String, Assignment> assignmentMap = new HashMap<String, Assignment>();
            List<String> downloadIds = new ArrayList<String>();
            for (String id : assignedIds) {
                Assignment assignment = clusterState.assignment_info(id, null);
                if (assignment == null) {
                    // Listed but not readable: nothing can be downloaded for it, and
                    // passing null on would abort the whole pass with an NPE.
                    LOG.warn("No assignment info for topology {}, skip it in this round", id);
                    continue;
                }
                assignmentMap.put(id, assignment);

                Long localCodeDownTS;
                try {
                    Long stored = StormConfig.read_nimbus_topology_timestamp(this.data.getConf(), id);
                    localCodeDownTS = stored == null ? 0L : stored;
                }
                catch (FileNotFoundException ignored) {
                    // No timestamp recorded locally: treat the code as never downloaded.
                    localCodeDownTS = 0L;
                }

                boolean missingLocally = !localIds.contains(id);
                boolean changed = assignment.isTopologyChange(localCodeDownTS);
                // A single entry per topology, even when it is both missing and changed.
                if ((missingLocally || changed) && !downloadIds.contains(id)) {
                    downloadIds.add(id);
                }
            }

            // Whatever exists locally without an assignment is stale.
            List<String> staleIds = new ArrayList<String>(localIds);
            staleIds.removeAll(assignedIds);

            for (String topologyId : staleIds) {
                this.deleteLocalTopology(topologyId);
            }
            for (String id : downloadIds) {
                this.downloadCodeFromMaster(assignmentMap.get(id), id);
            }
        }
        catch (IOException e) {
            LOG.error("Get stormdist dir error!", e);
        }
        catch (Exception e) {
            LOG.error("Check error!", e);
        }
    }

    /**
     * Removes the local code directory of a topology; a failed removal is logged only.
     *
     * @param topologyId id of the topology whose local directory is removed
     * @throws IOException if the directory path cannot be determined
     */
    private void deleteLocalTopology(String topologyId) throws IOException {
        String dirToDelete = StormConfig.masterStormdistRoot(this.data.getConf(), topologyId);
        try {
            PathUtils.rmr(dirToDelete);
            LOG.info("delete:" + dirToDelete + " successfully!");
        }
        catch (IOException e) {
            LOG.error("delete:" + dirToDelete + " fail!", e);
        }
    }

    /**
     * Downloads the code of one topology into a temporary directory below the master
     * inbox, moves it to the local stormdist directory and records the download time.
     *
     * @param assignment assignment of the topology; supplies the master code directory
     * @param topologyId id of the topology to download
     * @throws IOException on file system errors
     * @throws TException  if the download from the master fails (logged, then rethrown)
     */
    private void downloadCodeFromMaster(Assignment assignment, String topologyId) throws IOException, TException {
        String masterCodeDir = assignment.getMasterCodeDir();
        String localRoot = StormConfig.masterStormdistRoot(this.data.getConf(), topologyId);
        String tmpDir = StormConfig.masterInbox(this.data.getConf()) + "/" + UUID.randomUUID().toString();
        File srcDir = new File(tmpDir);
        try {
            JStormServerUtils.downloadCodeFromMaster(this.data.getConf(), tmpDir, masterCodeDir, topologyId, false);
            File destDir = new File(localRoot);
            try {
                FileUtils.moveDirectory(srcDir, destDir);
            }
            catch (FileExistsException e) {
                // Destination already present: copy the fresh download over it instead.
                FileUtils.copyDirectory(srcDir, destDir);
            }
            StormConfig.write_nimbus_topology_timestamp(this.data.getConf(), topologyId, System.currentTimeMillis());
        }
        catch (TException e) {
            LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir, e);
            throw e;
        }
        finally {
            // Never leave the temporary download behind, whatever happened above;
            // a no-op once the directory has been moved away.
            FileUtils.deleteQuietly(srcDir);
        }
        LOG.info("Finished downloading code for topology id " + topologyId + " from " + masterCodeDir);
    }

    /**
     * Competes for leadership if {@link #check_nimbus_priority()} allows it. The
     * callback registered with the election re-enters this method and halts the
     * process when that attempt fails.
     *
     * @param conf nimbus configuration, forwarded to re-attempts made by the callback
     * @throws Exception if the priority check or the election call fails
     */
    private void tryToBeLeader(final Map conf) throws Exception {
        if (!this.check_nimbus_priority()) {
            LOG.info("This nimbus can't be leader");
            return;
        }
        RunnableCallback masterCallback = new RunnableCallback(){

            @Override
            public void run() {
                try {
                    FollowerRunnable.this.tryToBeLeader(conf);
                }
                catch (Exception e) {
                    LOG.error("To be master error", e);
                    JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage());
                }
            }
        };
        LOG.info("This nimbus can be  leader");
        this.data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, this.hostPort, masterCallback);
    }

    /**
     * Decides whether this nimbus may compete for leadership.
     *
     * <p>A nimbus holding the code of every assigned topology (gap 0) may compete at
     * once. Otherwise it waits {@link #SLAVE_NIMBUS_WAIT_TIME} seconds and then
     * yields if any other registered nimbus reports a smaller gap.
     *
     * @return {@code true} if this nimbus may try to become leader
     * @throws Exception on cluster state errors or if the wait is interrupted
     */
    @SuppressWarnings("rawtypes")
    private boolean check_nimbus_priority() throws Exception {
        int gap = this.update_nimbus_detail();
        if (gap == 0) {
            return true;
        }
        for (int left = SLAVE_NIMBUS_WAIT_TIME.intValue(); left > 0; left -= 10) {
            LOG.info("After " + left + " seconds, nimbus will try to be Leader!");
            Thread.sleep(10000L);
        }
        StormClusterState zkClusterState = this.data.getStormClusterState();
        List<String> followers = zkClusterState.list_dirs(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE, false);
        if (followers == null || followers.size() == 0) {
            return false;
        }
        for (String follower : followers) {
            if (follower == null || follower.equals(this.hostPort)) {
                continue;
            }
            Map detail = zkClusterState.get_nimbus_detail(follower, false);
            if (detail == null) {
                continue;
            }
            Object differCount = detail.get(NIMBUS_DIFFER_COUNT_ZK);
            if (differCount == null) {
                continue;
            }
            // A smaller differ count means the other nimbus misses less topology code.
            if (JStormUtils.parseInt(differCount) < gap) {
                LOG.info("Current node can't be leader, due to {} has higher priority", follower);
                return false;
            }
        }
        return true;
    }

    /**
     * Publishes under {@link #NIMBUS_DIFFER_COUNT_ZK} how many assigned topologies
     * have no code directory on this nimbus.
     *
     * @return the number of assigned topologies missing locally
     * @throws Exception on cluster state or file system errors
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private int update_nimbus_detail() throws Exception {
        StormClusterState zkClusterState = this.data.getStormClusterState();
        String masterStormdistRoot = StormConfig.masterStormdistRoot(this.data.getConf());
        List<String> codeIds = PathUtils.read_dir_contents(masterStormdistRoot);
        List<String> assignmentIds = zkClusterState.assignments(this.callback);
        assignmentIds.removeAll(codeIds);
        int gap = assignmentIds.size();

        Map detail = zkClusterState.get_nimbus_detail(this.hostPort, false);
        if (detail == null) {
            detail = new HashMap<String, Integer>();
        }
        detail.put(NIMBUS_DIFFER_COUNT_ZK, gap);
        zkClusterState.update_nimbus_detail(this.hostPort, detail);
        LOG.debug("update nimbus's detail " + detail);
        return gap;
    }

    /**
     * Polls the cluster state up to ten times, sleeping {@code sleepTime} between
     * polls, until the registered leader equals this nimbus' {@code hostPort};
     * halts the process if that never happens.
     * NOTE(review): not referenced anywhere inside this class.
     *
     * @throws Exception on cluster state errors
     */
    private void checkOwnMaster() throws Exception {
        final int retryTimes = 10;
        StormClusterState zkClient = this.data.getStormClusterState();
        for (int i = 0; i < retryTimes; ++i) {
            if (zkClient.leader_existed()) {
                String zkHost = zkClient.get_leader_host();
                if (this.hostPort.equals(zkHost)) {
                    return;
                }
                LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" + zkHost);
            }
            JStormUtils.sleepMs(this.sleepTime);
        }
        String err = "Current Nimubs fail to own nimbus_master, should halt process";
        LOG.error(err);
        JStormUtils.halt_process(0, err);
    }
}

