/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.metric;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.cache.RocksDBCache;
import com.alibaba.jstorm.cache.TimeoutMemCache;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.utils.OSInfo;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JStormMetricCache {
    private static final Logger LOG = LoggerFactory.getLogger(JStormMetricCache.class);
    public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName();
    public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName();
    protected final Object lock = new Object();
    protected JStormCache cache = null;
    protected static final String METRIC_META_PREFIX = "__metric.meta__";
    protected static final String SENT_METRIC_META_PREFIX = "__saved.metric.meta__";
    protected static final String ALL_TOPOLOGIES_KEY = "__all.topologies__";
    protected static final String TOPOLOGY_SAMPLE_RATE = "__topology.sample.rate__";
    protected static final String METRIC_DATA_PREFIX = "__metric.data__";
    protected static final String METRIC_DATA_30M_COMPONENT = "__metric.data.comp__";
    protected static final String METRIC_DATA_30M_TASK = "__metric.data.task__";
    protected static final String METRIC_DATA_30M_STREAM = "__metric.data.stream__";
    protected static final String METRIC_DATA_30M_WORKER = "__metric.data.worker__";
    protected static final String METRIC_DATA_30M_NETTY = "__metric.data.netty__";
    protected static final String METRIC_DATA_30M_TOPOLOGY = "__metric.data.topology__";
    protected final StormClusterState zkCluster;

    public String getNimbusCacheClass(Map conf) {
        boolean isLinux = OSInfo.isLinux();
        boolean isMac = OSInfo.isMac();
        boolean isLocal = StormConfig.local_mode(conf);
        if (isLocal) {
            return TIMEOUT_MEM_CACHE_CLASS;
        }
        if (!isLinux && !isMac) {
            return TIMEOUT_MEM_CACHE_CLASS;
        }
        String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf);
        if (!StringUtils.isBlank((String)nimbusCacheClass)) {
            return nimbusCacheClass;
        }
        return ROCKS_DB_CACHE_CLASS;
    }

    public JStormMetricCache(Map conf, StormClusterState zkCluster) {
        String dbCacheClass = this.getNimbusCacheClass(conf);
        LOG.info("JStorm metrics cache will use {}", (Object)dbCacheClass);
        boolean reset = ConfigExtension.getMetricCacheReset(conf);
        try {
            this.cache = (JStormCache)Utils.newInstance(dbCacheClass);
            String dbDir = StormConfig.metricDbDir(conf);
            conf.put("rocksdb.root.dir", dbDir);
            conf.put("rocksdb.reset", reset);
            this.cache.init(conf);
        }
        catch (Exception e) {
            if (!reset && this.cache != null) {
                LOG.error("Failed to init rocks db, will reset and try to re-init...");
                conf.put("rocksdb.reset", true);
                try {
                    this.cache.init(conf);
                }
                catch (Exception ex) {
                    LOG.error("Error", (Throwable)ex);
                }
            }
            LOG.error("Failed to create metrics cache!", (Throwable)e);
            throw new RuntimeException(e);
        }
        this.zkCluster = zkCluster;
    }

    public JStormCache getCache() {
        return this.cache;
    }

    public JStormCache put(String k, Object v) {
        this.cache.put(k, v);
        return this.cache;
    }

    public JStormCache putMetricData(String topologyId, TopologyMetric tpMetric) {
        HashMap<String, Object> batchData = new HashMap<String, Object>();
        long ts = System.currentTimeMillis();
        int tp = 0;
        int comp = 0;
        int task = 0;
        int stream = 0;
        int worker = 0;
        int netty = 0;
        if (tpMetric.get_componentMetric().get_metrics_size() > 0) {
            batchData.put(METRIC_DATA_30M_COMPONENT + topologyId, new Object[]{ts, tpMetric.get_componentMetric()});
            comp += tpMetric.get_componentMetric().get_metrics_size();
        }
        if (tpMetric.get_taskMetric().get_metrics_size() > 0) {
            this.tryCombineMetricInfo(METRIC_DATA_30M_TASK + topologyId, tpMetric.get_taskMetric(), MetaType.TASK, ts);
            task += tpMetric.get_taskMetric().get_metrics_size();
        }
        if (tpMetric.get_streamMetric().get_metrics_size() > 0) {
            this.tryCombineMetricInfo(METRIC_DATA_30M_STREAM + topologyId, tpMetric.get_streamMetric(), MetaType.STREAM, ts);
            stream += tpMetric.get_streamMetric().get_metrics_size();
        }
        if (tpMetric.get_workerMetric().get_metrics_size() > 0) {
            this.tryCombineMetricInfo(METRIC_DATA_30M_WORKER + topologyId, tpMetric.get_workerMetric(), MetaType.WORKER, ts);
            worker += tpMetric.get_workerMetric().get_metrics_size();
        }
        if (tpMetric.get_nettyMetric().get_metrics_size() > 0) {
            this.tryCombineMetricInfo(METRIC_DATA_30M_NETTY + topologyId, tpMetric.get_nettyMetric(), MetaType.NETTY, ts);
            netty += tpMetric.get_nettyMetric().get_metrics_size();
        }
        if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
            int page = this.getRingAvailableIndex(keyPrefix);
            batchData.put(keyPrefix + page, new Object[]{ts, tpMetric.get_topologyMetric()});
            tp += tpMetric.get_topologyMetric().get_metrics_size();
        }
        LOG.info("caching metric data for topology:{},tp:{},comp:{},task:{},stream:{},worker:{},netty:{},cost:{}", new Object[]{topologyId, tp, comp, task, stream, worker, netty, System.currentTimeMillis() - ts});
        return this.putBatch(batchData);
    }

    private int getRingAvailableIndex(String keyPrefix) {
        int page = 0;
        long last_ts = 0L;
        for (int idx = 1; idx <= 30; ++idx) {
            long timestamp;
            String key = keyPrefix + idx;
            if (this.cache.get(key) == null || (timestamp = ((Long)((Object[])this.cache.get(key))[0]).longValue()) <= last_ts) continue;
            last_ts = timestamp;
            page = idx;
        }
        page = page < 30 ? ++page : 1;
        return page;
    }

    private void tryCombineMetricInfo(String key, MetricInfo incoming, MetaType metaType, long ts) {
        Object data = this.cache.get(key);
        if (data != null) {
            try {
                Object[] parts = (Object[])data;
                MetricInfo old = (MetricInfo)parts[1];
                LOG.info("combine {} metrics, old:{}, new:{}", new Object[]{metaType, old.get_metrics_size(), incoming.get_metrics_size()});
                old.get_metrics().putAll(incoming.get_metrics());
                this.cache.put(key, new Object[]{ts, old});
            }
            catch (Exception ignored) {
                this.cache.remove(key);
                this.cache.put(key, new Object[]{ts, incoming});
            }
        } else {
            this.cache.put(key, new Object[]{ts, incoming});
        }
    }

    public List<MetricInfo> getMetricData(String topologyId, MetaType metaType) {
        Object obj;
        TreeMap<Long, MetricInfo> retMap = new TreeMap<Long, MetricInfo>();
        String key = null;
        if (metaType == MetaType.COMPONENT) {
            key = METRIC_DATA_30M_COMPONENT + topologyId;
        } else if (metaType == MetaType.TASK) {
            key = METRIC_DATA_30M_TASK + topologyId;
        } else if (metaType == MetaType.STREAM) {
            key = METRIC_DATA_30M_STREAM + topologyId;
        } else if (metaType == MetaType.WORKER) {
            key = METRIC_DATA_30M_WORKER + topologyId;
        } else if (metaType == MetaType.NETTY) {
            key = METRIC_DATA_30M_NETTY + topologyId;
        } else if (metaType == MetaType.TOPOLOGY) {
            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
            for (int i = 1; i <= 30; ++i) {
                Object obj2 = this.cache.get(keyPrefix + i);
                if (obj2 == null) continue;
                Object[] objects = (Object[])obj2;
                retMap.put((Long)objects[0], (MetricInfo)objects[1]);
            }
        }
        if (key != null && (obj = this.cache.get(key)) != null) {
            Object[] objects = (Object[])obj;
            retMap.put((Long)objects[0], (MetricInfo)objects[1]);
        }
        ArrayList ret = Lists.newArrayList(retMap.values());
        int cnt = 0;
        for (MetricInfo metricInfo : ret) {
            cnt += metricInfo.get_metrics_size();
        }
        LOG.info("getMetricData, topology:{}, meta type:{}, metric info size:{}, total metric size:{}", new Object[]{topologyId, metaType, ret.size(), cnt});
        return ret;
    }

    public JStormCache putBatch(Map<String, Object> kv) {
        if (kv.size() > 0) {
            this.cache.putBatch(kv);
        }
        return this.cache;
    }

    public Object get(String k) {
        return this.cache.get(k);
    }

    public void remove(String k) {
        this.cache.remove(k);
    }

    public void removeTopology(String topologyId) {
        this.removeTopologyMeta(topologyId);
        this.removeTopologyData(topologyId);
    }

    protected void removeTopologyMeta(String topologyId) {
        this.cache.remove(METRIC_META_PREFIX + topologyId);
    }

    protected void removeTopologyData(String topologyId) {
        long start = System.currentTimeMillis();
        this.cache.remove(METRIC_DATA_PREFIX + topologyId);
        HashSet<String> metricDataKeys = new HashSet<String>();
        for (int i = 1; i <= 30; ++i) {
            String metricDataKeySuffix = topologyId + "-" + i;
            metricDataKeys.add(METRIC_DATA_30M_TOPOLOGY + metricDataKeySuffix);
        }
        metricDataKeys.add(METRIC_DATA_30M_COMPONENT + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_TASK + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_STREAM + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_WORKER + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_NETTY + topologyId);
        this.cache.removeBatch(metricDataKeys);
        LOG.info("removing metric cache of topology:{}, cost:{}", (Object)topologyId, (Object)(System.currentTimeMillis() - start));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterWorker(String topologyId, String host, int port) {
        String prefix = MetricUtils.workerMetricPrefix(topologyId, host, port);
        Object object = this.lock;
        synchronized (object) {
            Object data;
            Map nodes = (Map)this.cache.get(METRIC_META_PREFIX + topologyId);
            if (nodes != null) {
                Iterator keyIterator = nodes.keySet().iterator();
                while (keyIterator.hasNext()) {
                    String metricName = (String)keyIterator.next();
                    if (!(metricName = metricName.charAt(0) + metricName.substring(2, metricName.length())).startsWith(prefix)) continue;
                    keyIterator.remove();
                }
                this.cache.put(METRIC_META_PREFIX + topologyId, nodes);
            }
            if ((data = this.cache.get(METRIC_DATA_30M_WORKER + topologyId)) != null) {
                Object[] parts = (Object[])data;
                MetricInfo old = (MetricInfo)parts[1];
                Iterator<String> oldKeys = old.get_metrics().keySet().iterator();
                while (oldKeys.hasNext()) {
                    String metricName = oldKeys.next();
                    if (!(metricName = metricName.charAt(0) + metricName.substring(2, metricName.length())).startsWith(prefix)) continue;
                    oldKeys.remove();
                    LOG.info("remove dead worker metric : {}", (Object)metricName);
                }
                this.cache.put(METRIC_DATA_30M_WORKER + topologyId, data);
            }
        }
    }

    public Map<String, Long> getMeta(String topologyId) {
        return (Map)this.cache.get(METRIC_META_PREFIX + topologyId);
    }

    public void putMeta(String topologyId, Object v) {
        this.cache.put(METRIC_META_PREFIX + topologyId, v);
    }

    public void putSampleRate(String topologyId, double sampleRate) {
        this.cache.put(TOPOLOGY_SAMPLE_RATE + topologyId, sampleRate);
    }

    public void removeSampleRate(String topologyId) {
        this.cache.remove(TOPOLOGY_SAMPLE_RATE + topologyId);
    }

    public double getSampleRate(String topologyId) {
        String rate = (String)this.cache.get(TOPOLOGY_SAMPLE_RATE + topologyId);
        if (rate == null) {
            return 0.05;
        }
        return Double.parseDouble(rate);
    }

    public Map<String, Long> getSentMeta(String topologyId) {
        return (Map)this.cache.get(SENT_METRIC_META_PREFIX + topologyId);
    }

    public void putSentMeta(String topologyId, Object allMetricMeta) {
        this.cache.put(SENT_METRIC_META_PREFIX + topologyId, allMetricMeta);
    }
}

