package org.apache.samza.container;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/container/ContainerHeartbeatMonitor.class */
public class ContainerHeartbeatMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
    private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
    private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde("set-config");
    private static final String SOURCE_NAME = "SamzaContainer";

    @VisibleForTesting
    static final int SCHEDULE_MS = 60000;

    @VisibleForTesting
    static final int SHUTDOWN_TIMOUT_MS = 120000;
    private final Runnable onContainerExpired;
    private final ScheduledExecutorService scheduler;
    private final String containerExecutionId;
    private final MetadataStore coordinatorStreamStore;
    private final long sleepDurationForReconnectWithAM;
    private final boolean isApplicationMasterHighAvailabilityEnabled;
    private final long retryCount;
    private ContainerHeartbeatClient containerHeartbeatClient;
    private ContainerHeartbeatMetrics metrics;
    private Map<String, MetricsReporter> reporters;
    private String coordinatorUrl;
    private boolean started;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/container/ContainerHeartbeatMonitor$ContainerHeartbeatMetrics.class */
    public static final class ContainerHeartbeatMetrics {
        private static final String GROUP = "ContainerHeartbeatMonitor";
        private static final String HEARTBEAT_DISCOVERY_TIME_MS = "heartbeat-discovery-time-ms";
        private static final String HEARTBEAT_ESTABLISHED_FAILURE_COUNT = "heartbeat-established-failure-count";
        private static final String HEARTBEAT_ESTABLISHED_WITH_NEW_AM_COUNT = "heartbeat-established-with-new-am-count";
        private static final String HEARTBEAT_EXPIRED_COUNT = "heartbeat-expired-count";
        private final Counter heartbeatEstablishedFailureCount;
        private final Counter heartbeatEstablishedWithNewAmCount;
        private final Counter heartbeatExpiredCount;
        private final Gauge<Long> heartbeatDiscoveryTime;

        public ContainerHeartbeatMetrics(MetricsRegistry metricsRegistry) {
            this.heartbeatEstablishedFailureCount = metricsRegistry.newCounter(GROUP, HEARTBEAT_ESTABLISHED_FAILURE_COUNT);
            this.heartbeatEstablishedWithNewAmCount = metricsRegistry.newCounter(GROUP, HEARTBEAT_ESTABLISHED_WITH_NEW_AM_COUNT);
            this.heartbeatExpiredCount = metricsRegistry.newCounter(GROUP, HEARTBEAT_EXPIRED_COUNT);
            this.heartbeatDiscoveryTime = metricsRegistry.newGauge(GROUP, HEARTBEAT_DISCOVERY_TIME_MS, 0L);
        }

        @VisibleForTesting
        Counter getHeartbeatEstablishedFailureCount() {
            return this.heartbeatEstablishedFailureCount;
        }

        @VisibleForTesting
        Counter getHeartbeatEstablishedWithNewAmCount() {
            return this.heartbeatEstablishedWithNewAmCount;
        }

        @VisibleForTesting
        Counter getHeartbeatExpiredCount() {
            return this.heartbeatExpiredCount;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void incrementHeartbeatEstablishedFailureCount() {
            this.heartbeatEstablishedFailureCount.inc();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void incrementHeartbeatEstablishedWithNewAmCount() {
            this.heartbeatEstablishedWithNewAmCount.inc();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void incrementHeartbeatExpiredCount() {
            this.heartbeatExpiredCount.inc();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setHeartbeatDiscoveryTime(long j) {
            this.heartbeatDiscoveryTime.set(Long.valueOf(j));
        }
    }

    /* loaded from: input_file:org/apache/samza/container/ContainerHeartbeatMonitor$HeartbeatThreadFactory.class */
    private static class HeartbeatThreadFactory implements ThreadFactory {
        private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
        private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();

        private HeartbeatThreadFactory() {
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }

    public ContainerHeartbeatMonitor(Runnable runnable, String str, String str2, MetadataStore metadataStore, Config config) {
        this(runnable, new ContainerHeartbeatClient(str, str2), Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), str, str2, metadataStore, config);
    }

    @VisibleForTesting
    ContainerHeartbeatMonitor(Runnable runnable, ContainerHeartbeatClient containerHeartbeatClient, ScheduledExecutorService scheduledExecutorService, String str, String str2, MetadataStore metadataStore, Config config) {
        this.started = false;
        this.onContainerExpired = runnable;
        this.containerHeartbeatClient = containerHeartbeatClient;
        this.scheduler = scheduledExecutorService;
        this.coordinatorUrl = str;
        this.containerExecutionId = str2;
        this.coordinatorStreamStore = metadataStore;
        JobConfig jobConfig = new JobConfig(config);
        this.isApplicationMasterHighAvailabilityEnabled = jobConfig.getApplicationMasterHighAvailabilityEnabled();
        this.retryCount = jobConfig.getContainerHeartbeatRetryCount();
        this.sleepDurationForReconnectWithAM = jobConfig.getContainerHeartbeatRetrySleepDurationMs();
        initializeMetrics(config);
    }

    public void start() {
        if (this.started) {
            LOG.warn("Skipping attempt to start an already started ContainerHeartbeatMonitor.");
            return;
        }
        LOG.info("Starting ContainerHeartbeatMonitor");
        this.scheduler.scheduleAtFixedRate(() -> {
            if (this.containerHeartbeatClient.requestHeartbeat().isAlive()) {
                return;
            }
            this.metrics.incrementHeartbeatExpiredCount();
            if (this.isApplicationMasterHighAvailabilityEnabled) {
                LOG.warn("Failed to establish connection with {}. Checking for new AM", this.coordinatorUrl);
                try {
                    if (checkAndEstablishConnectionWithNewAM()) {
                        return;
                    }
                } catch (Exception e) {
                    LOG.error("Exception trying to connect with new AM", e);
                    this.metrics.incrementHeartbeatEstablishedFailureCount();
                    forceExit("failure in establishing connection with new AM", SHUTDOWN_TIMOUT_MS);
                    return;
                }
            }
            forceExit("Graceful shutdown timeout expired. Force exiting.", SHUTDOWN_TIMOUT_MS);
        }, 0L, JobConfig.DRAIN_MONITOR_POLL_INTERVAL_MILLIS_DEFAULT, TimeUnit.MILLISECONDS);
        this.started = true;
    }

    public void stop() {
        if (this.started) {
            LOG.info("Stopping ContainerHeartbeatMonitor");
            this.scheduler.shutdown();
            this.reporters.values().forEach((v0) -> {
                v0.stop();
            });
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x0050, code lost:
    
        org.apache.samza.container.ContainerHeartbeatMonitor.LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", r0);
        r6.coordinatorUrl = r0;
        r6.containerHeartbeatClient = createContainerHeartbeatClient(r6.coordinatorUrl, r6.containerExecutionId);
        r7 = r6.containerHeartbeatClient.requestHeartbeat().isAlive();
        org.apache.samza.container.ContainerHeartbeatMonitor.LOG.info("Received heartbeat response: {} from new AM: {}", java.lang.Boolean.valueOf(r7), r6.coordinatorUrl);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private boolean checkAndEstablishConnectionWithNewAM() {
        /*
            r6 = this;
            r0 = 0
            r7 = r0
            r0 = 1
            r8 = r0
            long r0 = java.lang.System.currentTimeMillis()
            r9 = r0
        L8:
            r0 = r8
            long r0 = (long) r0
            r1 = r6
            long r1 = r1.retryCount
            int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
            if (r0 > 0) goto Lb1
            org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde r0 = org.apache.samza.container.ContainerHeartbeatMonitor.SERDE
            r1 = r6
            org.apache.samza.metadatastore.MetadataStore r1 = r1.coordinatorStreamStore
            java.lang.String r2 = "yarn.am.tracking.url"
            byte[] r1 = r1.get(r2)
            java.lang.String r0 = r0.m96fromBytes(r1)
            r11 = r0
            r0 = r6
            java.lang.String r0 = r0.coordinatorUrl     // Catch: java.lang.InterruptedException -> L95
            r1 = r11
            boolean r0 = r0.equals(r1)     // Catch: java.lang.InterruptedException -> L95
            if (r0 == 0) goto L50
            org.slf4j.Logger r0 = org.apache.samza.container.ContainerHeartbeatMonitor.LOG     // Catch: java.lang.InterruptedException -> L95
            java.lang.String r1 = "Attempt {} to discover new AM. Sleep for {}ms before next attempt."
            r2 = r8
            java.lang.Integer r2 = java.lang.Integer.valueOf(r2)     // Catch: java.lang.InterruptedException -> L95
            r3 = r6
            long r3 = r3.sleepDurationForReconnectWithAM     // Catch: java.lang.InterruptedException -> L95
            java.lang.Long r3 = java.lang.Long.valueOf(r3)     // Catch: java.lang.InterruptedException -> L95
            r0.info(r1, r2, r3)     // Catch: java.lang.InterruptedException -> L95
            r0 = r6
            long r0 = r0.sleepDurationForReconnectWithAM     // Catch: java.lang.InterruptedException -> L95
            java.lang.Thread.sleep(r0)     // Catch: java.lang.InterruptedException -> L95
            goto L92
        L50:
            org.slf4j.Logger r0 = org.apache.samza.container.ContainerHeartbeatMonitor.LOG     // Catch: java.lang.InterruptedException -> L95
            java.lang.String r1 = "Found new AM: {}. Establishing heartbeat with the new AM."
            r2 = r11
            r0.info(r1, r2)     // Catch: java.lang.InterruptedException -> L95
            r0 = r6
            r1 = r11
            r0.coordinatorUrl = r1     // Catch: java.lang.InterruptedException -> L95
            r0 = r6
            r1 = r6
            r2 = r6
            java.lang.String r2 = r2.coordinatorUrl     // Catch: java.lang.InterruptedException -> L95
            r3 = r6
            java.lang.String r3 = r3.containerExecutionId     // Catch: java.lang.InterruptedException -> L95
            org.apache.samza.container.ContainerHeartbeatClient r1 = r1.createContainerHeartbeatClient(r2, r3)     // Catch: java.lang.InterruptedException -> L95
            r0.containerHeartbeatClient = r1     // Catch: java.lang.InterruptedException -> L95
            r0 = r6
            org.apache.samza.container.ContainerHeartbeatClient r0 = r0.containerHeartbeatClient     // Catch: java.lang.InterruptedException -> L95
            org.apache.samza.container.ContainerHeartbeatResponse r0 = r0.requestHeartbeat()     // Catch: java.lang.InterruptedException -> L95
            boolean r0 = r0.isAlive()     // Catch: java.lang.InterruptedException -> L95
            r7 = r0
            org.slf4j.Logger r0 = org.apache.samza.container.ContainerHeartbeatMonitor.LOG     // Catch: java.lang.InterruptedException -> L95
            java.lang.String r1 = "Received heartbeat response: {} from new AM: {}"
            r2 = r7
            java.lang.Boolean r2 = java.lang.Boolean.valueOf(r2)     // Catch: java.lang.InterruptedException -> L95
            r3 = r6
            java.lang.String r3 = r3.coordinatorUrl     // Catch: java.lang.InterruptedException -> L95
            r0.info(r1, r2, r3)     // Catch: java.lang.InterruptedException -> L95
            goto Lb1
        L92:
            goto Lab
        L95:
            r12 = move-exception
            org.slf4j.Logger r0 = org.apache.samza.container.ContainerHeartbeatMonitor.LOG
            java.lang.String r1 = "Interrupted during sleep."
            r0.warn(r1)
            org.apache.samza.SamzaException r0 = new org.apache.samza.SamzaException
            r1 = r0
            r2 = r12
            r1.<init>(r2)
            throw r0
        Lab:
            int r8 = r8 + 1
            goto L8
        Lb1:
            r0 = r6
            org.apache.samza.container.ContainerHeartbeatMonitor$ContainerHeartbeatMetrics r0 = r0.metrics
            long r1 = java.lang.System.currentTimeMillis()
            r2 = r9
            long r1 = r1 - r2
            org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics.access$100(r0, r1)
            r0 = r7
            if (r0 == 0) goto Lcb
            r0 = r6
            org.apache.samza.container.ContainerHeartbeatMonitor$ContainerHeartbeatMetrics r0 = r0.metrics
            org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics.access$200(r0)
            goto Ld2
        Lcb:
            r0 = r6
            org.apache.samza.container.ContainerHeartbeatMonitor$ContainerHeartbeatMetrics r0 = r0.metrics
            org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics.access$300(r0)
        Ld2:
            r0 = r7
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.samza.container.ContainerHeartbeatMonitor.checkAndEstablishConnectionWithNewAM():boolean");
    }

    private void initializeMetrics(Config config) {
        this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), SOURCE_NAME);
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        this.metrics = new ContainerHeartbeatMetrics(metricsRegistryMap);
        this.reporters.values().forEach(metricsReporter -> {
            metricsReporter.register(SOURCE_NAME, metricsRegistryMap);
        });
    }

    @VisibleForTesting
    ContainerHeartbeatClient createContainerHeartbeatClient(String str, String str2) {
        return new ContainerHeartbeatClient(str, str2);
    }

    @VisibleForTesting
    ContainerHeartbeatMetrics getMetrics() {
        return this.metrics;
    }

    private void forceExit(String str, int i) {
        this.scheduler.schedule(() -> {
            if (this.started) {
                this.reporters.values().forEach((v0) -> {
                    v0.stop();
                });
            }
            LOG.error(str);
            ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + str);
            System.exit(1);
        }, i, TimeUnit.MILLISECONDS);
        this.onContainerExpired.run();
    }
}
