package org.apache.hama.metrics;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.metrics.MetricsConfig;
import org.apache.hama.util.ReflectionUtils;

/* loaded from: input_file:org/apache/hama/metrics/DefaultMetricsSystem.class */
/**
 * {@link MetricsSystem} implementation that keeps registries of named metrics
 * sources and sinks and, on a fixed schedule, samples every accepted source
 * and hands the collected records to every registered sink.
 *
 * <p>The instance itself is the {@link Runnable} submitted to the scheduler;
 * each execution of {@link #run()} performs one sample-and-publish cycle.
 *
 * <p>NOTE(review): {@link #stop()} shuts the scheduler down, so an instance
 * does not look restartable after it has been stopped - confirm with callers.
 */
final class DefaultMetricsSystem implements MetricsSystem, Runnable {
    public static final Log LOG = LogFactory.getLog(DefaultMetricsSystem.class);
    public static final String MS_NAME = "MetricsSystem";
    public static final String MS_STATS_NAME = "MetricsSystem,sub=Stats";
    public static final String MS_STATS_DESC = "Metrics system metrics";

    /** Delay, in seconds, before the first scheduled sample-and-publish cycle. */
    private static final long INITIAL_DELAY_SECONDS = 3L;

    private final String prefix;
    private final ScheduledExecutorService sched;
    private final HamaConfiguration conf;
    private final MetricsConfig config;
    /** Filter applied to source names while sampling; {@code null} accepts every source. */
    private final MetricsFilter sourceFilter;
    /** Scheduling period, in seconds (see {@link #start()}). */
    private final long period;
    /** {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
    private final AtomicBoolean state = new AtomicBoolean(false);
    /** Adaptor of the built-in system source; unset until {@link #start()} has configured it. */
    private final AtomicReference<MetricsSourceAdaptor> sysSource = new AtomicReference<>();
    private final ConcurrentMap<String, MetricsSourceAdaptor> sources = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, MetricsSinkAdaptor> sinks = new ConcurrentHashMap<>();
    /** Queue instance handed to every sink adaptor created by this system. */
    private final BlockingQueue<List<Pair>> queue = new LinkedBlockingQueue<>();

    /**
     * Creates a metrics system that is not yet started.
     *
     * @param str prefix of this metrics system; also used to look up the sink
     *        section and the per-sink filters in the metrics configuration
     * @param hamaConfiguration configuration supplying {@code bsp.metrics.period}
     *        (default 3) and {@code bsp.metrics.threads_pool} (default 1)
     */
    public DefaultMetricsSystem(String str, HamaConfiguration hamaConfiguration) {
        this.prefix = str;
        this.conf = hamaConfiguration;
        this.config = MetricsConfig.create(this.conf);
        this.sourceFilter = this.config.getFilter("default.source.filter");
        this.period = hamaConfiguration.getInt("bsp.metrics.period", 3);
        this.sched = Executors.newScheduledThreadPool(hamaConfiguration.getInt("bsp.metrics.threads_pool", 1));
    }

    /** @return the prefix given at construction time */
    @Override // org.apache.hama.metrics.MetricsSystem
    public final String prefix() {
        return this.prefix;
    }

    /** @return the scheduling period, in seconds */
    @Override // org.apache.hama.metrics.MetricsSystem
    public final long period() {
        return this.period;
    }

    /**
     * Configures sinks and the system source, then schedules periodic
     * publishing. Calling it while the system is already started only logs a
     * warning.
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public void start() {
        // Atomic test-and-set: two racing callers must not both configure and schedule.
        if (!this.state.compareAndSet(false, true)) {
            LOG.warn("MetricsSystem has already been started.");
            return;
        }
        boolean started = false;
        try {
            configure();
            this.sched.scheduleAtFixedRate(this, INITIAL_DELAY_SECONDS, period(), TimeUnit.SECONDS);
            started = true;
        } finally {
            if (!started) {
                // Start-up failed; do not leave the system flagged as running.
                this.state.set(false);
            }
        }
    }

    /** Runs sink configuration followed by source configuration. */
    private void configure() {
        configureSinks();
        configureSources();
    }

