package org.apache.flink.runtime.rpc.akka;

import akka.AkkaException;
import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Patterns;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor.class */
public class SupervisorActor extends AbstractActor {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorActor.class);
    private final Executor terminationFutureExecutor;
    private final Map<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActors = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$ActorRegistration.class */
    public static final class ActorRegistration {
        private final ActorRef actorRef;
        private final CompletableFuture<Void> terminationFuture;

        private ActorRegistration(ActorRef actorRef, CompletableFuture<Void> completableFuture) {
            this.actorRef = actorRef;
            this.terminationFuture = completableFuture;
        }

        public ActorRef getActorRef() {
            return this.actorRef;
        }

        public CompletableFuture<Void> getTerminationFuture() {
            return this.terminationFuture;
        }

        public static ActorRegistration create(ActorRef actorRef, CompletableFuture<Void> completableFuture) {
            return new ActorRegistration(actorRef, completableFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$AkkaRpcActorRegistration.class */
    public static final class AkkaRpcActorRegistration {
        private final String endpointId;
        private final CompletableFuture<Void> internalTerminationFuture;
        private final CompletableFuture<Void> externalTerminationFuture;

        @Nullable
        private Throwable errorCause;

        private AkkaRpcActorRegistration(String str) {
            this.endpointId = str;
            this.internalTerminationFuture = new CompletableFuture<>();
            this.externalTerminationFuture = new CompletableFuture<>();
            this.errorCause = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<Void> getInternalTerminationFuture() {
            return this.internalTerminationFuture;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<Void> getExternalTerminationFuture() {
            return this.externalTerminationFuture;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getEndpointId() {
            return this.endpointId;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void terminate(Executor executor) {
            CompletableFuture<Void> completableFuture = this.internalTerminationFuture;
            if (this.errorCause == null) {
                this.internalTerminationFuture.completeExceptionally(new AkkaRpcException(String.format("RpcEndpoint %s did not complete the internal termination future.", this.endpointId)));
            } else if (!this.internalTerminationFuture.completeExceptionally(this.errorCause)) {
                completableFuture = this.internalTerminationFuture.handle((r5, th) -> {
                    if (th != null) {
                        this.errorCause.addSuppressed(th);
                    }
                    throw new CompletionException(this.errorCause);
                });
            }
            FutureUtils.forwardAsync(completableFuture, this.externalTerminationFuture, executor);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void terminateExceptionally(Throwable th, Executor executor) {
            executor.execute(() -> {
                this.externalTerminationFuture.completeExceptionally(th);
            });
        }

        public void markFailed(Throwable th) {
            if (this.errorCause == null) {
                this.errorCause = th;
            } else {
                this.errorCause.addSuppressed(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$StartAkkaRpcActor.class */
    public static final class StartAkkaRpcActor {
        private final PropsFactory propsFactory;
        private final String endpointId;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$StartAkkaRpcActor$PropsFactory.class */
        public interface PropsFactory {
            Props create(CompletableFuture<Void> completableFuture);
        }

        private StartAkkaRpcActor(PropsFactory propsFactory, String str) {
            this.propsFactory = propsFactory;
            this.endpointId = str;
        }

        public String getEndpointId() {
            return this.endpointId;
        }

        public PropsFactory getPropsFactory() {
            return this.propsFactory;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static StartAkkaRpcActor create(PropsFactory propsFactory, String str) {
            return new StartAkkaRpcActor(propsFactory, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$StartAkkaRpcActorResponse.class */
    public static final class StartAkkaRpcActorResponse {

        @Nullable
        private final ActorRegistration actorRegistration;

        @Nullable
        private final Throwable error;

        private StartAkkaRpcActorResponse(@Nullable ActorRegistration actorRegistration, @Nullable Throwable th) {
            this.actorRegistration = actorRegistration;
            this.error = th;
        }

        public <X extends Throwable> ActorRegistration orElseThrow(Function<? super Throwable, ? extends X> function) throws Throwable {
            if (this.actorRegistration != null) {
                return this.actorRegistration;
            }
            throw function.apply(this.error);
        }

        public static StartAkkaRpcActorResponse success(ActorRegistration actorRegistration) {
            return new StartAkkaRpcActorResponse(actorRegistration, null);
        }

        public static StartAkkaRpcActorResponse failure(Throwable th) {
            return new StartAkkaRpcActorResponse(null, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/SupervisorActor$SupervisorActorSupervisorStrategy.class */
    public final class SupervisorActorSupervisorStrategy extends SupervisorStrategy {
        private SupervisorActorSupervisorStrategy() {
        }

        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, exc -> {
                return SupervisorStrategy.stop();
            }).build();
        }

        public boolean loggingEnabled() {
            return false;
        }

        public void handleChildTerminated(ActorContext actorContext, ActorRef actorRef, Iterable<ActorRef> iterable) {
            SupervisorActor.this.akkaRpcActorTerminated(actorRef);
        }

        public void processFailure(ActorContext actorContext, boolean z, ActorRef actorRef, Throwable th, ChildRestartStats childRestartStats, Iterable<ChildRestartStats> iterable) {
            Preconditions.checkArgument(!z, "The supervisor strategy should never restart an actor.");
            SupervisorActor.this.akkaRpcActorFailed(actorRef, th);
        }
    }

    SupervisorActor(Executor executor) {
        this.terminationFutureExecutor = executor;
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage).matchAny(this::handleUnknownMessage).build();
    }

    public void postStop() throws Exception {
        LOG.debug("Stopping supervisor actor.");
        super.postStop();
        Iterator<AkkaRpcActorRegistration> it = this.registeredAkkaRpcActors.values().iterator();
        while (it.hasNext()) {
            terminateAkkaRpcActorOnStop(it.next());
        }
        this.registeredAkkaRpcActors.clear();
    }

    /* renamed from: supervisorStrategy, reason: merged with bridge method [inline-methods] */
    public SupervisorActorSupervisorStrategy m2301supervisorStrategy() {
        return new SupervisorActorSupervisorStrategy();
    }

    private void terminateAkkaRpcActorOnStop(AkkaRpcActorRegistration akkaRpcActorRegistration) {
        akkaRpcActorRegistration.terminateExceptionally(new AkkaRpcException(String.format("Unexpected closing of %s with name %s.", getClass().getSimpleName(), akkaRpcActorRegistration.getEndpointId())), this.terminationFutureExecutor);
    }

    private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
        String endpointId = startAkkaRpcActor.getEndpointId();
        AkkaRpcActorRegistration akkaRpcActorRegistration = new AkkaRpcActorRegistration(endpointId);
        Props create = startAkkaRpcActor.getPropsFactory().create(akkaRpcActorRegistration.getInternalTerminationFuture());
        LOG.debug("Starting {} with name {}.", create.actorClass().getSimpleName(), endpointId);
        try {
            ActorRef actorOf = getContext().actorOf(create, endpointId);
            this.registeredAkkaRpcActors.put(actorOf, akkaRpcActorRegistration);
            getSender().tell(StartAkkaRpcActorResponse.success(ActorRegistration.create(actorOf, akkaRpcActorRegistration.getExternalTerminationFuture())), getSelf());
        } catch (AkkaException e) {
            getSender().tell(StartAkkaRpcActorResponse.failure(e), getSelf());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void akkaRpcActorTerminated(ActorRef actorRef) {
        AkkaRpcActorRegistration removeAkkaRpcActor = removeAkkaRpcActor(actorRef);
        LOG.debug("AkkaRpcActor {} has terminated.", actorRef.path());
        removeAkkaRpcActor.terminate(this.terminationFutureExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void akkaRpcActorFailed(ActorRef actorRef, Throwable th) {
        LOG.warn("AkkaRpcActor {} has failed. Shutting it down now.", actorRef.path(), th);
        for (Map.Entry<ActorRef, AkkaRpcActorRegistration> entry : this.registeredAkkaRpcActors.entrySet()) {
            ActorRef key = entry.getKey();
            if (key.equals(actorRef)) {
                entry.getValue().markFailed(new AkkaRpcException(String.format("Stopping actor %s because it failed.", actorRef.path()), th));
            } else {
                entry.getValue().markFailed(new AkkaRpcException(String.format("Stopping actor %s because its sibling %s has failed.", key.path(), actorRef.path())));
            }
        }
        getContext().getSystem().terminate();
    }

    private AkkaRpcActorRegistration removeAkkaRpcActor(ActorRef actorRef) {
        return (AkkaRpcActorRegistration) Optional.ofNullable(this.registeredAkkaRpcActors.remove(actorRef)).orElseThrow(() -> {
            return new IllegalStateException(String.format("Could not find actor %s.", actorRef.path()));
        });
    }

    private void handleUnknownMessage(Object obj) {
        AkkaUnknownMessageException akkaUnknownMessageException = new AkkaUnknownMessageException(String.format("Cannot handle unknown message %s.", obj));
        getSender().tell(new Status.Failure(akkaUnknownMessageException), getSelf());
        throw akkaUnknownMessageException;
    }

    public static String getActorName() {
        return "rpc";
    }

    public static ActorRef startSupervisorActor(ActorSystem actorSystem, Executor executor) {
        return actorSystem.actorOf(Props.create(SupervisorActor.class, new Object[]{executor}).withDispatcher("akka.actor.supervisor-dispatcher"), getActorName());
    }

    public static StartAkkaRpcActorResponse startAkkaRpcActor(ActorRef actorRef, StartAkkaRpcActor.PropsFactory propsFactory, String str) {
        CompletableFuture completableFuture = Patterns.ask(actorRef, createStartAkkaRpcActorMessage(propsFactory, str), RpcUtils.INF_DURATION).toCompletableFuture();
        Class<StartAkkaRpcActorResponse> cls = StartAkkaRpcActorResponse.class;
        StartAkkaRpcActorResponse.class.getClass();
        return (StartAkkaRpcActorResponse) completableFuture.thenApply(cls::cast).join();
    }

    public static StartAkkaRpcActor createStartAkkaRpcActorMessage(StartAkkaRpcActor.PropsFactory propsFactory, String str) {
        return StartAkkaRpcActor.create(propsFactory, str);
    }
}
