package org.apache.flink.runtime.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import java.net.ConnectException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Try;

/* loaded from: input_file:org/apache/flink/runtime/query/QueryableStateClient.class */
public class QueryableStateClient {
    private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
    private final KvStateLocationLookupService lookupService;
    private final KvStateClient kvStateClient;
    private final ExecutionContext executionContext;
    private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache;
    private final ActorSystem actorSystem;

    public QueryableStateClient(Configuration configuration) throws Exception {
        this.lookupCache = new ConcurrentHashMap();
        Preconditions.checkNotNull(configuration, "Configuration");
        LeaderRetrievalService createLeaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
        String string = configuration.getString("akka.ask.timeout", ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
        FiniteDuration apply = FiniteDuration.apply(string);
        if (!apply.isFinite()) {
            throw new IllegalConfigurationException("akka.ask.timeout is not a finite timeout ('" + string + "')");
        }
        FiniteDuration finiteDuration = apply;
        AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory fixedDelayLookupRetryStrategyFactory = new AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory(configuration.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES), FiniteDuration.apply(configuration.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY), "ms"));
        this.actorSystem = AkkaUtils.createActorSystem(configuration, new Some(new Tuple2("", 0)));
        AkkaKvStateLocationLookupService akkaKvStateLocationLookupService = new AkkaKvStateLocationLookupService(createLeaderRetrievalService, this.actorSystem, finiteDuration, fixedDelayLookupRetryStrategyFactory);
        int integer = configuration.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
        KvStateClient kvStateClient = new KvStateClient(integer == 0 ? Runtime.getRuntime().availableProcessors() : integer, new DisabledKvStateRequestStats());
        this.lookupService = akkaKvStateLocationLookupService;
        this.kvStateClient = kvStateClient;
        this.executionContext = this.actorSystem.dispatcher();
        this.lookupService.start();
    }

    public QueryableStateClient(KvStateLocationLookupService kvStateLocationLookupService, KvStateClient kvStateClient, ExecutionContext executionContext) {
        this.lookupCache = new ConcurrentHashMap();
        this.lookupService = (KvStateLocationLookupService) Preconditions.checkNotNull(kvStateLocationLookupService, "KvStateLocationLookupService");
        this.kvStateClient = (KvStateClient) Preconditions.checkNotNull(kvStateClient, "KvStateClient");
        this.executionContext = (ExecutionContext) Preconditions.checkNotNull(executionContext, "ExecutionContext");
        this.actorSystem = null;
        this.lookupService.start();
    }

    public ExecutionContext getExecutionContext() {
        return this.executionContext;
    }

    public void shutDown() {
        try {
            this.lookupService.shutDown();
        } catch (Throwable th) {
            LOG.error("Failed to shut down KvStateLookupService", th);
        }
        try {
            this.kvStateClient.shutDown();
        } catch (Throwable th2) {
            LOG.error("Failed to shut down KvStateClient", th2);
        }
        if (this.actorSystem != null) {
            try {
                this.actorSystem.shutdown();
            } catch (Throwable th3) {
                LOG.error("Failed to shut down ActorSystem");
            }
        }
    }

    public Future<byte[]> getKvState(final JobID jobID, final String str, final int i, final byte[] bArr) {
        return getKvState(jobID, str, i, bArr, false).recoverWith(new Recover<Future<byte[]>>() { // from class: org.apache.flink.runtime.query.QueryableStateClient.1
            /* renamed from: recover, reason: merged with bridge method [inline-methods] */
            public Future<byte[]> m597recover(Throwable th) throws Throwable {
                return ((th instanceof UnknownKvStateID) || (th instanceof UnknownKvStateKeyGroupLocation) || (th instanceof UnknownKvStateLocation) || (th instanceof ConnectException)) ? QueryableStateClient.this.getKvState(jobID, str, i, bArr, true) : Futures.failed(th);
            }
        }, this.executionContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<byte[]> getKvState(JobID jobID, String str, final int i, final byte[] bArr, boolean z) {
        return getKvStateLookupInfo(jobID, str, z).flatMap(new Mapper<KvStateLocation, Future<byte[]>>() { // from class: org.apache.flink.runtime.query.QueryableStateClient.2
            public Future<byte[]> apply(KvStateLocation kvStateLocation) {
                int computeKeyGroupForKeyHash = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(i, kvStateLocation.getNumKeyGroups());
                KvStateServerAddress kvStateServerAddress = kvStateLocation.getKvStateServerAddress(computeKeyGroupForKeyHash);
                if (kvStateServerAddress == null) {
                    return Futures.failed(new UnknownKvStateKeyGroupLocation());
                }
                return QueryableStateClient.this.kvStateClient.getKvState(kvStateServerAddress, kvStateLocation.getKvStateID(computeKeyGroupForKeyHash), bArr);
            }
        }, this.executionContext);
    }

    private Future<KvStateLocation> getKvStateLookupInfo(JobID jobID, String str, boolean z) {
        if (z) {
            Future<KvStateLocation> kvStateLookupInfo = this.lookupService.getKvStateLookupInfo(jobID, str);
            this.lookupCache.put(new Tuple2<>(jobID, str), kvStateLookupInfo);
            return kvStateLookupInfo;
        }
        Tuple2<JobID, String> tuple2 = new Tuple2<>(jobID, str);
        Future<KvStateLocation> future = this.lookupCache.get(tuple2);
        if (future == null) {
            Future<KvStateLocation> kvStateLookupInfo2 = this.lookupService.getKvStateLookupInfo(jobID, str);
            Future<KvStateLocation> putIfAbsent = this.lookupCache.putIfAbsent(tuple2, kvStateLookupInfo2);
            return putIfAbsent == null ? kvStateLookupInfo2 : putIfAbsent;
        }
        if (!future.isCompleted() || !((Try) future.value().get()).isFailure()) {
            return future;
        }
        Future<KvStateLocation> kvStateLookupInfo3 = this.lookupService.getKvStateLookupInfo(jobID, str);
        return this.lookupCache.replace(tuple2, future, kvStateLookupInfo3) ? kvStateLookupInfo3 : this.lookupCache.get(tuple2);
    }
}
