/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/**
 * Integration test that runs the {@link CompletedCheckpointStoreTest} suite against a
 * {@link ZooKeeperCompletedCheckpointStore} backed by a {@link ZooKeeperTestEnvironment}.
 *
 * <p>One test environment is shared by all test methods; its contents are wiped before each
 * test and the environment is shut down once the class has finished.
 */
public class ZooKeeperCompletedCheckpointStoreITCase
extends CompletedCheckpointStoreTest {
    /** Shared ZooKeeper test environment used by every test in this class. */
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

    /** ZooKeeper path handed to the store; its children are counted by the assertions below. */
    private static final String CHECKPOINTS_PATH = "/checkpoints";

    /** Upper bound, in milliseconds, for a single pause while polling ZooKeeper. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * Shuts down the shared ZooKeeper test environment after all tests have run.
     *
     * @throws Exception if the shutdown fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        // The field is final and eagerly initialised, so no null guard is needed.
        ZOOKEEPER.shutdown();
    }

    /**
     * Deletes everything from the test environment before each test so that tests do not see
     * each other's nodes.
     *
     * @throws Exception if the deletion fails
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /**
     * Creates a {@link ZooKeeperCompletedCheckpointStore} rooted at {@code /checkpoints} that
     * wraps every stored checkpoint in a {@link LocalStateHandle}.
     *
     * @param maxNumberOfCheckpointsToRetain retention limit passed through to the store
     * @param userLoader class loader passed through to the store
     * @return the newly created store
     * @throws Exception if the store cannot be created
     */
    @Override
    protected CompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception {
        StateStorageHelper<CompletedCheckpoint> localStorage = new StateStorageHelper<CompletedCheckpoint>() {

            @Override
            public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
                return new LocalStateHandle<CompletedCheckpoint>(state);
            }
        };
        // NOTE(review): a fresh client is created per store and never closed in this class;
        // presumably the test environment or the store owns its lifecycle - confirm.
        return new ZooKeeperCompletedCheckpointStore(
                maxNumberOfCheckpointsToRetain, userLoader, ZOOKEEPER.createClient(), CHECKPOINTS_PATH, localStorage);
    }

    /**
     * Adds three checkpoints, calls {@code recover()}, and verifies that only the most recently
     * added checkpoint remains, both in ZooKeeper and in the store.
     *
     * @throws Exception if any store or ZooKeeper operation fails
     */
    @Test
    public void testRecover() throws Exception {
        CompletedCheckpointStore checkpoints = this.createCompletedCheckpoints(3, ClassLoader.getSystemClassLoader());
        CompletedCheckpointStoreTest.TestCheckpoint[] expected = new CompletedCheckpointStoreTest.TestCheckpoint[]{
                this.createCheckpoint(0), this.createCheckpoint(1), this.createCheckpoint(2)};

        for (CompletedCheckpointStoreTest.TestCheckpoint checkpoint : expected) {
            checkpoints.addCheckpoint(checkpoint);
        }

        // All three checkpoints must be visible in ZooKeeper and in the store.
        Assert.assertEquals(3, countCheckpointNodes());
        Assert.assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());

        checkpoints.recover();

        // The node count is polled rather than asserted immediately; presumably the older
        // nodes are removed asynchronously - confirm against the store implementation.
        waitForCheckpointNodeCount(1, new FiniteDuration(1L, TimeUnit.MINUTES).fromNow());

        Assert.assertEquals(1, countCheckpointNodes());
        Assert.assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
        Assert.assertEquals(expected[2], checkpoints.getLatestCheckpoint());
    }

    /** Returns the number of child nodes currently below the checkpoints path. */
    private static int countCheckpointNodes() throws Exception {
        List<String> children = ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINTS_PATH);
        return children.size();
    }

    /** Polls until the checkpoints path has {@code expectedCount} children or the deadline passes. */
    private static void waitForCheckpointNodeCount(int expectedCount, Deadline deadline) throws Exception {
        while (deadline.hasTimeLeft() && countCheckpointNodes() != expectedCount) {
            // The deadline may expire while the ZooKeeper query above is in flight, which makes
            // timeLeft() negative. Clamp to zero so Thread.sleep does not throw
            // IllegalArgumentException and hide the assertion failure that follows.
            long remainingMillis = deadline.timeLeft().toMillis();
            Thread.sleep(Math.max(0L, Math.min(POLL_INTERVAL_MILLIS, remainingMillis)));
        }
    }
}

