/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.cluster;

import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.ClusterStateCallback;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.cluster.DistributedClusterState;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.AssignmentBak;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StormZkClusterState
implements StormClusterState {
    private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
    private ClusterState cluster_state;
    private ConcurrentHashMap<String, RunnableCallback> assignment_info_callback;
    private AtomicReference<RunnableCallback> supervisors_callback;
    private AtomicReference<RunnableCallback> assignments_callback;
    private ConcurrentHashMap<String, RunnableCallback> storm_base_callback;
    private AtomicReference<RunnableCallback> master_callback;
    private UUID state_id;
    private boolean solo;
    private static final String TASK_IS_DEAD = "is dead on";

    public StormZkClusterState(Object cluster_state_spec) throws Exception {
        String[] pathlist;
        if (cluster_state_spec instanceof ClusterState) {
            this.solo = false;
            this.cluster_state = (ClusterState)cluster_state_spec;
        } else {
            this.solo = true;
            this.cluster_state = new DistributedClusterState((Map)cluster_state_spec);
        }
        this.assignment_info_callback = new ConcurrentHashMap();
        this.supervisors_callback = new AtomicReference<Object>(null);
        this.assignments_callback = new AtomicReference<Object>(null);
        this.storm_base_callback = new ConcurrentHashMap();
        this.master_callback = new AtomicReference<Object>(null);
        this.state_id = this.cluster_state.register(new ClusterStateCallback(){

            @Override
            public <T> Object execute(T ... args) {
                if (args == null) {
                    LOG.warn("Input args is null");
                    return null;
                }
                if (args.length < 2) {
                    LOG.warn("Input args is invalid, args length:" + args.length);
                    return null;
                }
                Watcher.Event.EventType zkEventTypes = (Watcher.Event.EventType)args[0];
                String path = (String)args[1];
                List<String> toks = PathUtils.tokenize_path(path);
                int size = toks.size();
                if (size >= 1) {
                    String params = null;
                    String root = toks.get(0);
                    RunnableCallback fn = null;
                    if (root.equals("assignments")) {
                        if (size == 1) {
                            fn = StormZkClusterState.this.assignments_callback.getAndSet(null);
                        } else {
                            params = toks.get(1);
                            fn = (RunnableCallback)StormZkClusterState.this.assignment_info_callback.remove(params);
                        }
                    } else if (root.equals("supervisors")) {
                        fn = StormZkClusterState.this.supervisors_callback.getAndSet(null);
                    } else if (root.equals("topology") && size > 1) {
                        params = toks.get(1);
                        fn = (RunnableCallback)StormZkClusterState.this.storm_base_callback.remove(params);
                    } else if (root.equals("nimbus_master")) {
                        fn = StormZkClusterState.this.master_callback.getAndSet(null);
                    } else {
                        LOG.error("Unknown callback for subtree " + path);
                    }
                    if (fn != null) {
                        fn.run();
                    }
                }
                return null;
            }
        });
        for (String path : pathlist = JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE, Cluster.ASSIGNMENTS_BAK_SUBTREE, Cluster.TASKS_SUBTREE, Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE, Cluster.METRIC_SUBTREE, Cluster.BACKPRESSURE_SUBTREE)) {
            this.cluster_state.mkdirs(path);
        }
    }

    public void setCache(JStormCache simpleCache) {
        if (this.cluster_state instanceof DistributedClusterState) {
            ((DistributedClusterState)this.cluster_state).setZkCache(simpleCache);
        }
    }

    public Object getObject(String path, boolean callback) throws Exception {
        byte[] data = this.cluster_state.get_data(path, callback);
        return Utils.maybe_deserialize(data);
    }

    public Object getObjectSync(String path, boolean callback) throws Exception {
        byte[] data = this.cluster_state.get_data_sync(path, callback);
        return Utils.maybe_deserialize(data);
    }

    public String getString(String path, boolean callback) throws Exception {
        byte[] data = this.cluster_state.get_data(path, callback);
        return new String(data);
    }

    public void deleteObject(String path) {
        try {
            this.cluster_state.delete_node(path);
        }
        catch (Exception e) {
            LOG.warn("Failed to delete node " + path);
        }
    }

    public void setObject(String path, Object obj) throws Exception {
        if (obj instanceof byte[]) {
            this.cluster_state.set_data(path, (byte[])obj);
        } else if (obj instanceof String) {
            this.cluster_state.set_data(path, ((String)obj).getBytes());
        } else {
            this.cluster_state.set_data(path, Utils.serialize(obj));
        }
    }

    public void setTempObject(String path, Object obj) throws Exception {
        if (obj instanceof byte[]) {
            this.cluster_state.set_ephemeral_node(path, (byte[])obj);
        } else if (obj instanceof String) {
            this.cluster_state.set_ephemeral_node(path, ((String)obj).getBytes());
        } else {
            this.cluster_state.set_ephemeral_node(path, Utils.serialize(obj));
        }
    }

    @Override
    public void disconnect() {
        this.cluster_state.unregister(this.state_id);
        if (this.solo) {
            this.cluster_state.close();
        }
    }

    public void remove_storm(String topologyId, boolean needSleep) {
        this.deleteObject(Cluster.assignment_path(topologyId));
        if (needSleep) {
            JStormUtils.sleepMs(10000L);
        }
        try {
            this.deleteObject(Cluster.storm_task_root(topologyId));
            this.teardown_heartbeats(topologyId);
            this.teardown_task_errors(topologyId);
            this.teardown_backpressure(topologyId);
            this.deleteObject(Cluster.metric_path(topologyId));
        }
        catch (Exception e) {
            LOG.warn("Failed to delete task root and monitor root for" + topologyId);
        }
        this.remove_storm_base(topologyId);
    }

    @Override
    public void remove_storm(String topologyId) throws Exception {
        this.remove_storm(topologyId, true);
    }

    @Override
    public void try_remove_storm(String topologyId) {
        this.remove_storm(topologyId, false);
    }

    @Override
    public Assignment assignment_info(String topologyId, RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.assignment_info_callback.put(topologyId, callback);
        }
        String assgnmentPath = Cluster.assignment_path(topologyId);
        return (Assignment)this.getObject(assgnmentPath, callback != null);
    }

    @Override
    public Integer assignment_version(String topologyId, RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.assignment_info_callback.put(topologyId, callback);
        }
        String assgnmentPath = Cluster.assignment_path(topologyId);
        return this.cluster_state.get_version(assgnmentPath, callback != null);
    }

    @Override
    public List<String> assignments(RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.assignments_callback.set(callback);
        }
        return this.cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
    }

    @Override
    public void set_assignment(String topologyId, Assignment info) throws Exception {
        this.setObject(Cluster.assignment_path(topologyId), info);
    }

    @Override
    public AssignmentBak assignment_bak(String topologyName) throws Exception {
        String assgnmentBakPath = Cluster.assignment_bak_path(topologyName);
        return (AssignmentBak)this.getObject(assgnmentBakPath, false);
    }

    @Override
    public void backup_assignment(String topologyName, AssignmentBak info) throws Exception {
        this.setObject(Cluster.assignment_bak_path(topologyName), info);
    }

    @Override
    public StormBase storm_base(String topologyId, RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.storm_base_callback.put(topologyId, callback);
        }
        return (StormBase)this.getObject(Cluster.storm_path(topologyId), callback != null);
    }

    @Override
    public void activate_storm(String topologyId, StormBase stormBase) throws Exception {
        String stormPath = Cluster.storm_path(topologyId);
        this.setObject(stormPath, stormBase);
    }

    @Override
    public void remove_storm_base(String topologyId) {
        this.deleteObject(Cluster.storm_path(topologyId));
    }

    @Override
    public void update_storm(String topologyId, StormStatus newElems) throws Exception {
        StormBase base = this.storm_base(topologyId, null);
        if (base != null) {
            base.setStatus(newElems);
            this.setObject(Cluster.storm_path(topologyId), base);
        }
    }

    @Override
    public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception {
        StormBase base = this.storm_base(topologyId, null);
        if (base != null) {
            base.setEnableMonitor(isEnable);
            this.setObject(Cluster.storm_path(topologyId), base);
        }
    }

    @Override
    public List<String> active_storms() throws Exception {
        return this.cluster_state.get_children(Cluster.STORMS_SUBTREE, false);
    }

    @Override
    public void topology_heartbeat(String topologyId, TopologyTaskHbInfo info) throws Exception {
        String taskPath = Cluster.taskbeat_storm_root(topologyId);
        this.setObject(taskPath, info);
    }

    @Override
    public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception {
        String taskPath = Cluster.taskbeat_storm_root(topologyId);
        return (TopologyTaskHbInfo)this.getObject(taskPath, false);
    }

    @Override
    public List<String> heartbeat_storms() throws Exception {
        return this.cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false);
    }

    @Override
    public void teardown_heartbeats(String topologyId) {
        try {
            String taskbeatPath = Cluster.taskbeat_storm_root(topologyId);
            this.deleteObject(taskbeatPath);
        }
        catch (Exception e) {
            LOG.warn("Could not teardown heartbeats for " + topologyId, (Throwable)e);
        }
    }

    @Override
    public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception {
        this.report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error), "fatal", 700);
    }

    @Override
    public void report_task_error(String topology_id, int task_id, String error) throws Exception {
        this.report_task_error(topology_id, task_id, error, "fatal", 700);
    }

    @Override
    public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code) throws Exception {
        this.report_task_error(topology_id, task_id, error, error_level, error_code, 1800);
    }

    @Override
    public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration_secs) throws Exception {
        this.report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null);
    }

    @Override
    public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration_secs, String tag) throws Exception {
        boolean found = false;
        String path = Cluster.taskerror_path(topology_id, task_id);
        this.cluster_state.mkdirs(path);
        ArrayList<Integer> children = new ArrayList<Integer>();
        int timeSecs = TimeUtils.current_time_secs();
        String timestampPath = path + "/" + timeSecs;
        TaskError taskError = new TaskError(error, error_level, error_code, timeSecs, duration_secs);
        for (String str : this.cluster_state.get_children(path, false)) {
            String errorPath = path + "/" + str;
            Object obj = this.getObject(errorPath, false);
            if (obj == null) {
                this.deleteObject(errorPath);
                continue;
            }
            TaskError errorInfo = (TaskError)obj;
            if (errorInfo.getError().equals(error) || tag != null && errorInfo.getError().startsWith(tag)) {
                this.cluster_state.delete_node(errorPath);
                this.setObject(timestampPath, taskError);
                found = true;
                break;
            }
            children.add(Integer.parseInt(str));
        }
        if (!found) {
            Collections.sort(children);
            while (children.size() >= 3) {
                this.deleteObject(path + "/" + children.remove(0));
            }
            this.setObject(timestampPath, taskError);
        }
        this.setLastErrInfo(topology_id, duration_secs, timeSecs);
    }

    private void setLastErrInfo(String topologyId, int duration, int timeStamp) throws Exception {
        String lastErrTopoPath = Cluster.lasterror_path(topologyId);
        HashMap<Integer, String> lastErrInfo = null;
        try {
            lastErrInfo = (HashMap<Integer, String>)this.getObject(lastErrTopoPath, false);
        }
        catch (Exception e) {
            LOG.error("Failed to get last error time. Remove the corrupt node for " + topologyId, (Throwable)e);
            this.remove_lastErr_time(topologyId);
            lastErrInfo = null;
        }
        if (lastErrInfo == null) {
            lastErrInfo = new HashMap<Integer, String>();
        }
        lastErrInfo.put(duration, timeStamp + "");
        this.setObject(lastErrTopoPath, lastErrInfo);
    }

    @Override
    public void remove_task_error(String topologyId, int taskId) throws Exception {
        String path = Cluster.taskerror_path(topologyId, taskId);
        this.cluster_state.delete_node(path);
    }

    @Override
    public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception {
        String path = Cluster.lasterror_path(topologyId);
        return (Map)this.getObject(path, false);
    }

    @Override
    public void remove_lastErr_time(String topologyId) throws Exception {
        String path = Cluster.lasterror_path(topologyId);
        this.deleteObject(path);
    }

    @Override
    public List<String> task_error_storms() throws Exception {
        return this.cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false);
    }

    @Override
    public List<String> task_error_ids(String topologyId) throws Exception {
        return this.cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
    }

    @Override
    public List<String> task_error_time(String topologyId, int taskId) throws Exception {
        String path = Cluster.taskerror_path(topologyId, taskId);
        if (!this.cluster_state.node_existed(path, false)) {
            return new ArrayList<String>();
        }
        return this.cluster_state.get_children(path, false);
    }

    @Override
    public void remove_task(String topologyId, Set<Integer> taskIds) throws Exception {
        String tasksPath = Cluster.storm_task_root(topologyId);
        Object data = this.getObject(tasksPath, false);
        if (data != null) {
            Map taskInfoMap = (Map)data;
            for (Integer taskId : taskIds) {
                taskInfoMap.remove(taskId);
            }
            this.setObject(tasksPath, taskInfoMap);
        }
    }

    @Override
    public TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception {
        String path = Cluster.taskerror_path(topologyId, taskId);
        path = path + "/" + timeStamp;
        return (TaskError)this.getObject(path, false);
    }

    @Override
    public List<TaskError> task_errors(String topologyId, int taskId) throws Exception {
        ArrayList<TaskError> errors = new ArrayList<TaskError>();
        String path = Cluster.taskerror_path(topologyId, taskId);
        if (!this.cluster_state.node_existed(path, false)) {
            return errors;
        }
        List<String> children = this.cluster_state.get_children(path, false);
        for (String str : children) {
            Object obj = this.getObject(path + "/" + str, false);
            if (obj == null) continue;
            TaskError error = (TaskError)obj;
            errors.add(error);
        }
        Collections.sort(errors, new Comparator<TaskError>(){

            @Override
            public int compare(TaskError o1, TaskError o2) {
                if (o1.getTimSecs() > o2.getTimSecs()) {
                    return 1;
                }
                if (o1.getTimSecs() < o2.getTimSecs()) {
                    return -1;
                }
                return 0;
            }
        });
        return errors;
    }

    @Override
    public void teardown_task_errors(String topologyId) {
        try {
            String taskerrPath = Cluster.taskerror_storm_root(topologyId);
            this.deleteObject(taskerrPath);
        }
        catch (Exception e) {
            LOG.error("Could not teardown errors for " + topologyId, (Throwable)e);
        }
    }

    @Override
    public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
        String stormTaskPath = Cluster.storm_task_root(topologyId);
        if (taskInfoMap != null) {
            this.setObject(stormTaskPath, taskInfoMap);
        }
    }

    @Override
    public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
        String stormTaskPath = Cluster.storm_task_root(topologyId);
        Object data = this.getObject(stormTaskPath, false);
        if (data != null) {
            ((Map)data).putAll(taskInfoMap);
            this.setObject(stormTaskPath, data);
        }
    }

    @Override
    public List<String> task_storms() throws Exception {
        return this.cluster_state.get_children(Cluster.TASKS_SUBTREE, false);
    }

    @Override
    public Set<Integer> task_ids(String stromId) throws Exception {
        String stormTaskPath = Cluster.storm_task_root(stromId);
        Object data = this.getObject(stormTaskPath, false);
        if (data == null) {
            return null;
        }
        return ((Map)data).keySet();
    }

    @Override
    public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception {
        String stormTaskPath = Cluster.storm_task_root(topologyId);
        Object data = this.getObject(stormTaskPath, false);
        if (data == null) {
            return null;
        }
        Map taskInfoMap = (Map)data;
        HashSet<Integer> rtn = new HashSet<Integer>();
        Set taskIds = taskInfoMap.keySet();
        for (Integer taskId : taskIds) {
            TaskInfo taskInfo = (TaskInfo)taskInfoMap.get(taskId);
            if (taskInfo == null || !taskInfo.getComponentId().equalsIgnoreCase(componentId)) continue;
            rtn.add(taskId);
        }
        return rtn;
    }

    @Override
    public Map<Integer, TaskInfo> task_all_info(String topologyId) throws Exception {
        String taskPath = Cluster.storm_task_root(topologyId);
        Object data = this.getObject(taskPath, false);
        if (data == null) {
            return null;
        }
        return (Map)data;
    }

    @Override
    public SupervisorInfo supervisor_info(String supervisorId) throws Exception {
        String supervisorPath = Cluster.supervisor_path(supervisorId);
        return (SupervisorInfo)this.getObject(supervisorPath, false);
    }

    @Override
    public List<String> supervisors(RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.supervisors_callback.set(callback);
        }
        return this.cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
    }

    @Override
    public void supervisor_heartbeat(String supervisorId, SupervisorInfo info) throws Exception {
        String supervisorPath = Cluster.supervisor_path(supervisorId);
        this.setTempObject(supervisorPath, info);
    }

    @Override
    public String get_leader_host() throws Exception {
        return new String(this.cluster_state.get_data(Cluster.MASTER_SUBTREE, false));
    }

    @Override
    public boolean leader_existed() throws Exception {
        return this.cluster_state.node_existed(Cluster.MASTER_SUBTREE, false);
    }

    @Override
    public List<String> get_nimbus_slaves() throws Exception {
        return this.cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE, false);
    }

    @Override
    public String get_nimbus_slave_time(String host) throws Exception {
        String path = Cluster.NIMBUS_SLAVE_SUBTREE + "/" + host;
        return this.getString(path, false);
    }

    @Override
    public void update_nimbus_slave(String host, int time) throws Exception {
        this.setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + "/" + host, String.valueOf(time));
    }

    @Override
    public void unregister_nimbus_host(String host) throws Exception {
        this.deleteObject(Cluster.NIMBUS_SLAVE_SUBTREE + "/" + host);
    }

    @Override
    public void update_nimbus_detail(String hostPort, Map map) throws Exception {
        this.cluster_state.set_ephemeral_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + "/" + hostPort, Utils.serialize(map));
    }

    @Override
    public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception {
        byte[] data = this.cluster_state.get_data(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + "/" + hostPort, watch);
        return (Map)Utils.maybe_deserialize(data);
    }

    @Override
    public void unregister_nimbus_detail(String hostPort) throws Exception {
        this.cluster_state.delete_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + "/" + hostPort);
    }

    @Override
    public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception {
        if (callback != null) {
            this.master_callback.set(callback);
        }
        try {
            this.cluster_state.tryToBeLeader(path, host.getBytes());
        }
        catch (KeeperException.NodeExistsException e) {
            this.cluster_state.node_existed(path, true);
            LOG.info("leader is alive");
            return false;
        }
        return true;
    }

    @Override
    public void set_topology_metric(String topologyId, Object metric) throws Exception {
        String path = Cluster.metric_path(topologyId);
        this.setObject(path, metric);
    }

    @Override
    public Object get_topology_metric(String topologyId) throws Exception {
        return this.getObject(Cluster.metric_path(topologyId), false);
    }

    @Override
    public List<String> get_metrics() throws Exception {
        return this.cluster_state.get_children(Cluster.METRIC_SUBTREE, false);
    }

    @Override
    public List<String> list_dirs(String path, boolean watch) throws Exception {
        List<String> subDirs = null;
        subDirs = this.cluster_state.get_children(path, watch);
        return subDirs;
    }

    @Override
    public List<String> backpressureInfos() throws Exception {
        return this.cluster_state.get_children(Cluster.BACKPRESSURE_SUBTREE, false);
    }

    @Override
    public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception {
        String path = Cluster.backpressure_path(topologyId);
        this.cluster_state.set_data(path, Utils.serialize(sourceToBackpressureInfo));
    }

    @Override
    public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception {
        String path = Cluster.backpressure_path(topologyId);
        byte[] data = this.cluster_state.get_data(path, false);
        return (Map)Utils.maybe_deserialize(data);
    }

    @Override
    public void teardown_backpressure(String topologyId) {
        try {
            String backpressurePath = Cluster.backpressure_path(topologyId);
            this.cluster_state.delete_node(backpressurePath);
        }
        catch (Exception e) {
            LOG.warn("Could not teardown backpressure info for " + topologyId, (Throwable)e);
        }
    }
}

