package com.alibaba.jstorm.schedule;

import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.blobstore.BlobStore;
import com.alibaba.jstorm.blobstore.BlobStoreUtils;
import com.alibaba.jstorm.blobstore.BlobSynchronizer;
import com.alibaba.jstorm.blobstore.LocalFsBlobStore;
import com.alibaba.jstorm.callback.Callback;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.AlimonitorClient;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.google.common.collect.Sets;
import shade.storm.org.apache.commons.lang.StringUtils;
import shade.storm.org.apache.thrift.protocol.TMultiplexedProtocol;

/* loaded from: input_file:com/alibaba/jstorm/schedule/FollowerRunnable.class */
/**
 * Periodic task run by a nimbus node while it takes part in leader election.
 *
 * <p>On construction the node registers itself as a follower (slave) in the cluster state and, if
 * no leader is registered yet, tries to become leader. Each iteration of {@link #run()} then
 * either promotes this node (when the cluster state names it as leader), halts the process (when
 * this node believes it is leader but the cluster state names another host), or refreshes the
 * follower registration and synchronizes blobs from the leader.
 */
public class FollowerRunnable implements Runnable {
    /** Key, inside the nimbus detail map, of the number of blob keys this node is missing. */
    public static final String NIMBUS_DIFFER_COUNT_ZK = "nimbus.differ.count.zk";

    /** Seconds a node with missing blobs waits before it is allowed to compete for leadership. */
    public static final Integer SLAVE_NIMBUS_WAIT_TIME = 60;

    private static final Logger LOG = LoggerFactory.getLogger(FollowerRunnable.class);

    /** Granularity, in seconds, of the wait loop in {@link #check_nimbus_priority()}. */
    private static final int PRIORITY_WAIT_STEP_SECONDS = 10;

    /** Number of polls performed by {@link #checkOwnMaster()} before giving up. */
    private static final int OWN_MASTER_MAX_CHECKS = 10;

    private final NimbusData data;

    /** Pause between two iterations of {@link #run()}, passed to {@code Thread.sleep}. */
    private final int sleepTime;

    /** Invoked once when this node is first observed to be the leader. */
    private final Callback leaderCallback;

    /** {@code host + separator + thrift port} identifying this nimbus in the cluster state. */
    private final String hostPort;

    /** Callback handed to {@code StormClusterState.blobstore(...)}; it triggers {@link #blobSync()}. */
    private RunnableCallback blobSyncCallback;

    /** Loop flag of {@link #run()}; cleared by {@link #clean()}. */
    private volatile boolean state = true;

