/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core.metrics;

import java.io.Serializable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.NoOpMetricsSink;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;

@Experimental(value=Experimental.Kind.METRICS)
public class MetricsPusher
implements Serializable {
    private MetricsSink metricsSink;
    private long period;
    @Nullable
    private transient ScheduledFuture<?> scheduledFuture;
    private transient PipelineResult pipelineResult;
    private MetricsContainerStepMap metricsContainerStepMap;

    public MetricsPusher(MetricsContainerStepMap metricsContainerStepMap, MetricsOptions pipelineOptions, PipelineResult pipelineResult) {
        this.metricsContainerStepMap = metricsContainerStepMap;
        this.pipelineResult = pipelineResult;
        this.period = pipelineOptions.getMetricsPushPeriod();
        this.metricsSink = InstanceBuilder.ofType(MetricsSink.class).fromClass(pipelineOptions.getMetricsSink()).withArg(MetricsOptions.class, pipelineOptions).build();
    }

    public void start() {
        if (!(this.metricsSink instanceof NoOpMetricsSink)) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MetricsPusher-thread").build());
            this.scheduledFuture = scheduler.scheduleAtFixedRate(this::run, 0L, this.period, TimeUnit.SECONDS);
        }
    }

    private void tearDown() {
        this.pushMetrics();
        if (!this.scheduledFuture.isCancelled()) {
            this.scheduledFuture.cancel(true);
        }
    }

    private void run() {
        PipelineResult.State pipelineState;
        this.pushMetrics();
        if (this.pipelineResult != null && (pipelineState = this.pipelineResult.getState()).isTerminal()) {
            this.tearDown();
        }
    }

    private void pushMetrics() {
        if (!(this.metricsSink instanceof NoOpMetricsSink)) {
            try {
                MetricResults metricResults = MetricsContainerStepMap.asAttemptedOnlyMetricResults(this.metricsContainerStepMap);
                MetricQueryResults metricQueryResults = metricResults.allMetrics();
                if (Iterables.size(metricQueryResults.getDistributions()) != 0 || Iterables.size(metricQueryResults.getGauges()) != 0 || Iterables.size(metricQueryResults.getCounters()) != 0) {
                    this.metricsSink.writeMetrics(metricQueryResults);
                }
            }
            catch (Exception e) {
                MetricsPushException metricsPushException = new MetricsPushException(e);
                metricsPushException.printStackTrace();
            }
        }
    }

    public static class MetricsPushException
    extends Exception {
        MetricsPushException(Throwable cause) {
            super(cause);
        }
    }
}

