/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.cluster;

import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.ClusterStateCallback;
import com.alibaba.jstorm.callback.WatcherCallBack;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.zk.Zookeeper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedClusterState
implements ClusterState {
    private static Logger LOG = LoggerFactory.getLogger(DistributedClusterState.class);
    private Zookeeper zkobj = new Zookeeper();
    private CuratorFramework zk;
    private WatcherCallBack watcher;
    private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks = new ConcurrentHashMap();
    private Map<Object, Object> conf;
    private AtomicBoolean active;
    private JStormCache zkCache;

    public DistributedClusterState(Map<Object, Object> _conf) throws Exception {
        this.conf = _conf;
        CuratorFramework _zk = this.mkZk();
        String path = String.valueOf(this.conf.get("storm.zookeeper.root"));
        this.zkobj.mkdirs(_zk, path);
        _zk.close();
        this.active = new AtomicBoolean(true);
        this.watcher = new WatcherCallBack(){

            @Override
            public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
                if (DistributedClusterState.this.active.get()) {
                    if (!state.equals((Object)Watcher.Event.KeeperState.SyncConnected)) {
                        LOG.warn("Received event " + state + ":" + type + ":" + path + " with disconnected Zookeeper.");
                    } else {
                        LOG.info("Received event " + state + ":" + type + ":" + path);
                    }
                    if (!type.equals((Object)Watcher.Event.EventType.None)) {
                        for (Map.Entry e : DistributedClusterState.this.callbacks.entrySet()) {
                            ClusterStateCallback fn = (ClusterStateCallback)e.getValue();
                            fn.execute(type, path);
                        }
                    }
                }
            }
        };
        this.zk = null;
        this.zk = this.mkZk(this.watcher);
    }

    private CuratorFramework mkZk() throws IOException {
        return this.zkobj.mkClient(this.conf, (List)this.conf.get("storm.zookeeper.servers"), this.conf.get("storm.zookeeper.port"), "");
    }

    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
        return this.zkobj.mkClient(this.conf, (List)this.conf.get("storm.zookeeper.servers"), this.conf.get("storm.zookeeper.port"), String.valueOf(this.conf.get("storm.zookeeper.root")), watcher);
    }

    @Override
    public void close() {
        this.active.set(false);
        this.zk.close();
    }

    @Override
    public void delete_node(String path) throws Exception {
        if (this.zkCache != null) {
            this.zkCache.remove(path);
        }
        this.zkobj.deletereRcursive(this.zk, path);
    }

    @Override
    public List<String> get_children(String path, boolean watch) throws Exception {
        return this.zkobj.getChildren(this.zk, path, watch);
    }

    @Override
    public byte[] get_data(String path, boolean watch) throws Exception {
        byte[] ret = null;
        if (!watch && this.zkCache != null) {
            ret = (byte[])this.zkCache.get(path);
        }
        if (ret != null) {
            return ret;
        }
        ret = this.zkobj.getData(this.zk, path, watch);
        if (this.zkCache != null) {
            this.zkCache.put(path, ret);
        }
        return ret;
    }

    @Override
    public Integer get_version(String path, boolean watch) throws Exception {
        Integer ret = this.zkobj.getVersion(this.zk, path, watch);
        return ret;
    }

    @Override
    public byte[] get_data_sync(String path, boolean watch) throws Exception {
        byte[] ret = null;
        ret = this.zkobj.getData(this.zk, path, watch);
        if (this.zkCache != null && ret != null) {
            this.zkCache.put(path, ret);
        }
        return ret;
    }

    @Override
    public void mkdirs(String path) throws Exception {
        this.zkobj.mkdirs(this.zk, path);
    }

    @Override
    public void set_data(String path, byte[] data) throws Exception {
        if ((long)data.length > JStormUtils.SIZE_1_K * 800L) {
            throw new Exception("Writing 800k+ data into ZK is not allowed!, data size is " + data.length);
        }
        if (this.zkobj.exists(this.zk, path, false)) {
            this.zkobj.setData(this.zk, path, data);
        } else {
            this.zkobj.mkdirs(this.zk, PathUtils.parent_path(path));
            this.zkobj.createNode(this.zk, path, data, CreateMode.PERSISTENT);
        }
        if (this.zkCache != null) {
            this.zkCache.put(path, data);
        }
    }

    @Override
    public void set_ephemeral_node(String path, byte[] data) throws Exception {
        this.zkobj.mkdirs(this.zk, PathUtils.parent_path(path));
        if (this.zkobj.exists(this.zk, path, false)) {
            this.zkobj.setData(this.zk, path, data);
        } else {
            this.zkobj.createNode(this.zk, path, data, CreateMode.EPHEMERAL);
        }
        if (this.zkCache != null) {
            this.zkCache.put(path, data);
        }
    }

    @Override
    public UUID register(ClusterStateCallback callback) {
        UUID id = UUID.randomUUID();
        this.callbacks.put(id, callback);
        return id;
    }

    @Override
    public ClusterStateCallback unregister(UUID id) {
        return this.callbacks.remove(id);
    }

    @Override
    public boolean node_existed(String path, boolean watch) throws Exception {
        return this.zkobj.existsNode(this.zk, path, watch);
    }

    @Override
    public void tryToBeLeader(String path, byte[] host) throws Exception {
        this.zkobj.createNode(this.zk, path, host, CreateMode.EPHEMERAL);
    }

    public JStormCache getZkCache() {
        return this.zkCache;
    }

    public void setZkCache(JStormCache zkCache) {
        this.zkCache = zkCache;
    }
}

