/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.file.CopyOption;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.hamcrest.number.OrderingComparison;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ZooKeeperHighAvailabilityITCase
extends TestLogger {
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
    private static final int NUM_JMS = 1;
    private static final int NUM_TMS = 1;
    private static final int NUM_SLOTS_PER_TM = 1;
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static File haStorageDir;
    private static TestingServer zkServer;
    private static MiniClusterWithClientResource miniClusterResource;
    private static OneShotLatch waitForCheckpointLatch;
    private static OneShotLatch failInCheckpointLatch;
    private static OneShotLatch blockSnapshotLatch;

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer();
        Configuration config = new Configuration();
        config.setInteger("local.number-jobmanager", 1);
        config.setInteger("local.number-taskmanager", 1);
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        haStorageDir = TEMPORARY_FOLDER.newFolder();
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
        config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config.setString("metrics.reporter.restarts.class", RestartReporter.class.getName());
        miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
        miniClusterResource.before();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        miniClusterResource.after();
        zkServer.stop();
        zkServer.close();
    }

    @Test
    public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
        CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
        CheckpointBlockingFunction.successfulRestores.set(0);
        CheckpointBlockingFunction.illegalRestores.set(0);
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
        CheckpointBlockingFunction.failedAlready.set(false);
        waitForCheckpointLatch = new OneShotLatch();
        failInCheckpointLatch = new OneShotLatch();
        blockSnapshotLatch = new OneShotLatch();
        ClusterClient clusterClient = miniClusterResource.getClusterClient();
        Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)0L));
        env.enableCheckpointing(10L);
        File checkpointLocation = TEMPORARY_FOLDER.newFolder();
        env.setStateBackend((StateBackend)new FsStateBackend(checkpointLocation.toURI()));
        DataStreamSource source = env.addSource((SourceFunction)new UnboundedSource());
        source.keyBy((KeySelector & Serializable)str -> str).map((MapFunction)new CheckpointBlockingFunction());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobID jobID = (JobID)Preconditions.checkNotNull((Object)jobGraph.getJobID());
        clusterClient.setDetached(true);
        clusterClient.submitJob(jobGraph, ZooKeeperHighAvailabilityITCase.class.getClassLoader());
        waitForCheckpointLatch.await();
        this.log.debug("Messing with HA directory");
        final File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
        final AtomicInteger numCheckpoints = new AtomicInteger();
        Files.walkFileTree(haStorageDir.toPath(), (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                if (file.getFileName().toString().startsWith("completedCheckpoint")) {
                    ZooKeeperHighAvailabilityITCase.this.log.debug("Moving original checkpoint file {}.", (Object)file);
                    try {
                        Files.move(file, movedCheckpointLocation.toPath().resolve(file.getFileName()), new CopyOption[0]);
                        numCheckpoints.incrementAndGet();
                    }
                    catch (IOException ioe) {
                        ZooKeeperHighAvailabilityITCase.this.log.debug("Exception while moving HA files.", (Throwable)ioe);
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        });
        Assert.assertTrue((numCheckpoints.get() > 0 ? 1 : 0) != 0);
        this.log.debug("Resuming job");
        failInCheckpointLatch.trigger();
        Assert.assertNotNull((String)"fullRestarts metric could not be accessed.", (Object)RestartReporter.numRestarts);
        while (RestartReporter.numRestarts.getValue() < 5L && deadline.hasTimeLeft()) {
            Thread.sleep(50L);
        }
        Assert.assertThat((Object)RestartReporter.numRestarts.getValue(), (Matcher)Is.is((Matcher)OrderingComparison.greaterThan((Comparable)Long.valueOf(4L))));
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
        this.log.debug("Restored zookeeper");
        Files.walkFileTree(movedCheckpointLocation.toPath(), (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.move(file, haStorageDir.toPath().resolve(file.getFileName()), new CopyOption[0]);
                return FileVisitResult.CONTINUE;
            }
        });
        blockSnapshotLatch.trigger();
        CompletableFuture jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(() -> clusterClient.getJobStatus(jobID), (Time)Time.milliseconds((long)50L), (Deadline)deadline, JobStatus::isGloballyTerminalState, (ScheduledExecutor)TestingUtils.defaultScheduledExecutor());
        try {
            Assert.assertEquals((Object)JobStatus.FINISHED, jobStatusFuture.get());
        }
        catch (Throwable e) {
            StringWriter error = new StringWriter();
            try (PrintWriter out = new PrintWriter(error);){
                out.println("The job did not finish in time.");
                out.println("allowedInitializeCallsWithoutRestore= " + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.get());
                out.println("illegalRestores= " + CheckpointBlockingFunction.illegalRestores.get());
                out.println("successfulRestores= " + CheckpointBlockingFunction.successfulRestores.get());
                out.println("afterMessWithZooKeeper= " + CheckpointBlockingFunction.afterMessWithZooKeeper.get());
                out.println("failedAlready= " + CheckpointBlockingFunction.failedAlready.get());
                out.println("currentJobStatus= " + clusterClient.getJobStatus(jobID).get());
                out.println("numRestarts= " + RestartReporter.numRestarts.getValue());
                out.println("threadDump= " + ZooKeeperHighAvailabilityITCase.generateThreadDump());
            }
            throw new AssertionError(error.toString(), ExceptionUtils.stripCompletionException((Throwable)e));
        }
        Assert.assertThat((String)"We saw illegal restores.", (Object)CheckpointBlockingFunction.illegalRestores.get(), (Matcher)Is.is((Object)0));
    }

    private static String generateThreadDump() {
        ThreadInfo[] threadInfos;
        StringBuilder dump = new StringBuilder();
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        for (ThreadInfo threadInfo : threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100)) {
            StackTraceElement[] stackTraceElements;
            dump.append('\"');
            dump.append(threadInfo.getThreadName());
            dump.append('\"');
            Thread.State state = threadInfo.getThreadState();
            dump.append(System.lineSeparator());
            dump.append("   java.lang.Thread.State: ");
            dump.append((Object)state);
            for (StackTraceElement stackTraceElement : stackTraceElements = threadInfo.getStackTrace()) {
                dump.append(System.lineSeparator());
                dump.append("        at ");
                dump.append(stackTraceElement);
            }
            dump.append(System.lineSeparator());
            dump.append(System.lineSeparator());
        }
        return dump.toString();
    }

    public static class RestartReporter
    implements MetricReporter {
        static volatile NumberOfFullRestartsGauge numRestarts = null;

        public void open(MetricConfig metricConfig) {
        }

        public void close() {
        }

        public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
            if (metric instanceof NumberOfFullRestartsGauge) {
                numRestarts = (NumberOfFullRestartsGauge)metric;
            }
        }

        public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
        }
    }

    private static class CheckpointBlockingFunction
    extends RichMapFunction<String, String>
    implements CheckpointedFunction,
    CheckpointListener {
        static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
        static AtomicInteger illegalRestores = new AtomicInteger(0);
        static AtomicInteger successfulRestores = new AtomicInteger(0);
        static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
        static AtomicBoolean failedAlready = new AtomicBoolean(false);
        static AtomicBoolean stateRecorded = new AtomicBoolean(false);
        static AtomicLong minimalCheckpointIdIncludingData = new AtomicLong(Long.MAX_VALUE);
        static AtomicBoolean checkpointCompletedIncludingData = new AtomicBoolean(false);
        private final ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor("state", (TypeSerializer)StringSerializer.INSTANCE);

        private CheckpointBlockingFunction() {
        }

        public String map(String value) throws Exception {
            this.getRuntimeContext().getState(this.stateDescriptor).update((Object)"42");
            stateRecorded.compareAndSet(false, true);
            return value;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (stateRecorded.get()) {
                minimalCheckpointIdIncludingData.compareAndSet(Long.MAX_VALUE, context.getCheckpointId());
            }
            if (checkpointCompletedIncludingData.get()) {
                waitForCheckpointLatch.trigger();
                failInCheckpointLatch.await();
                if (!failedAlready.getAndSet(true)) {
                    throw new RuntimeException("Failing on purpose.");
                }
                blockSnapshotLatch.await();
            }
        }

        public void initializeState(FunctionInitializationContext context) {
            if (!context.isRestored()) {
                int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet();
                if (updatedValue < 0) {
                    illegalRestores.getAndIncrement();
                    throw new RuntimeException("We are not allowed any more restores.");
                }
            } else if (!afterMessWithZooKeeper.get()) {
                illegalRestores.getAndIncrement();
            } else if (successfulRestores.getAndIncrement() > 0) {
                illegalRestores.getAndIncrement();
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (checkpointId >= minimalCheckpointIdIncludingData.get()) {
                checkpointCompletedIncludingData.compareAndSet(false, true);
            }
        }
    }

    private static class UnboundedSource
    implements SourceFunction<String> {
        private volatile boolean running = true;

        private UnboundedSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
                ctx.collect((Object)"hello");
                Thread.sleep(50L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

