/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime.leaderelection;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ZooKeeperLeaderElectionITCase
extends TestLogger {
    private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5L);
    private static final Time RPC_TIMEOUT = Time.minutes((long)1L);
    private static TestingServer zkServer;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(true);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zkServer != null) {
            zkServer.close();
            zkServer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
        int numDispatchers = 3;
        int numTMs = 2;
        int numSlotsPerTM = 2;
        Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig((String)zkServer.getConnectString(), (String)this.tempFolder.newFolder().getAbsolutePath());
        configuration.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L);
        TestingMiniClusterConfiguration miniClusterConfiguration = new TestingMiniClusterConfiguration.Builder().setConfiguration(configuration).setNumberDispatcherResourceManagerComponents(3).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build();
        LeaderRetrievalService dispatcherLeaderRetriever = null;
        try (TestingMiniCluster miniCluster = new TestingMiniCluster(miniClusterConfiguration);){
            Deadline timeout = Deadline.fromNow((Duration)TEST_TIMEOUT);
            miniCluster.start();
            int parallelism = 4;
            JobGraph jobGraph = this.createJobGraph(4);
            miniCluster.submitJob(jobGraph).get();
            Collection dispatcherResourceManagerComponents = miniCluster.getDispatcherResourceManagerComponents();
            NewLeaderRetriever newLeaderRetriever = new NewLeaderRetriever();
            HighAvailabilityServices highAvailabilityServices = miniCluster.getHighAvailabilityServices();
            dispatcherLeaderRetriever = highAvailabilityServices.getDispatcherLeaderRetriever();
            dispatcherLeaderRetriever.start((LeaderRetrievalListener)newLeaderRetriever);
            for (int i = 0; i < 2; ++i) {
                DispatcherResourceManagerComponent<?> leadingDispatcherResourceManagerComponent = this.getLeadingDispatcherResourceManagerComponent(dispatcherResourceManagerComponents, newLeaderRetriever);
                Dispatcher dispatcher = leadingDispatcherResourceManagerComponent.getDispatcher();
                CommonTestUtils.waitUntilCondition(() -> dispatcher.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING, (Deadline)timeout, (long)50L);
                leadingDispatcherResourceManagerComponent.closeAsync();
            }
            DispatcherResourceManagerComponent<?> leadingDispatcherResourceManagerComponent = this.getLeadingDispatcherResourceManagerComponent(dispatcherResourceManagerComponents, newLeaderRetriever);
            CompletableFuture jobResultFuture = leadingDispatcherResourceManagerComponent.getDispatcher().requestJobResult(jobGraph.getJobID(), RPC_TIMEOUT);
            BlockingOperator.unblock();
            Assert.assertThat((Object)((JobResult)jobResultFuture.get()).isSuccess(), (Matcher)Matchers.is((Object)true));
        }
        finally {
            if (dispatcherLeaderRetriever != null) {
                dispatcherLeaderRetriever.stop();
            }
        }
    }

    @Nonnull
    protected DispatcherResourceManagerComponent<?> getLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents, NewLeaderRetriever newLeaderRetriever) throws Exception {
        Tuple2<String, UUID> leaderInformation = newLeaderRetriever.waitUntilNewLeader().get();
        String leaderAddress = (String)leaderInformation.f0;
        return ZooKeeperLeaderElectionITCase.findLeadingDispatcherResourceManagerComponent(dispatcherResourceManagerComponents, leaderAddress).orElseThrow(() -> new Exception(String.format("Could not find the leading Dispatcher with address %s", leaderAddress)));
    }

    @Nonnull
    private static Optional<DispatcherResourceManagerComponent<?>> findLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents, String address) {
        for (DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent : dispatcherResourceManagerComponents) {
            if (!dispatcherResourceManagerComponent.getDispatcher().getAddress().equals(address)) continue;
            return Optional.of(dispatcherResourceManagerComponent);
        }
        return Optional.empty();
    }

    private JobGraph createJobGraph(int parallelism) {
        BlockingOperator.isBlocking = true;
        JobVertex vertex = new JobVertex("blocking operator");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(BlockingOperator.class);
        return new JobGraph("Blocking test job", new JobVertex[]{vertex});
    }

    public static class BlockingOperator
    extends AbstractInvokable {
        private static final Object lock = new Object();
        private static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            Object object = lock;
            synchronized (object) {
                while (isBlocking) {
                    lock.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void unblock() {
            Object object = lock;
            synchronized (object) {
                isBlocking = false;
                lock.notifyAll();
            }
        }
    }

    private static class NewLeaderRetriever
    extends LeaderRetriever {
        private final Object lock = new Object();
        @Nullable
        private Tuple2<String, UUID> lastAddress = null;
        private CompletableFuture<Tuple2<String, UUID>> newLeaderFuture = new CompletableFuture();

        private NewLeaderRetriever() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        CompletableFuture<Tuple2<String, UUID>> waitUntilNewLeader() {
            Object object = this.lock;
            synchronized (object) {
                if (this.newLeaderFuture.isDone()) {
                    CompletableFuture<Tuple2<String, UUID>> newLeader = this.newLeaderFuture;
                    this.newLeaderFuture = new CompletableFuture();
                    return newLeader;
                }
                return this.newLeaderFuture.thenApply(stringUUIDTuple2 -> {
                    Object object = this.lock;
                    synchronized (object) {
                        this.newLeaderFuture = new CompletableFuture();
                    }
                    return stringUUIDTuple2;
                });
            }
        }

        protected void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
            newLeaderAddressFuture.whenComplete((newLeaderAddress, throwable) -> {
                Object object = this.lock;
                synchronized (object) {
                    if (throwable != null) {
                        this.newLeaderFuture.completeExceptionally((Throwable)throwable);
                    } else if (!newLeaderAddress.equals(this.lastAddress)) {
                        this.lastAddress = newLeaderAddress;
                        if (this.newLeaderFuture.isDone()) {
                            this.newLeaderFuture = CompletableFuture.completedFuture(newLeaderAddress);
                        } else {
                            this.newLeaderFuture.complete((Tuple2<String, UUID>)newLeaderAddress);
                        }
                    }
                }
            });
        }
    }
}

