package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.Config;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.nimbus.ITopologyActionNotifierPlugin;
import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.BufferInputStream;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.blobstore.AtomicOutputStream;
import com.alibaba.jstorm.blobstore.BlobStore;
import com.alibaba.jstorm.blobstore.BlobStoreUtils;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormZkClusterState;
import com.alibaba.jstorm.config.ConfigUpdateHandler;
import com.alibaba.jstorm.daemon.nimbus.metric.ClusterMetricsRunnable;
import com.alibaba.jstorm.metric.JStormMetricCache;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.ExpiredCallback;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeCacheMap;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/NimbusData.class */
public class NimbusData {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class);
    private final Map<Object, Object> conf;
    private StormClusterState stormClusterState;
    private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;
    private TimeCacheMap<Object, Object> downloaders;
    private TimeCacheMap<Object, Object> uploaders;
    private TimeCacheMap<Object, Object> blobDownloaders;
    private TimeCacheMap<Object, Object> blobUploaders;
    private TimeCacheMap<Object, Object> blobListers;
    private BlobStore blobStore;
    private NimbusInfo nimbusHostPortInfo;
    private boolean isLaunchedCleaner;
    private boolean isLaunchedMonitor;
    private NimbusCache nimbusCache;
    private int startTime;
    private final ScheduledExecutorService scheduExec;
    private AtomicInteger submittedCount;
    private StatusTransition statusTransition;
    private static final int SCHEDULE_THREAD_NUM = 12;
    private final INimbus inimubs;
    private final boolean localMode;
    private volatile boolean isLeader;
    private ClusterMetricsRunnable metricRunnable;
    private TimeCacheMap<String, Object> pendingSubmitTopologies;
    private Map<String, Integer> topologyTaskTimeout;
    private Map<String, TopologyTaskHbInfo> tasksHeartbeat;
    private final JStormMetricCache metricCache;
    private final String clusterName;
    private JStormMetricsReporter metricsReporter;
    private ITopologyActionNotifierPlugin nimbusNotify;
    private final ConfigUpdateHandler configUpdateHandler;
    private AtomicBoolean isShutdown = new AtomicBoolean(false);
    private ConcurrentHashMap<String, Semaphore> topologyIdtoSem = new ConcurrentHashMap<>();

    public NimbusData(Map map, INimbus iNimbus) throws Exception {
        this.conf = map;
        createFileHandler();
        mkBlobCacheMap();
        this.nimbusHostPortInfo = NimbusInfo.fromConf(map);
        this.blobStore = BlobStoreUtils.getNimbusBlobStore(map, this.nimbusHostPortInfo);
        this.isLaunchedCleaner = false;
        this.isLaunchedMonitor = false;
        this.submittedCount = new AtomicInteger(0);
        this.stormClusterState = Cluster.mk_storm_cluster_state(map);
        createCache();
        this.taskHeartbeatsCache = new ConcurrentHashMap<>();
        this.scheduExec = Executors.newScheduledThreadPool(12);
        this.statusTransition = new StatusTransition(this);
        this.startTime = TimeUtils.current_time_secs();
        this.inimubs = iNimbus;
        this.localMode = StormConfig.local_mode(map);
        this.metricCache = new JStormMetricCache(map, this.stormClusterState);
        this.clusterName = ConfigExtension.getClusterName(map);
        this.pendingSubmitTopologies = new TimeCacheMap<>(600);
        this.topologyTaskTimeout = new ConcurrentHashMap();
        this.tasksHeartbeat = new ConcurrentHashMap();
        this.metricsReporter = new JStormMetricsReporter(this);
        this.metricRunnable = ClusterMetricsRunnable.mkInstance(this);
        this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(ConfigExtension.getNimbusConfigUpdateHandlerClass(map));
        if (map.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) {
            this.nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance((String) map.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN));
        } else {
            this.nimbusNotify = null;
        }
    }

    public void init() {
        this.metricsReporter.init();
        this.metricRunnable.init();
        this.configUpdateHandler.init(this.conf);
        if (this.nimbusNotify != null) {
            this.nimbusNotify.prepare(this.conf);
        }
    }

    public void createFileHandler() {
        ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() { // from class: com.alibaba.jstorm.daemon.nimbus.NimbusData.1
            @Override // com.alibaba.jstorm.utils.ExpiredCallback
            public void expire(Object obj, Object obj2) {
                try {
                    NimbusData.LOG.info("Close file " + String.valueOf(obj));
                    if (obj2 != null) {
                        if (obj2 instanceof Channel) {
                            ((Channel) obj2).close();
                        } else if (obj2 instanceof BufferFileInputStream) {
                            ((BufferFileInputStream) obj2).close();
                        }
                    }
                } catch (IOException e) {
                    NimbusData.LOG.error(e.getMessage(), e);
                }
            }
        };
        int intValue = JStormUtils.parseInt(this.conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30).intValue();
        this.uploaders = new TimeCacheMap<>(intValue, expiredCallback);
        this.downloaders = new TimeCacheMap<>(intValue, expiredCallback);
    }

    public void mkBlobCacheMap() {
        ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() { // from class: com.alibaba.jstorm.daemon.nimbus.NimbusData.2
            @Override // com.alibaba.jstorm.utils.ExpiredCallback
            public void expire(Object obj, Object obj2) {
                try {
                    NimbusData.LOG.debug("Close blob file " + String.valueOf(obj));
                    if (obj2 != null) {
                        if (obj2 instanceof AtomicOutputStream) {
                            AtomicOutputStream atomicOutputStream = (AtomicOutputStream) obj2;
                            atomicOutputStream.cancel();
                            atomicOutputStream.close();
                        } else if (obj2 instanceof BufferInputStream) {
                            ((BufferInputStream) obj2).close();
                        }
                    }
                } catch (IOException e) {
                    NimbusData.LOG.error(e.getMessage(), e);
                }
            }
        };
        int intValue = JStormUtils.parseInt(this.conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30).intValue();
        this.blobUploaders = new TimeCacheMap<>(intValue, expiredCallback);
        this.blobDownloaders = new TimeCacheMap<>(intValue, expiredCallback);
        this.blobListers = new TimeCacheMap<>(intValue, (ExpiredCallback) null);
    }

    public void createCache() throws IOException {
        this.nimbusCache = new NimbusCache(this.conf, this.stormClusterState);
        ((StormZkClusterState) this.stormClusterState).setCache(this.nimbusCache.getMemCache());
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public int uptime() {
        return TimeUtils.current_time_secs() - this.startTime;
    }

    public Map<Object, Object> getConf() {
        return this.conf;
    }

    public StormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    public void setStormClusterState(StormClusterState stormClusterState) {
        this.stormClusterState = stormClusterState;
    }

    public ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> getTaskHeartbeatsCache() {
        return this.taskHeartbeatsCache;
    }

    public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String str, boolean z) {
        Map<Integer, TkHbCacheTime> map = this.taskHeartbeatsCache.get(str);
        if (map == null && z) {
            map = new ConcurrentHashMap();
            Map<Integer, TkHbCacheTime> putIfAbsent = this.taskHeartbeatsCache.putIfAbsent(str, map);
            if (putIfAbsent != null) {
                map = putIfAbsent;
            }
        }
        return map;
    }

    public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> concurrentHashMap) {
        this.taskHeartbeatsCache = concurrentHashMap;
    }

    public TimeCacheMap<Object, Object> getDownloaders() {
        return this.downloaders;
    }

    public void setDownloaders(TimeCacheMap<Object, Object> timeCacheMap) {
        this.downloaders = timeCacheMap;
    }

    public TimeCacheMap<Object, Object> getUploaders() {
        return this.uploaders;
    }

    public void setUploaders(TimeCacheMap<Object, Object> timeCacheMap) {
        this.uploaders = timeCacheMap;
    }

    public int getStartTime() {
        return this.startTime;
    }

    public void setStartTime(int i) {
        this.startTime = i;
    }

    public AtomicInteger getSubmittedCount() {
        return this.submittedCount;
    }

    public ScheduledExecutorService getScheduExec() {
        return this.scheduExec;
    }

    public StatusTransition getStatusTransition() {
        return this.statusTransition;
    }

    public void cleanup() {
        this.nimbusCache.cleanup();
        LOG.info("Successfully shutdown Cache");
        try {
            this.stormClusterState.disconnect();
            LOG.info("Successfully shutdown ZK Cluster Instance");
        } catch (Exception e) {
        }
        try {
            this.scheduExec.shutdown();
            LOG.info("Successfully shutdown threadpool");
        } catch (Exception e2) {
        }
        this.uploaders.cleanup();
        this.downloaders.cleanup();
        this.blobUploaders.cleanup();
        this.blobDownloaders.cleanup();
        this.blobListers.cleanup();
        this.blobStore.shutdown();
    }

    public INimbus getInimubs() {
        return this.inimubs;
    }

    public boolean isLocalMode() {
        return this.localMode;
    }

    public boolean isLeader() {
        return this.isLeader;
    }

    public void setLeader(boolean z) {
        this.isLeader = z;
    }

    public AtomicBoolean getIsShutdown() {
        return this.isShutdown;
    }

    public JStormCache getMemCache() {
        return this.nimbusCache.getMemCache();
    }

    public JStormCache getDbCache() {
        return this.nimbusCache.getDbCache();
    }

    public NimbusCache getNimbusCache() {
        return this.nimbusCache;
    }

    public JStormMetricCache getMetricCache() {
        return this.metricCache;
    }

    public TimeCacheMap<String, Object> getPendingSubmitTopologies() {
        return this.pendingSubmitTopologies;
    }

    public Map<String, Integer> getTopologyTaskTimeout() {
        return this.topologyTaskTimeout;
    }

    public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() {
        return this.tasksHeartbeat;
    }

    public ITopologyActionNotifierPlugin getNimbusNotify() {
        return this.nimbusNotify;
    }

    public TimeCacheMap<Object, Object> getBlobDownloaders() {
        return this.blobDownloaders;
    }

    public TimeCacheMap<Object, Object> getBlobUploaders() {
        return this.blobUploaders;
    }

    public TimeCacheMap<Object, Object> getBlobListers() {
        return this.blobListers;
    }

    public NimbusInfo getNimbusHostPortInfo() {
        return this.nimbusHostPortInfo;
    }

    public BlobStore getBlobStore() {
        return this.blobStore;
    }

    public boolean isLaunchedCleaner() {
        return this.isLaunchedCleaner;
    }

    public void setLaunchedCleaner(boolean z) {
        this.isLaunchedCleaner = z;
    }

    public boolean isLaunchedMonitor() {
        return this.isLaunchedMonitor;
    }

    public void setLaunchedMonitor(boolean z) {
        this.isLaunchedMonitor = z;
    }

    public ConcurrentHashMap<String, Semaphore> getTopologyIdtoSem() {
        return this.topologyIdtoSem;
    }
}
