/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.nimbus.ITopologyActionNotifierPlugin;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormZkClusterState;
import com.alibaba.jstorm.daemon.nimbus.NimbusCache;
import com.alibaba.jstorm.daemon.nimbus.StatusTransition;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
import com.alibaba.jstorm.metric.JStormMetricCache;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder for the state a Nimbus daemon instance works with: the configuration map,
 * the cluster-state handle, heartbeat and file-transfer caches, the shared scheduled
 * executor, metric helpers and the optional topology-action notifier plugin.
 *
 * <p>Most members are created once in the constructor and exposed through plain
 * accessors; several accessors hand out the live internal object rather than a copy.
 */
public class NimbusData {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class);

    /** Configuration key naming the topology-action notifier plugin class. */
    private static final String NOTIFIER_PLUGIN_CLASS_KEY = "nimbus.topology.action.notifier.plugin.class";
    /** Configuration key for the expiration of cached upload/download handles. */
    private static final String FILE_COPY_EXPIRATION_SECS_KEY = "nimbus.file.copy.expiration.secs";
    /** Fallback used when {@link #FILE_COPY_EXPIRATION_SECS_KEY} is absent or unparsable. */
    private static final int DEFAULT_FILE_COPY_EXPIRATION_SECS = 30;
    /** Argument passed to the pending-submit {@link TimeCacheMap} (presumably seconds -- confirm). */
    private static final int PENDING_SUBMIT_EXPIRATION = 1800;

    private Map<Object, Object> conf;
    private StormClusterState stormClusterState;
    /** Topology id -> (task id -> cached heartbeat time). */
    private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;
    private TimeCacheMap<Object, Object> downloaders;
    private TimeCacheMap<Object, Object> uploaders;
    private NimbusCache nimbusCache;
    private int startTime;
    private final ScheduledExecutorService scheduExec;
    private AtomicInteger submittedCount;
    private StatusTransition statusTransition;
    /** Number of threads in {@link #scheduExec}. */
    private static final int SCHEDULE_THREAD_NUM = 8;
    private final INimbus inimubs;
    private final boolean localMode;
    private volatile boolean isLeader;
    private AtomicBoolean isShutdown = new AtomicBoolean(false);
    private TopologyMetricsRunnable metricRunnable;
    private AsyncLoopThread metricLoopThread;
    private TimeCacheMap<String, Object> pendingSubmitTopologys;
    private Map<String, Integer> topologyTaskTimeout;
    private Map<String, TopologyTaskHbInfo> tasksHeartbeat;
    private final JStormMetricCache metricCache;
    private final String clusterName;
    /** Only created by {@link #startMetricThreads()}; stays {@code null} until that is called. */
    private JStormMetricsReporter metricsReporter;
    /** {@code null} when no notifier plugin class is configured. */
    private ITopologyActionNotifierPlugin nimbusNotify;

    /**
     * Builds all Nimbus state from the given configuration.
     *
     * <p>The initialisation order is significant: {@code this} is handed to
     * {@link StatusTransition} and {@link TopologyMetricsRunnable} while construction is
     * still in progress, so the statements below must not be reordered casually.
     * Metric threads are started only when not running in local mode.
     *
     * @param conf    configuration map; stored as-is (not copied)
     * @param inimbus the {@link INimbus} implementation to expose via {@link #getInimubs()}
     * @throws Exception if any of the collaborators fails to initialise
     */
    // The raw Map parameter is part of the existing public signature; the unchecked
    // assignment to the typed field is confined to this constructor.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public NimbusData(Map conf, INimbus inimbus) throws Exception {
        this.conf = conf;
        this.createFileHandler();
        this.submittedCount = new AtomicInteger(0);
        this.stormClusterState = Cluster.mk_storm_cluster_state(conf);
        this.createCache();
        this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
        this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
        this.statusTransition = new StatusTransition(this);
        this.startTime = TimeUtils.current_time_secs();
        this.inimubs = inimbus;
        this.localMode = StormConfig.local_mode(conf);
        this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
        this.clusterName = ConfigExtension.getClusterName(conf);
        this.metricRunnable = new TopologyMetricsRunnable(this);
        this.metricRunnable.init();
        this.pendingSubmitTopologys = new TimeCacheMap<String, Object>(PENDING_SUBMIT_EXPIRATION);
        this.topologyTaskTimeout = new ConcurrentHashMap<String, Integer>();
        this.tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>();
        if (!this.localMode) {
            this.startMetricThreads();
        }
        if (conf.containsKey(NOTIFIER_PLUGIN_CLASS_KEY)) {
            String pluginClassName = (String)conf.get(NOTIFIER_PLUGIN_CLASS_KEY);
            this.nimbusNotify = (ITopologyActionNotifierPlugin)Utils.newInstance(pluginClassName);
            this.nimbusNotify.prepare(conf);
        } else {
            this.nimbusNotify = null;
        }
    }

    /**
     * Starts the topology metrics runnable and creates and initialises the metrics reporter.
     */
    public void startMetricThreads() {
        this.metricRunnable.start();
        this.metricsReporter = new JStormMetricsReporter(this);
        this.metricsReporter.init();
    }

    /**
     * Creates the {@link #uploaders} and {@link #downloaders} caches.
     *
     * <p>Both caches share one expiry callback that closes the expired value when it is a
     * {@link Channel} or a {@link BufferFileInputStream}; other values are ignored. The
     * expiration is read from {@code nimbus.file.copy.expiration.secs}, defaulting to 30.
     */
    public void createFileHandler() {
        TimeCacheMap.ExpiredCallback<Object, Object> closeOnExpire = new TimeCacheMap.ExpiredCallback<Object, Object>(){

            @Override
            public void expire(Object key, Object val) {
                try {
                    LOG.info("Close file " + String.valueOf(key));
                    if (val == null) {
                        return;
                    }
                    if (val instanceof Channel) {
                        ((Channel)val).close();
                    } else if (val instanceof BufferFileInputStream) {
                        ((BufferFileInputStream)val).close();
                    }
                }
                catch (IOException e) {
                    // A failed close must not break cache expiry; record it and move on.
                    LOG.error(e.getMessage(), (Throwable)e);
                }
            }
        };
        int fileCopyExpirationSecs = JStormUtils.parseInt(
                this.conf.get(FILE_COPY_EXPIRATION_SECS_KEY), DEFAULT_FILE_COPY_EXPIRATION_SECS);
        this.uploaders = new TimeCacheMap<Object, Object>(fileCopyExpirationSecs, closeOnExpire);
        this.downloaders = new TimeCacheMap<Object, Object>(fileCopyExpirationSecs, closeOnExpire);
    }

    /**
     * Creates the {@link NimbusCache} and installs its memory cache on the cluster state.
     *
     * <p>NOTE(review): the cluster state is cast to {@link StormZkClusterState}
     * unconditionally, so any other implementation fails here with a
     * {@link ClassCastException}.
     *
     * @throws IOException declared by the original signature -- presumably raised while
     *                     creating the cache; confirm against {@link NimbusCache}
     */
    public void createCache() throws IOException {
        this.nimbusCache = new NimbusCache(this.conf, this.stormClusterState);
        ((StormZkClusterState)this.stormClusterState).setCache(this.nimbusCache.getMemCache());
    }

    /** @return the cluster name resolved from the configuration at construction time */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @return the difference between the current time and {@link #getStartTime()} */
    public int uptime() {
        return TimeUtils.current_time_secs() - this.startTime;
    }

    /** @return the live configuration map (not a copy) */
    public Map<Object, Object> getConf() {
        return this.conf;
    }

    /** @param conf the configuration map to use from now on */
    public void setConf(Map<Object, Object> conf) {
        this.conf = conf;
    }

    /** @return the cluster-state handle */
    public StormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    /** @param stormClusterState the cluster-state handle to use from now on */
    public void setStormClusterState(StormClusterState stormClusterState) {
        this.stormClusterState = stormClusterState;
    }

    /** @return the live heartbeat cache keyed by topology id */
    public ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> getTaskHeartbeatsCache() {
        return this.taskHeartbeatsCache;
    }

    /**
     * Looks up the per-task heartbeat cache of one topology.
     *
     * @param topologyId       the topology whose cache is wanted
     * @param createIfNotExist whether to register an empty cache when none exists yet
     * @return the topology's cache, or {@code null} when it is absent and
     *         {@code createIfNotExist} is {@code false}
     */
    public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) {
        // Read the field once so a concurrent setTaskHeartbeatsCache cannot split the
        // lookup and the insertion across two different maps.
        ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> cache = this.taskHeartbeatsCache;
        Map<Integer, TkHbCacheTime> ret = cache.get(topologyId);
        if (ret == null && createIfNotExist) {
            Map<Integer, TkHbCacheTime> fresh = new ConcurrentHashMap<Integer, TkHbCacheTime>();
            // putIfAbsent keeps creation atomic: if another thread registered a map first,
            // use that one so every caller shares the same instance.
            Map<Integer, TkHbCacheTime> existing = cache.putIfAbsent(topologyId, fresh);
            ret = existing != null ? existing : fresh;
        }
        return ret;
    }

    /** @param taskHeartbeatsCache the heartbeat cache to use from now on */
    public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
        this.taskHeartbeatsCache = taskHeartbeatsCache;
    }

    /** @return the cache of open download handles */
    public TimeCacheMap<Object, Object> getDownloaders() {
        return this.downloaders;
    }

    /** @param downloaders the download-handle cache to use from now on */
    public void setDownloaders(TimeCacheMap<Object, Object> downloaders) {
        this.downloaders = downloaders;
    }

    /** @return the cache of open upload handles */
    public TimeCacheMap<Object, Object> getUploaders() {
        return this.uploaders;
    }

    /** @param uploaders the upload-handle cache to use from now on */
    public void setUploaders(TimeCacheMap<Object, Object> uploaders) {
        this.uploaders = uploaders;
    }

    /** @return the start timestamp captured in the constructor (or set via {@link #setStartTime(int)}) */
    public int getStartTime() {
        return this.startTime;
    }

    /** @param startTime the start timestamp used by {@link #uptime()} */
    public void setStartTime(int startTime) {
        this.startTime = startTime;
    }

    /** @return the counter of submitted topologies (live object) */
    public AtomicInteger getSubmittedCount() {
        return this.submittedCount;
    }

    /** @return the shared scheduled executor */
    public ScheduledExecutorService getScheduExec() {
        return this.scheduExec;
    }

    /** @return the status-transition helper bound to this instance */
    public StatusTransition getStatusTransition() {
        return this.statusTransition;
    }

    /**
     * Releases the resources held by this instance: the Nimbus cache, the cluster-state
     * connection, the scheduled executor and both file-transfer caches.
     *
     * <p>Failures while disconnecting the cluster state or shutting down the executor are
     * logged and do not stop the remaining steps. An exception from the Nimbus cache or
     * the file-transfer caches still propagates to the caller.
     */
    public void cleanup() {
        this.nimbusCache.cleanup();
        LOG.info("Successfully shutdown Cache");
        try {
            this.stormClusterState.disconnect();
            LOG.info("Successfully shutdown ZK Cluster Instance");
        }
        catch (Exception e) {
            // Best-effort: keep shutting down, but leave a trace of what went wrong.
            LOG.warn("Failed to shutdown ZK Cluster Instance", (Throwable)e);
        }
        try {
            this.scheduExec.shutdown();
            LOG.info("Successfully shutdown threadpool");
        }
        catch (Exception e) {
            // Best-effort: keep shutting down, but leave a trace of what went wrong.
            LOG.warn("Failed to shutdown threadpool", (Throwable)e);
        }
        this.uploaders.cleanup();
        this.downloaders.cleanup();
    }

    /** @return the {@link INimbus} passed to the constructor */
    public INimbus getInimubs() {
        return this.inimubs;
    }

    /** @return whether the configuration selected local mode at construction time */
    public boolean isLocalMode() {
        return this.localMode;
    }

    /** @return the leader flag last stored through {@link #setLeader(boolean)} */
    public boolean isLeader() {
        return this.isLeader;
    }

    /** @param isLeader the new value of the leader flag */
    public void setLeader(boolean isLeader) {
        this.isLeader = isLeader;
    }

    /** @return the shutdown flag (live object, initially {@code false}) */
    public AtomicBoolean getIsShutdown() {
        return this.isShutdown;
    }

    /** @return the memory cache of the underlying {@link NimbusCache} */
    public JStormCache getMemCache() {
        return this.nimbusCache.getMemCache();
    }

    /** @return the DB cache of the underlying {@link NimbusCache} */
    public JStormCache getDbCache() {
        return this.nimbusCache.getDbCache();
    }

    /** @return the Nimbus cache created by {@link #createCache()} */
    public NimbusCache getNimbusCache() {
        return this.nimbusCache;
    }

    /** @return the metric cache created in the constructor */
    public JStormMetricCache getMetricCache() {
        return this.metricCache;
    }

    /** @return the topology metrics runnable created in the constructor */
    public final TopologyMetricsRunnable getMetricRunnable() {
        return this.metricRunnable;
    }

    /**
     * Returns the cache of pending topology submissions. The misspelled method name is
     * kept because it is part of the public interface.
     *
     * @return the pending-submit cache (live object)
     */
    public TimeCacheMap<String, Object> getPendingSubmitTopoloygs() {
        return this.pendingSubmitTopologys;
    }

    /** @return the live map of per-topology task timeouts, keyed by a string (presumably topology id) */
    public Map<String, Integer> getTopologyTaskTimeout() {
        return this.topologyTaskTimeout;
    }

    /** @return the live map of task heartbeat info, keyed by a string (presumably topology id) */
    public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() {
        return this.tasksHeartbeat;
    }

    /** @return the configured notifier plugin, or {@code null} when none is configured */
    public ITopologyActionNotifierPlugin getNimbusNotify() {
        return this.nimbusNotify;
    }
}