    /**
     * Registers this nimbus as a follower and, when no leader exists, attempts to take leadership.
     *
     * @param nimbusData shared nimbus state (configuration, cluster state, blob store)
     * @param i pause between two iterations of {@link #run()}
     * @param callback invoked when this node becomes leader
     * @throws RuntimeException if the local host resolves to a loopback name/address, if the
     *     follower registration fails, or if the initial leadership attempt fails; the original
     *     failure is attached as the cause where one exists
     */
    public FollowerRunnable(NimbusData nimbusData, int i, Callback callback) {
        this.data = nimbusData;
        this.sleepTime = i;
        this.leaderCallback = callback;

        Map conf = nimbusData.getConf();
        boolean useIp = ConfigExtension.isNimbusUseIp(conf);
        String host = useIp ? NetWorkUtils.ip() : NetWorkUtils.hostname();
        this.hostPort = host + TMultiplexedProtocol.SEPARATOR + Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));

        // NOTE(review): AlimonitorClient.DEFAUT_ADDR is used here as the loopback IP literal
        // (presumably "127.0.0.1") - confirm, and consider a dedicated constant.
        String loopback = useIp ? AlimonitorClient.DEFAUT_ADDR : "localhost";
        if (host.equals(loopback)) {
            RuntimeException error = new RuntimeException("the hostname which Nimbus get is localhost");
            LOG.error("get nimbus host error!", error);
            throw error;
        }

        registerAsFollower();
        electLeaderIfAbsent(conf);

        this.blobSyncCallback = new RunnableCallback() {
            @Override
            public void run() {
                FollowerRunnable.this.blobSync();
            }
        };
        try {
            if (nimbusData.getBlobStore() instanceof LocalFsBlobStore) {
                nimbusData.getStormClusterState().blobstore(this.blobSyncCallback);
                setupBlobstore();
            }
        } catch (Exception e) {
            // Best effort: a blob store setup failure does not prevent the follower from starting.
            LOG.error("setup blob store error", e);
        }
    }

    /** Writes this node's slave entry and an empty detail entry into the cluster state. */
    private void registerAsFollower() {
        try {
            StormClusterState clusterState = this.data.getStormClusterState();
            clusterState.update_nimbus_slave(this.hostPort, this.data.uptime());
            clusterState.update_nimbus_detail(this.hostPort, null);
        } catch (Exception e) {
            LOG.error("register nimbus host fail!", e);
            throw new RuntimeException("register nimbus host fail!", e);
        }
    }

    /**
     * Tries to become leader when the cluster state has no leader. On failure the follower
     * registration written by {@link #registerAsFollower()} is removed again (best effort) before
     * the failure is rethrown.
     */
    private void electLeaderIfAbsent(Map conf) {
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            if (!clusterState.leader_existed()) {
                tryToBeLeader(conf);
            }
        } catch (Exception e) {
            try {
                clusterState.unregister_nimbus_host(this.hostPort);
                clusterState.unregister_nimbus_detail(this.hostPort);
            } catch (Exception cleanupError) {
                LOG.info("due to task errors, so remove register nimbus infomation", cleanupError);
            }
            LOG.error("try to be leader error.", e);
            throw new RuntimeException("try to be leader error.", e);
        }
    }

    /**
     * Reconciles the local blob store with the keys listed in the cluster state: local blobs whose
     * key is not active in the cluster state are deleted, and every key present on both sides is
     * (re-)announced for this nimbus together with its local version.
     */
    private void setupBlobstore() throws Exception {
        BlobStore blobStore = this.data.getBlobStore();
        StormClusterState clusterState = this.data.getStormClusterState();
        HashSet<String> localKeys = Sets.newHashSet(blobStore.listKeys());
        HashSet<String> activeKeys = Sets.newHashSet(clusterState.active_keys());

        Sets.SetView<String> staleLocalKeys = Sets.difference(localKeys, activeKeys);
        LOG.debug("deleting keys not on the zookeeper {}", staleLocalKeys);
        for (String key : staleLocalKeys) {
            blobStore.deleteBlob(key);
        }

        Sets.SetView<String> sharedKeys = Sets.intersection(localKeys, activeKeys);
        LOG.debug("Creating list of key entries for blobstore inside zookeeper {} local {}", activeKeys, sharedKeys);
        for (String key : sharedKeys) {
            clusterState.setup_blobstore(key, this.data.getNimbusHostPortInfo(), BlobStoreUtils.getVersionForKey(key, this.data.getNimbusHostPortInfo(), this.data.getConf()));
        }
    }

    /**
     * Tells whether the given leader address designates this node.
     *
     * @param str leader address as stored in the cluster state ({@code host + separator + port})
     * @return {@code true} if {@code str} equals this node's host/port (ignoring case) or if its
     *     host part matches the local IP according to {@code NetWorkUtils.equals}; {@code false}
     *     for a blank address or one without a host part
     */
    public boolean isLeader(String str) {
        if (StringUtils.isBlank(str)) {
            return false;
        }
        if (this.hostPort.equalsIgnoreCase(str)) {
            return true;
        }
        // String.split drops trailing empty strings, so an address made only of separators
        // yields an empty array; treat that as "not this node" instead of failing on [0].
        String[] hostAndPort = str.split(TMultiplexedProtocol.SEPARATOR);
        if (hostAndPort.length == 0) {
            return false;
        }
        return NetWorkUtils.equals(hostAndPort[0], NetWorkUtils.ip());
    }

    /**
     * Main loop; runs until {@link #clean()} is called or the process is halted.
     *
     * <p>An interrupt does not stop the loop (only {@link #clean()} does); it is remembered and
     * the thread's interrupt status is restored when the method returns.
     */
    @Override
    public void run() {
        LOG.info("Follower Thread starts!");
        boolean interrupted = false;
        try {
            while (this.state) {
                StormClusterState clusterState = this.data.getStormClusterState();
                try {
                    Thread.sleep(this.sleepTime);
                    if (!clusterState.leader_existed()) {
                        tryToBeLeader(this.data.getConf());
                        continue;
                    }
                    String leaderHost = clusterState.get_leader_host();
                    if (isLeader(leaderHost)) {
                        if (!this.data.isLeader()) {
                            // First time we see ourselves as leader: drop the follower entries.
                            clusterState.unregister_nimbus_host(this.hostPort);
                            clusterState.unregister_nimbus_detail(this.hostPort);
                            this.data.setLeader(true);
                            this.leaderCallback.execute(new Object[0]);
                        }
                    } else if (this.data.isLeader()) {
                        // We believed we were leader but the cluster state names someone else.
                        LOG.info("New ZK master is {}", leaderHost);
                        JStormUtils.halt_process(1, "Lose ZK master node, halt process");
                        return;
                    } else {
                        if (this.data.getBlobStore() instanceof LocalFsBlobStore) {
                            blobSync();
                        }
                        clusterState.update_nimbus_slave(this.hostPort, this.data.uptime());
                        update_nimbus_detail();
                    }
                } catch (InterruptedException e) {
                    // Do not re-interrupt here: the next sleep would throw immediately and the
                    // loop would spin. The flag is restored once the loop has ended.
                    interrupted = true;
                    LOG.info("Follower Thread is interrupted, still active: {}", this.state);
                } catch (Exception e) {
                    if (this.state) {
                        LOG.error("Unknow exception ", e);
                    }
                }
            }
            LOG.info("Follower Thread has closed!");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Requests {@link #run()} to stop after its current iteration. */
    public void clean() {
        this.state = false;
    }

    /**
     * Synchronizes the local blob store with the key set returned by the cluster state. Does
     * nothing when this node is the leader. Failures are logged and not propagated.
     */
    public synchronized void blobSync() {
        if (this.data.isLeader()) {
            return;
        }
        try {
            BlobStore blobStore = this.data.getBlobStore();
            StormClusterState clusterState = this.data.getStormClusterState();
            HashSet<String> localKeys = Sets.newHashSet(blobStore.listKeys());
            // Passing the callback again presumably re-arms the watch - confirm against
            // StormClusterState.blobstore.
            HashSet<String> zkKeys = Sets.newHashSet(clusterState.blobstore(this.blobSyncCallback));

            BlobSynchronizer synchronizer = new BlobSynchronizer(blobStore, this.data.getConf());
            synchronizer.setNimbusInfo(this.data.getNimbusHostPortInfo());
            synchronizer.setBlobStoreKeySet(localKeys);
            synchronizer.setZookeeperKeySet(zkKeys);
            synchronizer.syncBlobs();
        } catch (Exception e) {
            LOG.error("blob sync error", e);
        }
    }

    /**
     * Competes for leadership if {@link #check_nimbus_priority()} allows it. The callback handed
     * to the cluster state retries this method and halts the process if the retry fails.
     *
     * @param map nimbus configuration, forwarded to the retry
     * @throws Exception if the priority check or the election call fails
     */
    public void tryToBeLeader(final Map map) throws Exception {
        if (!check_nimbus_priority()) {
            LOG.info("This nimbus can't be leader");
            return;
        }
        RunnableCallback retryCallback = new RunnableCallback() {
            @Override
            public void run() {
                try {
                    FollowerRunnable.this.tryToBeLeader(map);
                } catch (Exception e) {
                    FollowerRunnable.LOG.error("To be master error", e);
                    JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage());
                }
            }
        };
        LOG.info("This nimbus can be  leader");
        this.data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, this.hostPort, retryCallback);
    }

    /**
     * Decides whether this node may compete for leadership.
     *
     * <p>A node missing no blob keys may always compete. Otherwise it first waits
     * {@link #SLAVE_NIMBUS_WAIT_TIME} seconds and then yields if any other registered nimbus
     * reports a smaller missing-key count than the one measured before the wait.
     *
     * @return {@code true} if this node may try to become leader
     */
    private boolean check_nimbus_priority() throws Exception {
        int differCount = update_nimbus_detail();
        if (differCount == 0) {
            return true;
        }
        for (int remaining = SLAVE_NIMBUS_WAIT_TIME.intValue(); remaining > 0; remaining -= PRIORITY_WAIT_STEP_SECONDS) {
            LOG.info("nimbus.differ.count.zk is {}, so after {} seconds, nimbus will try to be Leader!", differCount, remaining);
            Thread.sleep(PRIORITY_WAIT_STEP_SECONDS * 1000L);
        }

        StormClusterState clusterState = this.data.getStormClusterState();
        List<String> candidates = clusterState.list_dirs(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE, false);
        if (candidates == null || candidates.isEmpty()) {
            return false;
        }
        for (String candidate : candidates) {
            if (candidate == null || candidate.equals(this.hostPort)) {
                continue;
            }
            Map detail = clusterState.get_nimbus_detail(candidate, false);
            if (detail == null) {
                continue;
            }
            Object otherCount = detail.get(NIMBUS_DIFFER_COUNT_ZK);
            if (otherCount != null && JStormUtils.parseInt(otherCount).intValue() < differCount) {
                LOG.info("Current node can't be leader, due to {} has higher priority", candidate);
                return false;
            }
        }
        return true;
    }

    /**
     * Publishes, in this node's detail entry, how many keys active in the cluster state are
     * missing from the local blob store (always 0 when the blob store is not a
     * {@code LocalFsBlobStore}).
     *
     * @return the published missing-key count
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // generic signature of get_nimbus_detail is not visible here
    private int update_nimbus_detail() throws Exception {
        StormClusterState clusterState = this.data.getStormClusterState();
        int missingKeys = 0;
        if (this.data.getBlobStore() instanceof LocalFsBlobStore) {
            HashSet<String> activeKeys = Sets.newHashSet(clusterState.active_keys());
            HashSet<String> localKeys = Sets.newHashSet(this.data.getBlobStore().listKeys());
            missingKeys = Sets.difference(activeKeys, localKeys).size();
        }
        Map detail = clusterState.get_nimbus_detail(this.hostPort, false);
        if (detail == null) {
            detail = new HashMap();
        }
        detail.put(NIMBUS_DIFFER_COUNT_ZK, missingKeys);
        clusterState.update_nimbus_detail(this.hostPort, detail);
        LOG.debug("update nimbus's detail {}", detail);
        return missingKeys;
    }

    /**
     * Polls the cluster state up to {@link #OWN_MASTER_MAX_CHECKS} times, returning as soon as
     * the registered leader equals this node's host/port, and halts the process otherwise.
     * No caller exists in this class.
     */
    private void checkOwnMaster() throws Exception {
        StormClusterState clusterState = this.data.getStormClusterState();
        for (int attempt = 0; attempt < OWN_MASTER_MAX_CHECKS; attempt++) {
            if (clusterState.leader_existed()) {
                String leaderHost = clusterState.get_leader_host();
                if (this.hostPort.equals(leaderHost)) {
                    return;
                }
                LOG.warn("Current Nimbus has start thrift, but fail to own zk master :{}", leaderHost);
            }
            JStormUtils.sleepMs(this.sleepTime);
        }
        LOG.error("Current Nimubs fail to own nimbus_master, should halt process");
        JStormUtils.halt_process(0, "Current Nimubs fail to own nimbus_master, should halt process");
    }
}
