/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ha;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.DummyHAService;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthMonitor;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link HealthMonitor} driven against a {@link DummyHAService}.
 *
 * <p>Each test starts from a monitor that has already reported
 * {@code SERVICE_HEALTHY} (see {@link #setupHM()}) and then flips flags on the
 * dummy service, or on this test, to push the monitor into another state.
 */
public class TestHealthMonitor {
    private static final Log LOG = LogFactory.getLog(TestHealthMonitor.class);

    /** Interval, in milliseconds, used for every health-monitor timing setting. */
    private static final int MONITOR_INTERVAL_MS = 50;

    /** Upper bound, in milliseconds, that {@link #waitForState} polls before asserting. */
    private static final long STATE_WAIT_TIMEOUT_MS = 2000L;

    /** Pause, in milliseconds, between two polls in {@link #waitForState}. */
    private static final long STATE_POLL_INTERVAL_MS = 50L;

    /** Number of times the monitor under test has invoked {@code createProxy()}. */
    private final AtomicInteger createProxyCount = new AtomicInteger(0);

    /** When set, the overridden {@code createProxy()} throws an {@link OutOfMemoryError}. */
    private volatile boolean throwOOMEOnCreate = false;

    private HealthMonitor hm;
    private DummyHAService svc;

    /**
     * Builds a monitor with short intervals around a dummy ACTIVE service,
     * starts it, and waits until it reports {@code SERVICE_HEALTHY}.
     *
     * @throws InterruptedException if interrupted while waiting for the healthy state
     * @throws IOException declared for parity with the monitor's proxy creation
     */
    @Before
    public void setupHM() throws InterruptedException, IOException {
        Configuration conf = new Configuration();
        conf.setInt("ipc.client.connect.max.retries", 1);
        conf.setInt("ha.health-monitor.check-interval.ms", MONITOR_INTERVAL_MS);
        conf.setInt("ha.health-monitor.connect-retry-interval.ms", MONITOR_INTERVAL_MS);
        conf.setInt("ha.health-monitor.sleep-after-disconnect.ms", MONITOR_INTERVAL_MS);
        svc = new DummyHAService(HAServiceProtocol.HAServiceState.ACTIVE, null);
        hm = new HealthMonitor(conf, svc) {

            /**
             * Counts every proxy creation and, when the test asks for it,
             * fails with an {@link OutOfMemoryError} instead of delegating.
             */
            @Override
            protected HAServiceProtocol createProxy() throws IOException {
                createProxyCount.incrementAndGet();
                if (throwOOMEOnCreate) {
                    throw new OutOfMemoryError("oome");
                }
                return super.createProxy();
            }
        };
        LOG.info("Starting health monitor");
        hm.start();
        LOG.info("Waiting for HEALTHY signal");
        waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
    }

    /**
     * Walks the monitor through UNHEALTHY, HEALTHY, NOT_RESPONDING and back to
     * HEALTHY, then shuts it down and checks that it is no longer alive.
     */
    @Test(timeout=15000L)
    public void testMonitor() throws Exception {
        LOG.info("Mocking bad health check, waiting for UNHEALTHY");
        svc.isHealthy = false;
        waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);

        LOG.info("Returning to healthy state, waiting for HEALTHY");
        svc.isHealthy = true;
        waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);

        LOG.info("Returning an IOException, as if node went down");
        int countBefore = createProxyCount.get();
        svc.actUnreachable = true;
        waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);

        // Wait for at least three further createProxy() calls; this loop has no
        // bound of its own and relies on the @Test timeout to end a stuck run.
        while (createProxyCount.get() < countBefore + 3) {
            Thread.sleep(10L);
        }

        LOG.info("Returning to healthy state, waiting for HEALTHY");
        svc.actUnreachable = false;
        waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);

        hm.shutdown();
        hm.join();
        Assert.assertFalse(hm.isAlive());
    }

    /**
     * Makes proxy creation throw an {@link OutOfMemoryError} while the service
     * is unreachable, expects {@code HEALTH_MONITOR_FAILED}, then shuts the
     * monitor down and checks that it is no longer alive.
     */
    @Test(timeout=15000L)
    public void testHealthMonitorDies() throws Exception {
        LOG.info("Mocking RTE in health monitor, waiting for FAILED");
        throwOOMEOnCreate = true;
        svc.actUnreachable = true;
        waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
        hm.shutdown();
        hm.join();
        Assert.assertFalse(hm.isAlive());
    }

    /**
     * Registers a callback that always throws a {@link RuntimeException},
     * marks the service unhealthy, and expects {@code HEALTH_MONITOR_FAILED}.
     */
    @Test(timeout=15000L)
    public void testCallbackThrowsRTE() throws Exception {
        hm.addCallback(new HealthMonitor.Callback() {

            @Override
            public void enteredState(HealthMonitor.State newState) {
                throw new RuntimeException("Injected RTE");
            }
        });
        LOG.info("Mocking bad health check, waiting for UNHEALTHY");
        svc.isHealthy = false;
        waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
    }

    /**
     * Polls the monitor until it reports the expected state or the wait budget
     * is used up, then asserts on whatever state was last observed.
     *
     * <p>Elapsed time is taken from the monotonic clock so that a wall-clock
     * adjustment during the test can neither shorten nor extend the wait.
     *
     * @param hm the monitor to poll
     * @param state the state the monitor is expected to reach
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private void waitForState(HealthMonitor hm, HealthMonitor.State state) throws InterruptedException {
        long start = Time.monotonicNow();
        while (Time.monotonicNow() - start < STATE_WAIT_TIMEOUT_MS) {
            if (hm.getHealthState() == state) {
                return;
            }
            Thread.sleep(STATE_POLL_INTERVAL_MS);
        }
        // Timed out: fail with a message that shows expected versus actual state.
        Assert.assertEquals(state, hm.getHealthState());
    }
}

