/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.UnknownJobManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/**
 * Akka-based {@link KvStateLocationLookupService} that asks the currently known JobManager
 * for the {@link KvStateLocation} of a registered KvState instance.
 *
 * <p>The JobManager gateway is tracked through a {@link LeaderRetrievalService}: every leader
 * notification replaces {@link #jobManagerFuture}. While no leader address is known, lookups
 * fail with {@link UnknownJobManager}; such failures are retried according to the
 * {@link LookupRetryStrategy} created for each lookup.
 */
class AkkaKvStateLocationLookupService
        implements KvStateLocationLookupService, LeaderRetrievalListener {

    // NOTE(review): the logger is registered under the interface name rather than this class.
    // Kept unchanged so existing logging configuration keeps matching - confirm whether intended.
    private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);

    /** Failed future used whenever no leading JobManager address is known. */
    private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER =
            Futures.<ActorGateway>failed(new UnknownJobManager());

    /** Service this instance registers with as a leader listener. */
    private final LeaderRetrievalService leaderRetrievalService;

    /** Actor system providing the dispatcher and scheduler for all future callbacks. */
    private final ActorSystem actorSystem;

    /** Timeout for resolving the JobManager actor and for the lookup ask. */
    private final FiniteDuration askTimeout;

    /** Creates one retry strategy per lookup request. */
    private final LookupRetryStrategyFactory retryStrategyFactory;

    /**
     * Gateway to the current JobManager. Replaced from the leader retrieval callbacks and read
     * by lookups, hence volatile.
     */
    private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;

    /**
     * Creates the lookup service.
     *
     * @param leaderRetrievalService service used to learn about the current leader
     * @param actorSystem actor system used to resolve actors and to run callbacks
     * @param askTimeout timeout for actor resolution and lookup requests
     * @param retryStrategyFactory factory for the per-lookup retry strategy
     * @throws NullPointerException if any argument is {@code null}
     */
    AkkaKvStateLocationLookupService(
            LeaderRetrievalService leaderRetrievalService,
            ActorSystem actorSystem,
            FiniteDuration askTimeout,
            LookupRetryStrategyFactory retryStrategyFactory) {

        this.leaderRetrievalService =
                Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
        this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
        this.retryStrategyFactory =
                Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
    }

    /**
     * Starts the leader retrieval service with this instance as its listener.
     *
     * @throws RuntimeException wrapping any exception thrown while starting
     */
    @Override
    public void start() {
        try {
            leaderRetrievalService.start(this);
        } catch (Exception e) {
            LOG.error("Failed to start leader retrieval service", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the leader retrieval service.
     *
     * @throws RuntimeException wrapping any exception thrown while stopping
     */
    @Override
    public void shutDown() {
        try {
            leaderRetrievalService.stop();
        } catch (Exception e) {
            LOG.error("Failed to stop leader retrieval service", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the location of the KvState registered under the given name for the given job,
     * using a fresh retry strategy from the configured factory.
     *
     * @param jobId job the KvState belongs to
     * @param registrationName name under which the KvState was registered
     * @return future holding the location, or the failure of the lookup
     */
    @Override
    public Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName) {
        return getKvStateLookupInfo(
                jobId, registrationName, retryStrategyFactory.createRetryStrategy());
    }

    /**
     * Performs a single lookup attempt against the current {@link #jobManagerFuture} and
     * schedules another attempt when it fails with {@link UnknownJobManager} and the strategy
     * still permits a retry.
     *
     * @param jobId job the KvState belongs to
     * @param registrationName name under which the KvState was registered
     * @param lookupRetryStrategy strategy shared by all attempts of this lookup
     * @return future holding the location, or the failure of the last attempt
     */
    private Future<KvStateLocation> getKvStateLookupInfo(
            final JobID jobId,
            final String registrationName,
            final LookupRetryStrategy lookupRetryStrategy) {

        return jobManagerFuture
                .flatMap(new Mapper<ActorGateway, Future<Object>>() {
                    @Override
                    public Future<Object> apply(ActorGateway jobManager) {
                        KvStateMessage.LookupKvStateLocation msg =
                                new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
                        return jobManager.ask(msg, askTimeout);
                    }
                }, actorSystem.dispatcher())
                .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
                .recoverWith(new Recover<Future<KvStateLocation>>() {
                    @Override
                    public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
                        // Only an unknown JobManager is retried; tryRetry() consumes one
                        // attempt, so it must stay on the right-hand side of the check.
                        if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
                            return Patterns.after(
                                    lookupRetryStrategy.getRetryDelay(),
                                    actorSystem.scheduler(),
                                    actorSystem.dispatcher(),
                                    new Callable<Future<KvStateLocation>>() {
                                        @Override
                                        public Future<KvStateLocation> call() throws Exception {
                                            // Re-reads jobManagerFuture, so a leader announced
                                            // in the meantime is picked up by the retry.
                                            return AkkaKvStateLocationLookupService.this
                                                    .getKvStateLookupInfo(
                                                            jobId,
                                                            registrationName,
                                                            lookupRetryStrategy);
                                        }
                                    });
                        }
                        return Futures.<KvStateLocation>failed(failure);
                    }
                }, actorSystem.dispatcher());
    }

    /**
     * Replaces the JobManager future for the newly announced leader. A {@code null} address
     * resets it to the "unknown JobManager" failure.
     *
     * @param leaderAddress address of the new leader, or {@code null} if there is none
     * @param leaderSessionID session ID attached to the created gateway
     */
    @Override
    public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
        LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);

        if (leaderAddress == null) {
            jobManagerFuture = UNKNOWN_JOB_MANAGER;
        } else {
            jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
                    .map(new Mapper<ActorRef, ActorGateway>() {
                        @Override
                        public ActorGateway apply(ActorRef actorRef) {
                            return new AkkaActorGateway(actorRef, leaderSessionID);
                        }
                    }, actorSystem.dispatcher());
        }
    }

    /**
     * Logs the reported error and makes subsequent lookups fail with it until the JobManager
     * future is replaced by a later leader notification.
     *
     * @param exception error reported by the leader retrieval service
     */
    @Override
    public void handleError(Exception exception) {
        LOG.warn("Leader retrieval service reported an error", exception);
        jobManagerFuture = Futures.<ActorGateway>failed(exception);
    }

    /** Factory for retry strategies with a bounded number of retries and a constant delay. */
    static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {

        private final int maxRetries;
        private final FiniteDuration retryDelay;

        /**
         * Creates the factory, validating the arguments up front so that a bad configuration
         * fails here instead of on the first lookup.
         *
         * @param maxRetries maximum number of retries per lookup, must not be negative
         * @param retryDelay delay between retries
         * @throws IllegalArgumentException if {@code maxRetries} is negative
         * @throws NullPointerException if {@code retryDelay} is {@code null}
         */
        FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
            Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
            this.maxRetries = maxRetries;
            this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
        }

        /** Returns a new strategy with its own retry counter. */
        @Override
        public LookupRetryStrategy createRetryStrategy() {
            return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
        }

        /** Retry strategy that allows up to {@code maxRetries} retries with a constant delay. */
        private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {

            /** Guards {@link #numRetries}. */
            private final Object retryLock = new Object();

            private final int maxRetries;
            private final FiniteDuration retryDelay;

            /** Number of retries handed out so far. */
            private int numRetries;

            /**
             * @param maxRetries maximum number of retries, must not be negative
             * @param retryDelay delay between retries
             * @throws IllegalArgumentException if {@code maxRetries} is negative
             * @throws NullPointerException if {@code retryDelay} is {@code null}
             */
            public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
                Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
                this.maxRetries = maxRetries;
                this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
            }

            /** Returns the constant retry delay; the field is final, so no locking is needed. */
            @Override
            public FiniteDuration getRetryDelay() {
                return retryDelay;
            }

            /**
             * Consumes one retry attempt if any remain.
             *
             * @return {@code true} if a retry may be performed, {@code false} once the maximum
             *     number of retries has been used up
             */
            @Override
            public boolean tryRetry() {
                synchronized (retryLock) {
                    if (numRetries < maxRetries) {
                        numRetries++;
                        return true;
                    }
                    return false;
                }
            }
        }
    }

    /** Factory whose strategy never permits a retry. */
    static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {

        /** The strategy keeps no state, so one instance is shared by all lookups. */
        private static final DisabledLookupRetryStrategy RETRY_STRATEGY =
                new DisabledLookupRetryStrategy();

        DisabledLookupRetryStrategyFactory() {
        }

        /** Returns the shared no-retry strategy. */
        @Override
        public LookupRetryStrategy createRetryStrategy() {
            return RETRY_STRATEGY;
        }

        /** Strategy that reports a zero delay and rejects every retry. */
        private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {

            private DisabledLookupRetryStrategy() {
            }

            @Override
            public FiniteDuration getRetryDelay() {
                return FiniteDuration.Zero();
            }

            @Override
            public boolean tryRetry() {
                return false;
            }
        }
    }

    /** Creates the {@link LookupRetryStrategy} used for a single lookup. */
    interface LookupRetryStrategyFactory {

        /** Returns the retry strategy to use for one lookup request. */
        LookupRetryStrategy createRetryStrategy();
    }

    /** Decides whether and after which delay a failed lookup is retried. */
    interface LookupRetryStrategy {

        /** Returns the delay to wait before the next retry. */
        FiniteDuration getRetryDelay();

        /**
         * Asks for permission to perform another retry.
         *
         * @return {@code true} if another retry may be performed
         */
        boolean tryRetry();
    }
}