    /**
     * Registers every sink declared in the {@code sink} subset of the metrics
     * configuration for this prefix. Only entries whose key ends with
     * {@code class} are instantiated; the entry value is used both as the class
     * name and as the sink name.
     */
    private void configureSinks() {
        for (MetricsConfig.Entry<String, String> entry : this.config.subset(prefix(), "sink")) {
            String key = entry.key();
            String value = entry.value();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Configure sinks' key " + key + " value " + value);
            }
            if (!key.endsWith("class")) {
                continue;
            }
            try {
                LOG.info("Sink name to be registered:" + value);
                register(value, "sink registered from properties file.", (MetricsSink) ReflectionUtils.newInstance(value));
            } catch (ClassNotFoundException e) {
                // A missing sink class is not fatal; the remaining sinks are still configured.
                LOG.warn("Class " + value + " not found.", e);
            }
        }
    }

    /** Configures the built-in sources; currently only the system source. */
    private void configureSources() {
        registerSystemSource();
    }

    /**
     * Creates, remembers and starts the adaptor for the system metrics source,
     * which is constructed with the source map, the sink map and the
     * configuration.
     */
    private void registerSystemSource() {
        MetricsSource systemSource = MetricsFactory.createSource(
                (Class<?>) SystemMetrics.class,
                new Class[]{Map.class, Map.class, HamaConfiguration.class},
                new Object[]{this.sources, this.sinks, this.conf});
        MetricsSourceAdaptor adaptor = new MetricsSourceAdaptor(
                prefix(), MS_STATS_NAME, MS_STATS_DESC, systemSource, period() + 1, this.conf);
        this.sysSource.set(adaptor);
        adaptor.start();
    }

    /**
     * Marks the system as stopped, stops and forgets all sources and sinks and
     * shuts the scheduler down. Safe to call on an instance that was never
     * started.
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public void stop() {
        this.state.set(false);
        stopSources();
        stopSinks();
        this.sched.shutdown();
    }

    /** Stops every registered source adaptor and the system source, then clears the registry. */
    void stopSources() {
        for (MetricsSourceAdaptor adaptor : this.sources.values()) {
            adaptor.stop();
        }
        // The system source only exists once start() has run.
        MetricsSourceAdaptor systemAdaptor = this.sysSource.get();
        if (systemAdaptor != null) {
            systemAdaptor.stop();
        }
        this.sources.clear();
    }

    /** Stops every registered sink adaptor, then clears the registry. */
    void stopSinks() {
        for (MetricsSinkAdaptor adaptor : this.sinks.values()) {
            adaptor.stop();
        }
        this.sinks.clear();
    }

    /**
     * Registers and starts a metrics source under the given name. If the name
     * is already taken a warning is logged and nothing else happens.
     *
     * @param str name of the source
     * @param str2 description of the source
     * @param metricsSource the source to wrap in an adaptor
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public void register(String str, String str2, MetricsSource metricsSource) {
        if (this.sources.containsKey(str)) {
            LOG.warn("Source with name `" + str + "' is already registered.");
            return;
        }
        MetricsSourceAdaptor adaptor = new MetricsSourceAdaptor(prefix(), str, str2, metricsSource, period() + 1, this.conf);
        // Another thread may have registered the same name since the check above;
        // only the adaptor that actually entered the map may be started.
        if (this.sources.putIfAbsent(str, adaptor) != null) {
            LOG.warn("Source with name `" + str + "' is already registered.");
            return;
        }
        adaptor.start();
        LOG.info("Registered source " + str + ".");
    }

    /**
     * Registers and starts a metrics sink under the given name. The sink
     * adaptor receives the source, record and metric filters configured for
     * this system's prefix. If the name is already taken a warning is logged
     * and nothing else happens.
     *
     * @param str name of the sink
     * @param str2 description of the sink
     * @param metricsSink the sink to wrap in an adaptor
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public void register(String str, String str2, MetricsSink metricsSink) {
        if (this.sinks.containsKey(str)) {
            LOG.warn("Sink with name `" + str + "' is already registered.");
            return;
        }
        MetricsSinkAdaptor adaptor = new MetricsSinkAdaptor(
                str,
                str2,
                metricsSink,
                this.queue,
                this.conf,
                this.config.getFilter(prefix() + ".source.filter"),
                this.config.getFilter(prefix() + ".record.filter"),
                this.config.getFilter(prefix() + ".metric.filter"));
        // Same race as for sources: never start an adaptor that lost the registration.
        if (this.sinks.putIfAbsent(str, adaptor) != null) {
            LOG.warn("Sink with name `" + str + "' is already registered.");
            return;
        }
        adaptor.start();
        LOG.info("Registered sink " + str + ".");
    }

    /**
     * One scheduled cycle: when at least one sink is registered, samples the
     * sources and publishes the result to the sinks.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug(" Sinks size " + this.sinks.size());
            }
            if (!this.sinks.isEmpty()) {
                publishMetrics(sample());
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task suppresses every
            // subsequent execution, so a single bad cycle must not propagate.
            LOG.error("Failed to sample or publish metrics; will retry on the next period.", e);
        }
    }

    /**
     * Hands the given samples to every registered sink adaptor.
     *
     * @param list samples collected from the sources
     */
    void publishMetrics(List<Pair> list) {
        for (MetricsSinkAdaptor adaptor : this.sinks.values()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(" sink adptr name " + adaptor.name());
            }
            adaptor.putMetrics(list);
        }
    }

    /**
     * Collects one snapshot from every registered source whose name passes the
     * source filter (all sources when no filter is configured).
     *
     * @return one pair of adaptor name and records per sampled source
     */
    private List<Pair> sample() {
        List<Pair> samples = new ArrayList<>();
        for (Map.Entry<String, MetricsSourceAdaptor> entry : this.sources.entrySet()) {
            if (null != this.sourceFilter && !this.sourceFilter.accepts(entry.getKey())) {
                continue;
            }
            MetricsSourceAdaptor adaptor = entry.getValue();
            samples.add(new Pair(adaptor.name(), snapshot(adaptor)));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(" Source adaptor size " + samples.size());
        }
        return samples;
    }

    /**
     * Asks a source adaptor to fill a fresh list with its records.
     *
     * @param metricsSourceAdaptor adaptor to read from
     * @return the list that was passed to the adaptor
     */
    private List<MetricsRecord> snapshot(MetricsSourceAdaptor metricsSourceAdaptor) {
        List<MetricsRecord> records = new ArrayList<>();
        metricsSourceAdaptor.getMetrics(records);
        return records;
    }

    /**
     * Resolves the local host name.
     *
     * @return the local host name, or {@code "localhost"} when it cannot be
     *         determined (the failure is logged)
     */
    public static String hostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            LOG.error("Error getting localhost name. hostname() would return 'localhost'", e);
            return "localhost";
        }
    }

    /**
     * Looks up a registered sink.
     *
     * @param str name the sink was registered under
     * @return the sink, or {@code null} when no sink has that name
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public MetricsSink findSink(String str) {
        MetricsSinkAdaptor adaptor = this.sinks.get(str);
        return null != adaptor ? adaptor.sink() : null;
    }

    /**
     * Looks up a registered source.
     *
     * @param str name the source was registered under
     * @return the source, or {@code null} when no source has that name
     */
    @Override // org.apache.hama.metrics.MetricsSystem
    public MetricsSource findSource(String str) {
        MetricsSourceAdaptor adaptor = this.sources.get(str);
        return null != adaptor ? adaptor.source() : null;
    }
}
