package org.apache.flink.runtime.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/metrics/MetricRegistryImpl.class */
public class MetricRegistryImpl implements MetricRegistry {
    static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
    private final Object lock;
    private final List<MetricReporter> reporters;
    private final ScheduledExecutorService executor;
    private final ScopeFormats scopeFormats;
    private final char globalDelimiter;
    private final List<Character> delimiters;
    private final CompletableFuture<Void> terminationFuture;
    private final long maximumFramesize;

    @Nullable
    private MetricQueryService queryService;

    @Nullable
    private RpcService metricQueryServiceRpcService;
    private ViewUpdater viewUpdater;
    private boolean isShutdown;

    /* loaded from: input_file:org/apache/flink/runtime/metrics/MetricRegistryImpl$ReporterTask.class */
    private static final class ReporterTask extends TimerTask {
        private final Scheduled reporter;

        private ReporterTask(Scheduled scheduled) {
            this.reporter = scheduled;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                this.reporter.report();
            } catch (Throwable th) {
                MetricRegistryImpl.LOG.warn("Error while reporting metrics", th);
            }
        }
    }

    public MetricRegistryImpl(MetricRegistryConfiguration metricRegistryConfiguration) {
        this(metricRegistryConfiguration, Collections.emptyList());
    }

    public MetricRegistryImpl(MetricRegistryConfiguration metricRegistryConfiguration, Collection<ReporterSetup> collection) {
        this.lock = new Object();
        this.maximumFramesize = metricRegistryConfiguration.getQueryServiceMessageSizeLimit();
        this.scopeFormats = metricRegistryConfiguration.getScopeFormats();
        this.globalDelimiter = metricRegistryConfiguration.getDelimiter();
        this.delimiters = new ArrayList(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;
        this.reporters = new ArrayList(4);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
        this.queryService = null;
        this.metricQueryServiceRpcService = null;
        if (collection.isEmpty()) {
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
            return;
        }
        for (ReporterSetup reporterSetup : collection) {
            String name = reporterSetup.getName();
            try {
                Optional<String> intervalSettings = reporterSetup.getIntervalSettings();
                TimeUnit timeUnit = TimeUnit.SECONDS;
                long j = 10;
                if (intervalSettings.isPresent()) {
                    try {
                        String[] split = intervalSettings.get().split(" ");
                        j = Long.parseLong(split[0]);
                        timeUnit = TimeUnit.valueOf(split[1]);
                    } catch (Exception e) {
                        LOG.error("Cannot parse report interval from config: " + intervalSettings + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. Using default reporting interval.");
                    }
                }
                Scheduled reporter = reporterSetup.getReporter();
                String name2 = reporter.getClass().getName();
                if (reporter instanceof Scheduled) {
                    LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", new Object[]{Long.valueOf(j), timeUnit.name(), name, name2});
                    this.executor.scheduleWithFixedDelay(new ReporterTask(reporter), j, j, timeUnit);
                } else {
                    LOG.info("Reporting metrics for reporter {} of type {}.", name, name2);
                }
                this.reporters.add(reporter);
                String orElse = reporterSetup.getDelimiter().orElse(String.valueOf(this.globalDelimiter));
                if (orElse.length() != 1) {
                    LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", new Object[]{orElse, name, Character.valueOf(this.globalDelimiter)});
                    orElse = String.valueOf(this.globalDelimiter);
                }
                this.delimiters.add(Character.valueOf(orElse.charAt(0)));
            } catch (Throwable th) {
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", name, th);
            }
        }
    }

    public void startQueryService(RpcService rpcService, ResourceID resourceID) {
        synchronized (this.lock) {
            Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
            try {
                this.metricQueryServiceRpcService = rpcService;
                this.queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, this.maximumFramesize);
                this.queryService.start();
            } catch (Exception e) {
                LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
            }
        }
    }

    @Nullable
    public RpcService getMetricQueryServiceRpcService() {
        return this.metricQueryServiceRpcService;
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    @Nullable
    public String getMetricQueryServiceGatewayRpcAddress() {
        if (this.queryService != null) {
            return ((MetricQueryServiceGateway) this.queryService.getSelfGateway(MetricQueryServiceGateway.class)).getAddress();
        }
        return null;
    }

    @VisibleForTesting
    @Nullable
    MetricQueryServiceGateway getMetricQueryServiceGateway() {
        if (this.queryService != null) {
            return (MetricQueryServiceGateway) this.queryService.getSelfGateway(MetricQueryServiceGateway.class);
        }
        return null;
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public char getDelimiter() {
        return this.globalDelimiter;
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public char getDelimiter(int i) {
        try {
            return this.delimiters.get(i).charValue();
        } catch (IndexOutOfBoundsException e) {
            LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", Integer.valueOf(i));
            return this.globalDelimiter;
        }
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public int getNumberReporters() {
        return this.reporters.size();
    }

    @VisibleForTesting
    public List<MetricReporter> getReporters() {
        return this.reporters;
    }

    public boolean isShutdown() {
        boolean z;
        synchronized (this.lock) {
            z = this.isShutdown;
        }
        return z;
    }

    public CompletableFuture<Void> shutdown() {
        synchronized (this.lock) {
            if (this.isShutdown) {
                return this.terminationFuture;
            }
            this.isShutdown = true;
            ArrayList arrayList = new ArrayList(3);
            Time seconds = Time.seconds(1L);
            if (this.metricQueryServiceRpcService != null) {
                arrayList.add(this.metricQueryServiceRpcService.stopService());
            }
            Throwable th = null;
            Iterator<MetricReporter> it = this.reporters.iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (Throwable th2) {
                    th = ExceptionUtils.firstOrSuppressed(th2, th);
                }
            }
            this.reporters.clear();
            if (th != null) {
                arrayList.add(FutureUtils.completedExceptionally(new FlinkException("Could not shut down the metric reporters properly.", th)));
            }
            arrayList.add(ExecutorUtils.nonBlockingShutdown(seconds.toMilliseconds(), TimeUnit.MILLISECONDS, new ExecutorService[]{this.executor}));
            FutureUtils.completeAll(arrayList).whenComplete((r4, th3) -> {
                if (th3 != null) {
                    this.terminationFuture.completeExceptionally(th3);
                } else {
                    this.terminationFuture.complete(null);
                }
            });
            return this.terminationFuture;
        }
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public ScopeFormats getScopeFormats() {
        return this.scopeFormats;
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public void register(Metric metric, String str, AbstractMetricGroup abstractMetricGroup) {
        synchronized (this.lock) {
            if (isShutdown()) {
                LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
            } else {
                if (this.reporters != null) {
                    for (int i = 0; i < this.reporters.size(); i++) {
                        MetricReporter metricReporter = this.reporters.get(i);
                        if (metricReporter != null) {
                            try {
                                metricReporter.notifyOfAddedMetric(metric, str, new FrontMetricGroup(i, abstractMetricGroup));
                            } catch (Exception e) {
                                LOG.warn("Error while registering metric.", e);
                            }
                        }
                    }
                }
                try {
                    if (this.queryService != null) {
                        this.queryService.addMetric(str, metric, abstractMetricGroup);
                    }
                } catch (Exception e2) {
                    LOG.warn("Error while registering metric.", e2);
                }
                try {
                    if (metric instanceof View) {
                        if (this.viewUpdater == null) {
                            this.viewUpdater = new ViewUpdater(this.executor);
                        }
                        this.viewUpdater.notifyOfAddedView((View) metric);
                    }
                } catch (Exception e3) {
                    LOG.warn("Error while registering metric.", e3);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.metrics.MetricRegistry
    public void unregister(Metric metric, String str, AbstractMetricGroup abstractMetricGroup) {
        synchronized (this.lock) {
            if (isShutdown()) {
                LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
            } else {
                if (this.reporters != null) {
                    for (int i = 0; i < this.reporters.size(); i++) {
                        try {
                            MetricReporter metricReporter = this.reporters.get(i);
                            if (metricReporter != null) {
                                metricReporter.notifyOfRemovedMetric(metric, str, new FrontMetricGroup(i, abstractMetricGroup));
                            }
                        } catch (Exception e) {
                            LOG.warn("Error while registering metric.", e);
                        }
                    }
                }
                try {
                    if (this.queryService != null) {
                        this.queryService.removeMetric(metric);
                    }
                } catch (Exception e2) {
                    LOG.warn("Error while registering metric.", e2);
                }
                try {
                    if ((metric instanceof View) && this.viewUpdater != null) {
                        this.viewUpdater.notifyOfRemovedView((View) metric);
                    }
                } catch (Exception e3) {
                    LOG.warn("Error while registering metric.", e3);
                }
            }
        }
    }

    @VisibleForTesting
    @Nullable
    MetricQueryService getQueryService() {
        return this.queryService;
    }
}
