package org.apache.samza.metrics.reporter;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistryWithSource;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.MetricsVisitor;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.serializers.Serializer;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/metrics/reporter/MetricsSnapshotReporter.class */
public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotReporter.class);
    private final SystemProducer producer;
    private final SystemStream out;
    private final Duration reportingInterval;
    private final String jobName;
    private final String jobId;
    private final String containerName;
    private final String version;
    private final String samzaVersion;
    private final String host;
    private final Serializer<MetricsSnapshot> serializer;
    private final Optional<Pattern> blacklist;
    private final Clock clock;
    private final long resetTime;
    private final List<MetricsRegistryWithSource> registries = new ArrayList();
    private final Set<String> blacklistedMetrics = new HashSet();
    private final String executionEnvContainerId = (String) Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).orElse(DirIndex.ROOT_DIR_NAME);
    private final String samzaEpochId = (String) Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID)).orElse(DirIndex.ROOT_DIR_NAME);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build());

    public MetricsSnapshotReporter(SystemProducer systemProducer, SystemStream systemStream, Duration duration, String str, String str2, String str3, String str4, String str5, String str6, Serializer<MetricsSnapshot> serializer, Optional<Pattern> optional, Clock clock) {
        this.producer = systemProducer;
        this.out = systemStream;
        this.reportingInterval = duration;
        this.jobName = str;
        this.jobId = str2;
        this.containerName = str3;
        this.version = str4;
        this.samzaVersion = str5;
        this.host = str6;
        this.serializer = serializer;
        this.blacklist = optional;
        this.clock = clock;
        this.resetTime = this.clock.currentTimeMillis();
        LOG.info("got metrics snapshot reporter properties [job name: {}, job id: {}, containerName: {}, version: {}, samzaVersion: {}, host: {}, reportingInterval {}]", new Object[]{str, str2, str3, str4, str5, str6, duration});
    }

    public void start() {
        LOG.info("Starting producer.");
        this.producer.start();
        LOG.info("Starting reporter timer.");
        this.executor.scheduleWithFixedDelay(this, 0L, this.reportingInterval.getSeconds(), TimeUnit.SECONDS);
    }

    public void register(String str, ReadableMetricsRegistry readableMetricsRegistry) {
        this.registries.add(new MetricsRegistryWithSource(str, readableMetricsRegistry));
        LOG.info("Registering {} with producer.", str);
        this.producer.register(str);
    }

    public void stop() {
        this.executor.schedule(this, 0L, TimeUnit.SECONDS);
        LOG.info("Stopping reporter timer.");
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(60L, TimeUnit.SECONDS);
            LOG.info("Stopping producer.");
            this.producer.stop();
            if (this.executor.isTerminated()) {
                return;
            }
            LOG.warn("Unable to shutdown reporter timer.");
        } catch (InterruptedException e) {
            throw new SamzaException(e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            innerRun();
        } catch (Exception e) {
            LOG.warn("Error while reporting metrics. Will retry in " + this.reportingInterval + " seconds.", e);
        }
    }

    public void innerRun() {
        LOG.debug("Begin flushing metrics.");
        for (MetricsRegistryWithSource metricsRegistryWithSource : this.registries) {
            String source = metricsRegistryWithSource.getSource();
            ReadableMetricsRegistry registry = metricsRegistryWithSource.getRegistry();
            LOG.debug("Flushing metrics for {}.", source);
            HashMap hashMap = new HashMap();
            registry.getGroups().forEach(str -> {
                HashMap hashMap2 = new HashMap();
                registry.getGroup(str).forEach((str, metric) -> {
                    if (shouldIgnore(str, str)) {
                        return;
                    }
                    metric.visit(new MetricsVisitor() { // from class: org.apache.samza.metrics.reporter.MetricsSnapshotReporter.1
                        public void counter(Counter counter) {
                            hashMap2.put(str, Long.valueOf(counter.getCount()));
                        }

                        public <T> void gauge(Gauge<T> gauge) {
                            hashMap2.put(str, gauge.getValue());
                        }

                        public void timer(Timer timer) {
                            hashMap2.put(str, Double.valueOf(timer.getSnapshot().getAverage()));
                        }
                    });
                });
                if (hashMap2.isEmpty()) {
                    return;
                }
                hashMap.put(str, hashMap2);
            });
            if (!hashMap.isEmpty()) {
                MetricsHeader metricsHeader = new MetricsHeader(this.jobName, this.jobId, this.containerName, this.executionEnvContainerId, Optional.of(this.samzaEpochId), source, this.version, this.samzaVersion, this.host, this.clock.currentTimeMillis(), this.resetTime);
                Metrics metrics = new Metrics(hashMap);
                LOG.debug("Flushing metrics for {} to {} with header and map: header={}, map={}.", new Object[]{source, this.out, metricsHeader.getAsMap(), metrics.getAsMap()});
                MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, metrics);
                try {
                    this.producer.send(source, new OutgoingMessageEnvelope(this.out, this.host, (Object) null, this.serializer != null ? this.serializer.toBytes(metricsSnapshot) : metricsSnapshot));
                    this.producer.flush(source);
                } catch (Exception e) {
                    LOG.error(String.format("Exception when flushing metrics for source %s", source), e);
                }
            }
        }
        LOG.debug("Finished flushing metrics.");
    }

    protected boolean shouldIgnore(String str, String str2) {
        boolean isPresent = this.blacklist.isPresent();
        String str3 = str + "." + str2;
        if (isPresent && !this.blacklistedMetrics.contains(str3)) {
            if (this.blacklist.get().matcher(str3).matches()) {
                this.blacklistedMetrics.add(str3);
                LOG.debug("Samza diagnostics: blacklisted metric {} because it matched blacklist regex: {}", str3, this.blacklist.get());
            } else {
                isPresent = false;
            }
        }
        return isPresent;
    }
}
