/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.test.util.AbstractTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterStopWithSavepointIT
extends AbstractTestBase {
    /** JUnit rule providing a temporary folder; {@code setUpJobGraph} creates the savepoint directory inside it. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Checkpoint interval intended for the test job (presumably milliseconds - confirm against
    // CheckpointCoordinatorConfiguration).
    private static final long CHECKPOINT_INTERVAL = 10L;
    // Intended parallelism of the single test vertex.
    private static final int PARALLELISM = 2;
    // The fields below are static because the invokable task classes at the bottom of this file
    // access them directly; setUpJobGraph re-creates/resets them before each job submission.

    // Triggered by the normal-execution test; NoOpBlockingStreamTask.finishTask() blocks on it.
    private static OneShotLatch finishingLatch;
    // Counted down every time a task's processInput() runs; setUpJobGraph awaits it after submitting.
    private static CountDownLatch invokeLatch;
    // Counted down by subtask 0 of ExceptionOnCallbackStreamTask on each processInput() call.
    private static CountDownLatch numberOfRestarts;
    // Id of the first SYNC_SAVEPOINT seen by ExceptionOnCallbackStreamTask; -1 after setUpJobGraph resets it.
    private static AtomicLong syncSavepointId;
    // Counted down by subtask 0 of ExceptionOnCallbackStreamTask on each triggerCheckpoint() call.
    private static CountDownLatch checkpointsToWaitFor;
    // Per-test state, assigned in setUpJobGraph.
    private Path savepointDirectory;
    private MiniClusterClient clusterClient;
    private JobGraph jobGraph;

    /**
     * Happy-path stop-with-savepoint with the terminate flag set to {@code false};
     * all checks live in {@code stopWithSavepointNormalExecutionHelper}.
     */
    @Test(timeout=5000L)
    public void suspendWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished() throws Exception {
        final boolean terminate = false;
        stopWithSavepointNormalExecutionHelper(terminate);
    }

    /**
     * Happy-path stop-with-savepoint with the terminate flag set to {@code true};
     * all checks live in {@code stopWithSavepointNormalExecutionHelper}.
     */
    @Test(timeout=5000L)
    public void terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished() throws Exception {
        final boolean terminate = true;
        stopWithSavepointNormalExecutionHelper(terminate);
    }

    /**
     * Runs the uncomplicated stop-with-savepoint scenario against {@link NoOpBlockingStreamTask}.
     *
     * <p>Steps: submit the job without a restart strategy, request stop-with-savepoint, check the
     * job is still {@code RUNNING} while the tasks are held back, release the tasks, wait for the
     * savepoint location, check the job is {@code FINISHED}, and finally check that the savepoint
     * directory contains an entry whose file name matches the reported savepoint location.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}
     * @throws Exception if job setup, the stop request, or listing the savepoint directory fails
     */
    private void stopWithSavepointNormalExecutionHelper(boolean terminate) throws Exception {
        this.setUpJobGraph(NoOpBlockingStreamTask.class, RestartStrategies.noRestart());

        CompletableFuture<String> savepointLocationFuture = this.stopWithSavepoint(terminate);
        // NoOpBlockingStreamTask.finishTask() waits on finishingLatch, so the job is expected to
        // still report RUNNING until the latch is triggered below.
        MatcherAssert.assertThat(this.getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        finishingLatch.trigger();

        String savepointLocation = savepointLocationFuture.get();
        MatcherAssert.assertThat(this.getJobStatus(), Matchers.equalTo(JobStatus.FINISHED));

        // Typed list (instead of a raw List) keeps the hasItem matcher type-checked.
        final List<Path> savepoints;
        try (Stream<Path> savepointFiles = Files.list(this.savepointDirectory)) {
            savepoints = savepointFiles.map(Path::getFileName).collect(Collectors.toList());
        }
        MatcherAssert.assertThat(savepoints, Matchers.hasItem(Paths.get(savepointLocation).getFileName()));
    }

    /**
     * Failing-callback scenario without restarts, terminate flag {@code false};
     * all checks live in {@code throwingExceptionOnCallbackWithoutRestartsHelper}.
     */
    @Test(timeout=5000L)
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheSuspend() throws Exception {
        final boolean terminate = false;
        throwingExceptionOnCallbackWithoutRestartsHelper(terminate);
    }

    /**
     * Failing-callback scenario without restarts, terminate flag {@code true};
     * all checks live in {@code throwingExceptionOnCallbackWithoutRestartsHelper}.
     */
    @Test(timeout=5000L)
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate() throws Exception {
        final boolean terminate = true;
        throwingExceptionOnCallbackWithoutRestartsHelper(terminate);
    }

    /**
     * Runs {@link ExceptionOnCallbackStreamTask} without a restart strategy and verifies that a
     * stop-with-savepoint request fails, that a synchronous savepoint id was recorded, and that
     * the job ends up {@code FAILED}.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}
     * @throws Exception if job setup or a status query fails
     */
    private void throwingExceptionOnCallbackWithoutRestartsHelper(boolean terminate) throws Exception {
        setUpJobGraph(ExceptionOnCallbackStreamTask.class, RestartStrategies.noRestart());

        JobStatus statusBeforeStop = getJobStatus();
        MatcherAssert.assertThat(statusBeforeStop, Matchers.equalTo(JobStatus.RUNNING));

        try {
            stopWithSavepoint(terminate).get();
            // Reaching this line means the stop request succeeded. Assert.fail() raises an
            // AssertionError, which is an Error and therefore not caught by the handler below.
            Assert.fail();
        }
        catch (Exception expected) {
            // The stop request is supposed to fail; the exception itself is not inspected.
        }

        // A positive id shows that the task saw a SYNC_SAVEPOINT trigger (the field starts at -1).
        boolean sawSyncSavepoint = syncSavepointId.get() > 0L;
        Assert.assertTrue(sawSyncSavepoint);

        JobStatus statusAfterStop = getJobStatus();
        MatcherAssert.assertThat(statusAfterStop, Matchers.equalTo(JobStatus.FAILED));
    }

    /**
     * Failing-callback scenario with a restart strategy, terminate flag {@code false};
     * all checks live in {@code throwingExceptionOnCallbackWithRestartsHelper}.
     */
    @Test(timeout=5000L)
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend() throws Exception {
        final boolean terminate = false;
        throwingExceptionOnCallbackWithRestartsHelper(terminate);
    }

    /**
     * Failing-callback scenario with a restart strategy, terminate flag {@code true};
     * all checks live in {@code throwingExceptionOnCallbackWithRestartsHelper}.
     */
    @Test(timeout=5000L)
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInTerminate() throws Exception {
        final boolean terminate = true;
        throwingExceptionOnCallbackWithRestartsHelper(terminate);
    }

    /**
     * Runs {@link ExceptionOnCallbackStreamTask} with a fixed-delay restart strategy and verifies
     * that a failing stop-with-savepoint only causes restarts: the stop request does not succeed,
     * the job keeps {@code RUNNING}, a synchronous savepoint id was recorded, and further
     * checkpoints were triggered afterwards. The job is cancelled at the end.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}
     * @throws Exception if job setup, a status query, or the cancel request fails
     */
    private void throwingExceptionOnCallbackWithRestartsHelper(boolean terminate) throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
        // Must match the count of the checkpointsToWaitFor latch that setUpJobGraph installs.
        final int numberOfCheckpointsToExpect = 10;

        // setUpJobGraph creates fresh numberOfRestarts (2) and checkpointsToWaitFor (10) latches
        // before submitting the job, so assigning them here beforehand would be a dead store.
        this.setUpJobGraph(ExceptionOnCallbackStreamTask.class, RestartStrategies.fixedDelayRestart(15, Time.milliseconds(10L)));
        MatcherAssert.assertThat(this.getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        try {
            this.stopWithSavepoint(terminate).get(50L, TimeUnit.MILLISECONDS);
            // Assert.fail() raises an AssertionError (an Error), so it is not swallowed below.
            Assert.fail();
        }
        catch (Exception expected) {
            // The stop request must not complete successfully within 50 ms; whether it times
            // out or completes exceptionally is not inspected.
        }

        // Fail explicitly if either wait runs out of time instead of silently continuing.
        Assert.assertTrue(
                "Subtask 0 was not invoked the expected number of times before the deadline.",
                numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        Assert.assertTrue(
                "Fewer than " + numberOfCheckpointsToExpect + " checkpoints were triggered before the deadline.",
                checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

        // A positive id shows that the task saw a SYNC_SAVEPOINT trigger (the field starts at -1).
        Assert.assertTrue(syncSavepointId.get() > 0L);
        MatcherAssert.assertThat(this.getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        // The synchronous savepoint must be followed by further checkpoints, so its id has to
        // be smaller than the number of checkpoints waited for.
        final long syncSavepoint = syncSavepointId.get();
        Assert.assertTrue(syncSavepoint > 0L && syncSavepoint < numberOfCheckpointsToExpect);

        this.clusterClient.cancel(this.jobGraph.getJobID());
        MatcherAssert.assertThat(
                this.getJobStatus(),
                Matchers.either(Matchers.equalTo(JobStatus.CANCELLING)).or(Matchers.equalTo(JobStatus.CANCELED)));
    }

    /**
     * Asks the mini cluster to stop the current job with a savepoint written to the absolute
     * path of {@code savepointDirectory}.
     *
     * @param terminate flag passed through to the mini cluster's {@code stopWithSavepoint}
     * @return the future returned by the mini cluster for the stop request
     */
    private CompletableFuture<String> stopWithSavepoint(boolean terminate) {
        final String targetDirectory = savepointDirectory.toAbsolutePath().toString();
        return miniClusterResource
                .getMiniCluster()
                .stopWithSavepoint(jobGraph.getJobID(), targetDirectory, terminate);
    }

    /**
     * Queries the cluster client for the status of the current job and blocks until the answer
     * is available.
     *
     * @return the job status reported by the cluster client
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the status query completes exceptionally
     */
    private JobStatus getJobStatus() throws InterruptedException, ExecutionException {
        final CompletableFuture<JobStatus> statusFuture = clusterClient.getJobStatus(jobGraph.getJobID());
        return statusFuture.get();
    }

    /**
     * Resets the shared static test state, builds a single-vertex job graph around the given
     * invokable, submits it in detached mode, and waits until the job reports {@code RUNNING}.
     *
     * <p>The test is skipped (JUnit assumption) when the cluster client is not a
     * {@link MiniClusterClient}.
     *
     * @param invokable task class to run in the single vertex
     * @param restartStrategy restart strategy placed into the job's execution config
     * @throws AssertionError if the subtasks are not all invoked within 60 seconds
     * @throws Exception if creating the savepoint folder, submitting, or waiting for the job fails
     */
    private void setUpJobGraph(Class<? extends AbstractInvokable> invokable, RestartStrategies.RestartStrategyConfiguration restartStrategy) throws Exception {
        // Fresh synchronization state for every job; the task classes reach these via statics.
        finishingLatch = new OneShotLatch();
        // One count per subtask: every task counts down once when processInput() is entered.
        invokeLatch = new CountDownLatch(PARALLELISM);
        numberOfRestarts = new CountDownLatch(2);
        checkpointsToWaitFor = new CountDownLatch(10);
        // -1 marks "no synchronous savepoint seen yet" (tasks use compareAndSet(-1, id)).
        syncSavepointId.set(-1L);

        this.savepointDirectory = this.temporaryFolder.newFolder().toPath();

        Assume.assumeTrue("ClusterClient is not an instance of MiniClusterClient", miniClusterResource.getClusterClient() instanceof MiniClusterClient);
        this.clusterClient = (MiniClusterClient)miniClusterResource.getClusterClient();
        this.clusterClient.setDetached(true);

        this.jobGraph = new JobGraph(new JobVertex[0]);

        ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy(restartStrategy);
        this.jobGraph.setExecutionConfig(config);

        JobVertex vertex = new JobVertex("testVertex");
        vertex.setInvokableClass(invokable);
        vertex.setParallelism(PARALLELISM);
        this.jobGraph.addVertex(vertex);

        // The single vertex is used for all three vertex lists of the checkpointing settings.
        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                Collections.singletonList(vertex.getID()),
                Collections.singletonList(vertex.getID()),
                Collections.singletonList(vertex.getID()),
                new CheckpointCoordinatorConfiguration(
                        CHECKPOINT_INTERVAL,
                        60000L,
                        10L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
                        false,
                        0),
                null));

        this.clusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());
        // Do not continue silently when the tasks never started.
        if (!invokeLatch.await(60L, TimeUnit.SECONDS)) {
            throw new AssertionError("Expected all " + PARALLELISM + " subtasks to be invoked within 60 seconds.");
        }
        this.waitForJob();
    }

    /**
     * Polls the job status up to 60 times, one second apart, and returns as soon as the job is
     * {@code RUNNING}.
     *
     * <p>Each poll waits at most 60 seconds for the status. A poll that ends in an
     * {@link ExecutionException} is treated as "not ready yet" and retried. Observing a globally
     * terminal status fails the assertion immediately.
     *
     * @throws AssertionError if the job is not {@code RUNNING} after all attempts
     * @throws Exception if a poll times out or the thread is interrupted
     */
    private void waitForJob() throws Exception {
        int attempt = 0;
        while (attempt < 60) {
            try {
                final JobStatus current = clusterClient.getJobStatus(jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
                MatcherAssert.assertThat(current.isGloballyTerminalState(), Matchers.equalTo(false));
                if (JobStatus.RUNNING == current) {
                    return;
                }
            }
            catch (ExecutionException ignored) {
                // Status query failed - presumably the job is not queryable yet; retry after the pause.
            }
            Thread.sleep(1000L);
            attempt++;
        }
        throw new AssertionError("Job did not become running within timeout.");
    }

    // Creates the shared savepoint-id holder once at class-load time. Unlike the static latches,
    // which setUpJobGraph replaces with new instances, this object is only reset via set(-1L).
    static {
        syncSavepointId = new AtomicLong();
    }

    /**
     * Stream task that does no work: {@code processInput} announces its invocation and then
     * blocks until {@code finishTask} releases it, and {@code finishTask} in turn blocks until
     * the test triggers the shared {@code finishingLatch}.
     */
    public static class NoOpBlockingStreamTask
    extends StreamTaskTest.NoOpStreamTask {
        /** Per-task latch that lets {@code processInput} complete once {@code finishTask} ran. */
        private final transient OneShotLatch finishLatch;

        public NoOpBlockingStreamTask(Environment environment) {
            super(environment);
            finishLatch = new OneShotLatch();
        }

        /** Signals the invocation to the test, then waits for {@code finishTask} before completing. */
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            // Tell the test that this subtask is up.
            invokeLatch.countDown();
            // Park here until finishTask() opens the per-task latch.
            finishLatch.await();
            context.allActionsCompleted();
        }

        /** Waits for the test's go-ahead and then releases {@code processInput}. */
        public void finishTask() throws Exception {
            // Held back until the test calls finishingLatch.trigger().
            finishingLatch.await();
            finishLatch.trigger();
        }
    }

    /**
     * Stream task that remembers the id of a {@code SYNC_SAVEPOINT} checkpoint and, on subtask 0,
     * throws from {@code notifyCheckpointComplete} when that checkpoint is reported complete.
     * It also feeds the static latches and the static savepoint id that the tests inspect.
     */
    public static class ExceptionOnCallbackStreamTask
    extends StreamTaskTest.NoOpStreamTask {
        /** Id of the last synchronous savepoint this task was asked to trigger. */
        private long synchronousSavepointId;
        /** Released by {@code cancelTask} so that {@code processInput} can complete. */
        private final transient OneShotLatch finishLatch;

        public ExceptionOnCallbackStreamTask(Environment environment) {
            super(environment);
            synchronousSavepointId = Long.MIN_VALUE;
            finishLatch = new OneShotLatch();
        }

        /** Records the invocation (and, on subtask 0, a restart count) and blocks until cancelled. */
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            if (runsAsSubtaskZero()) {
                // Only one subtask counts, so the latch is counted once per invocation of subtask 0.
                numberOfRestarts.countDown();
            }
            invokeLatch.countDown();
            finishLatch.await();
            context.allActionsCompleted();
        }

        /** Delegates to the parent implementation and then releases {@code processInput}. */
        protected void cancelTask() throws Exception {
            super.cancelTask();
            finishLatch.trigger();
        }

        /**
         * Remembers the checkpoint id when the trigger is a synchronous savepoint, counts the
         * trigger on subtask 0, and then delegates to the parent implementation.
         */
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
            final long triggeredId = checkpointMetaData.getCheckpointId();
            final CheckpointType triggeredType = checkpointOptions.getCheckpointType();

            if (CheckpointType.SYNC_SAVEPOINT == triggeredType) {
                synchronousSavepointId = triggeredId;
                // Only the first synchronous savepoint since the last reset to -1 is published.
                syncSavepointId.compareAndSet(-1L, synchronousSavepointId);
            }

            if (runsAsSubtaskZero()) {
                checkpointsToWaitFor.countDown();
            }

            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
        }

        /**
         * Throws on subtask 0 when the completed checkpoint is the remembered synchronous
         * savepoint; otherwise delegates to the parent implementation.
         */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            final boolean firstSubtask = runsAsSubtaskZero();
            if (firstSubtask && checkpointId == synchronousSavepointId) {
                throw new RuntimeException("Expected Exception");
            }
            super.notifyCheckpointComplete(checkpointId);
        }

        /** Tells whether this task instance is the subtask with index 0. */
        private boolean runsAsSubtaskZero() {
            final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
            return taskIndex == 0L;
        }
    }
}

