package org.apache.flink.runtime.webmonitor.metrics;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.class */
/**
 * Fetches metric dumps from the JobManager and from every registered TaskManager and
 * merges them into a single {@link MetricStore}.
 *
 * <p>A fetch round is started by {@link #update()}, at most once per
 * {@link #UPDATE_INTERVAL_MILLIS}. All requests are asynchronous: results are applied to the
 * store from callbacks running on the supplied {@link ExecutionContext}. Every mutation this
 * class performs on the store happens while holding the store's monitor.
 */
public class MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);

    /** Minimum time, in milliseconds, between two fetch rounds. */
    private static final long UPDATE_INTERVAL_MILLIS = 10000L;

    /** Name of the metric query actor that lives next to the JobManager actor. */
    private static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    private final ActorSystem actorSystem;
    private final JobManagerRetriever retriever;
    private final ExecutionContext ctx;
    private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
    private final MetricStore metrics = new MetricStore();
    private final MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();

    /** Wall-clock time of the last started fetch round; guarded by {@code this}. */
    private long lastUpdateTime;

    /** Thin wrapper that sends ask-pattern requests to a plain {@link ActorRef}. */
    public static class BasicGateway {
        private final ActorRef actor;

        private BasicGateway(ActorRef actorRef) {
            this.actor = actorRef;
        }

        /**
         * Sends {@code obj} to the wrapped actor using the ask pattern.
         *
         * @param obj message to send
         * @param finiteDuration ask timeout
         * @return future holding the actor's reply
         */
        public Future<Object> ask(Object obj, FiniteDuration finiteDuration) {
            return Patterns.ask(this.actor, obj, new Timeout(finiteDuration));
        }
    }

    /**
     * Creates a fetcher.
     *
     * @param actorSystem actor system used to look up the metric query actors
     * @param jobManagerRetriever source of the current JobManager gateway
     * @param executionContext context on which all future callbacks are executed
     * @throws NullPointerException if any argument is {@code null}
     */
    public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever jobManagerRetriever, ExecutionContext executionContext) {
        this.actorSystem = (ActorSystem) Preconditions.checkNotNull(actorSystem);
        this.retriever = (JobManagerRetriever) Preconditions.checkNotNull(jobManagerRetriever);
        this.ctx = (ExecutionContext) Preconditions.checkNotNull(executionContext);
    }

    /**
     * Returns the store that this fetcher fills.
     *
     * <p>The returned object is the live, internal instance. This class mutates it only while
     * synchronized on it; NOTE(review): readers that need a consistent view presumably have to
     * synchronize on the returned store as well - confirm against the callers.
     *
     * @return the metric store backing this fetcher
     */
    public MetricStore getMetricStore() {
        return this.metrics;
    }

    /**
     * Starts a new fetch round unless one was started within the last
     * {@link #UPDATE_INTERVAL_MILLIS} milliseconds. The method returns once the requests have
     * been issued; it does not wait for the replies.
     */
    public void update() {
        synchronized (this) {
            long now = System.currentTimeMillis();
            if (now - this.lastUpdateTime > UPDATE_INTERVAL_MILLIS) {
                this.lastUpdateTime = now;
                fetchMetrics();
            }
        }
    }

    /**
     * Issues all requests of one fetch round: job details (to prune the store), the
     * JobManager's metric dump, and the list of TaskManagers (to prune the store and query
     * each TaskManager's dump). Does nothing if no JobManager gateway is currently available.
     * Any exception thrown while issuing the requests is logged and swallowed so that a failed
     * round never propagates to the caller of {@link #update()}.
     */
    private void fetchMetrics() {
        try {
            Option<Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = this.retriever.getJobManagerGatewayAndWebPort();
            if (!jobManagerGatewayAndWebPort.isDefined()) {
                return;
            }
            ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();

            pruneJobsNoLongerReported(jobManager);
            queryMetrics(this.actorSystem.actorFor(siblingActorPath(jobManager.path(), METRIC_QUERY_SERVICE_NAME)));
            refreshTaskManagers(jobManager);
        } catch (Exception e) {
            LOG.warn("Exception while fetching metrics.", e);
        }
    }

    /** Asks the JobManager for its job details and drops every job it no longer reports from the store. */
    private void pruneJobsNoLongerReported(ActorGateway jobManager) {
        Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), this.timeout);
        jobDetailsFuture.onSuccess(new OnSuccess<Object>() {
            public void onSuccess(Object obj) throws Throwable {
                MultipleJobsDetails details = (MultipleJobsDetails) obj;
                // A hash set keeps retainAll linear in the number of stored jobs.
                Set<String> reportedJobIds = new HashSet<>();
                for (JobDetails running : details.getRunningJobs()) {
                    reportedJobIds.add(running.getJobId().toString());
                }
                for (JobDetails finished : details.getFinishedJobs()) {
                    reportedJobIds.add(finished.getJobId().toString());
                }
                synchronized (MetricFetcher.this.metrics) {
                    MetricFetcher.this.metrics.jobs.keySet().retainAll(reportedJobIds);
                }
            }
        }, this.ctx);
        logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    }

    /**
     * Asks the JobManager for its registered TaskManagers, queries the metric dump of each of
     * them and drops every TaskManager that is no longer registered from the store.
     */
    private void refreshTaskManagers(ActorGateway jobManager) {
        Future<Object> taskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), this.timeout);
        taskManagersFuture.onSuccess(new OnSuccess<Object>() {
            public void onSuccess(Object obj) throws Throwable {
                Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) obj).asJavaIterable();
                Set<String> registeredTaskManagerIds = new HashSet<>();
                for (Instance taskManager : taskManagers) {
                    registeredTaskManagerIds.add(taskManager.getId().toString());
                    // Each TaskManager's query actor is named after the TaskManager's resource id.
                    String queryServiceName = METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
                    String queryServicePath = siblingActorPath(taskManager.getTaskManagerGateway().getAddress(), queryServiceName);
                    MetricFetcher.this.queryMetrics(MetricFetcher.this.actorSystem.actorFor(queryServicePath));
                }
                synchronized (MetricFetcher.this.metrics) {
                    MetricFetcher.this.metrics.taskManagers.keySet().retainAll(registeredTaskManagerIds);
                }
            }
        }, this.ctx);
        logErrorOnFailure(taskManagersFuture, "Fetching list of registered TaskManagers failed.");
    }

    /**
     * Replaces the last segment of {@code actorPath} with {@code siblingName}. If the path
     * contains no {@code '/'}, the result is just {@code siblingName}.
     */
    private static String siblingActorPath(String actorPath, String siblingName) {
        return actorPath.substring(0, actorPath.lastIndexOf('/') + 1) + siblingName;
    }

    /**
     * Registers a failure callback on {@code future} that logs {@code str} together with the
     * failure cause at debug level.
     */
    private void logErrorOnFailure(Future<Object> future, final String str) {
        future.onFailure(new OnFailure() {
            public void onFailure(Throwable th) throws Throwable {
                MetricFetcher.LOG.debug(str, th);
            }
        }, this.ctx);
    }

    /**
     * Requests a metric dump from the given query actor and, once the reply arrives, merges it
     * into the store. Failures are logged at debug level.
     *
     * @param actorRef metric query actor to ask
     */
    public void queryMetrics(ActorRef actorRef) {
        Future<Object> dumpFuture = new BasicGateway(actorRef).ask(MetricQueryService.getCreateDump(), this.timeout);
        dumpFuture.onSuccess(new OnSuccess<Object>() {
            public void onSuccess(Object obj) throws Throwable {
                MetricFetcher.this.addMetrics(obj);
            }
        }, this.ctx);
        logErrorOnFailure(dumpFuture, "Fetching metrics failed.");
    }

    /**
     * Deserializes a metric dump reply and adds every contained metric to the store.
     *
     * @param obj reply of a metric query actor; must be a
     *     {@link MetricDumpSerialization.MetricSerializationResult}
     * @throws ClassCastException if {@code obj} has a different type
     */
    public void addMetrics(Object obj) {
        MetricDumpSerialization.MetricSerializationResult serialized = (MetricDumpSerialization.MetricSerializationResult) obj;
        // Deserialize before taking the lock so the store is blocked only for the actual inserts.
        Iterable<MetricDump> dumps = this.deserializer.deserialize(serialized);
        // Replies from different actors arrive on the execution context and may be processed
        // concurrently with each other and with the pruning callbacks, which hold this same
        // monitor; without it the store would be mutated from several threads at once.
        synchronized (this.metrics) {
            for (MetricDump dump : dumps) {
                this.metrics.add(dump);
            }
        }
    }
}
