package org.apache.flink.runtime.rpc.pekko;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/**
 * Pekko actor that receives messages on behalf of a single {@link RpcEndpoint} and dispatches
 * them to it.
 *
 * <p>Incoming messages fall into three groups (see {@code createReceive}): handshake messages,
 * lifecycle control messages ({@code START}/{@code STOP}/{@code TERMINATE}) that drive the
 * {@link State} machine, and everything else, which is treated as an RPC message and only
 * processed while the current state reports {@code isRunning()}.
 *
 * <p>NOTE: this source was recovered by a decompiler, which widened the class from
 * package-private to public.
 *
 * @param <T> type of the endpoint served by this actor
 */
public class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
    /** Logger named after the concrete (possibly sub-)class of this actor. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** The endpoint whose methods are invoked for incoming RPC messages. */
    protected final T rpcEndpoint;
    /** Class loader installed as context class loader while endpoint code is executed. */
    private final ClassLoader flinkClassLoader;
    /** Entered/exited around every piece of endpoint code run by this actor. */
    private final MainThreadValidatorUtil mainThreadValidator;
    /** Completed in {@code postStop} according to {@code rpcEndpointTerminationResult}. */
    private final CompletableFuture<Boolean> terminationFuture;
    /** Version that a remote handshake must match exactly. */
    private final int version;
    /** Upper bound (inclusive) for the serialized size of an RPC result. */
    private final long maximumFramesize;
    /** Ensures that only the first call to {@code stop(...)} takes effect. */
    private final AtomicBoolean rpcEndpointStopped;
    /** If set, results for local senders are serialized too, unless the method is {@code @Local}. */
    private final boolean forceSerialization;
    /** Key/value pairs handed to {@code MdcUtils.withContext} while a message is handled. */
    private final Map<String, String> loggingContext;
    /**
     * Outcome reported on actor stop. Starts out as a "not properly stopped" failure and is
     * replaced by {@code stop(...)} or by a failing control message.
     */
    private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;

    /** Current lifecycle state; initialised to {@code StoppedState.STOPPED} by the constructor. */
    @Nonnull
    private State state;

    /**
     * Outcome of terminating the RPC endpoint: either success (no cause) or a failure that carries
     * the {@link Throwable} responsible for it.
     *
     * <p>NOTE: the decompiler widened this type and its factory methods from private.
     */
    public static final class RpcEndpointTerminationResult {
        /** Shared instance used for every successful termination. */
        private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null);

        /** {@code null} exactly when the termination succeeded. */
        @Nullable
        private final Throwable failureCause;

        private RpcEndpointTerminationResult(@Nullable Throwable th) {
            failureCause = th;
        }

        /** Returns {@code true} if no failure cause was recorded. */
        public boolean isSuccess() {
            return failureCause == null;
        }

        /**
         * Returns the recorded failure cause.
         *
         * <p>Must only be called on a failed result; the state check rejects successful ones.
         */
        public Throwable getFailureCause() {
            Preconditions.checkState(!isSuccess());
            return failureCause;
        }

        private static RpcEndpointTerminationResult success() {
            return SUCCESS;
        }

        /** Creates a result that carries {@code th} as its failure cause. */
        public static RpcEndpointTerminationResult failure(Throwable th) {
            return new RpcEndpointTerminationResult(th);
        }

        /** Maps a nullable throwable to {@link #success()} (null) or {@link #failure(Throwable)}. */
        public static RpcEndpointTerminationResult of(@Nullable Throwable th) {
            if (th == null) {
                return success();
            }
            return failure(th);
        }

        /** Compiler-generated bridge to the private {@link #success()}; kept for existing callers. */
        static /* synthetic */ RpcEndpointTerminationResult access$400() {
            return SUCCESS;
        }
    }

    /**
     * State of an actor whose endpoint has been started; RPC messages are processed in this state.
     *
     * <p>NOTE: the decompiler widened this type from package-private to public.
     */
    public enum StartedState implements State {
        STARTED;

        /** Starting an already started actor is a no-op. */
        @Override
        public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            return STARTED;
        }

        /** Moves back to {@link StoppedState#STOPPED} without touching the endpoint. */
        @Override
        public State stop() {
            return StoppedState.STOPPED;
        }

        /**
         * Runs the endpoint's {@code internalCallOnStop()} under the given context class loader
         * and stops the actor once the returned future completes.
         *
         * <p>A throwable raised while invoking the stop hook is wrapped in an {@link RpcException}
         * and reported as the termination failure instead of being propagated.
         *
         * @param pekkoRpcActor actor whose endpoint is being terminated
         * @param classLoader context class loader to use while running the stop hook
         * @return {@link TerminatingState#TERMINATING}
         */
        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            pekkoRpcActor.mainThreadValidator.enterMainThread();

            CompletableFuture<?> onStopFuture;
            try {
                onStopFuture = ClassLoadingUtils.runWithContextClassLoader(() -> {
                    return pekkoRpcActor.rpcEndpoint.internalCallOnStop();
                }, classLoader);
            } catch (Throwable th) {
                onStopFuture = FutureUtils.completedExceptionally(new RpcException(String.format("Failure while stopping RpcEndpoint %s.", pekkoRpcActor.rpcEndpoint.getEndpointId()), th));
            } finally {
                // Leave the main thread exactly once, whichever path was taken above.
                pekkoRpcActor.mainThreadValidator.exitMainThread();
            }

            // The actor is stopped only after the (possibly asynchronous) stop hook has finished.
            onStopFuture.whenComplete((ignored, failure) -> {
                pekkoRpcActor.stop(RpcEndpointTerminationResult.of(failure));
            });
            return TerminatingState.TERMINATING;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    /**
     * Lifecycle state of the actor. Each transition returns the state to switch to; the defaults
     * reject {@code start}, {@code stop} and {@code terminate} so that concrete states only
     * override the transitions they allow.
     */
    interface State {
        /** Default: starting is not allowed from this state. */
        default State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            final String message = invalidStateTransitionMessage(StartedState.STARTED);
            throw new RpcInvalidStateException(message);
        }

        /** Default: stopping is not allowed from this state. */
        default State stop() {
            final String message = invalidStateTransitionMessage(StoppedState.STOPPED);
            throw new RpcInvalidStateException(message);
        }

        /** Default: terminating is not allowed from this state. */
        default State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            final String message = invalidStateTransitionMessage(TerminatingState.TERMINATING);
            throw new RpcInvalidStateException(message);
        }

        /** Default: every state ends up in {@link TerminatedState#TERMINATED}. */
        default State finishTermination() {
            return TerminatedState.TERMINATED;
        }

        /** Default: RPC messages are not processed. */
        default boolean isRunning() {
            return false;
        }

        /** Builds the error text for a rejected transition from this state to {@code state}. */
        default String invalidStateTransitionMessage(State state) {
            final String template = "RpcActor is currently in state %s and cannot go into state %s.";
            return String.format(template, this, state);
        }
    }

    /**
     * Initial state of the actor: the endpoint is not started and RPC messages are rejected.
     *
     * <p>NOTE: the decompiler widened this type from package-private to public.
     */
    public enum StoppedState implements State {
        STOPPED;

        /**
         * Runs the endpoint's {@code internalCallOnStart()} under the given context class loader.
         *
         * <p>If the start hook throws, the actor is stopped with an {@link RpcException} as its
         * termination failure; the returned state is {@link StartedState#STARTED} in either case.
         *
         * @param pekkoRpcActor actor whose endpoint is being started
         * @param classLoader context class loader to use while running the start hook
         * @return {@link StartedState#STARTED}
         */
        @Override
        public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            pekkoRpcActor.mainThreadValidator.enterMainThread();
            try {
                ClassLoadingUtils.runWithContextClassLoader(() -> {
                    pekkoRpcActor.rpcEndpoint.internalCallOnStart();
                }, classLoader);
            } catch (Throwable th) {
                pekkoRpcActor.stop(RpcEndpointTerminationResult.failure(new RpcException(String.format("Could not start RpcEndpoint %s.", pekkoRpcActor.rpcEndpoint.getEndpointId()), th)));
            } finally {
                // Leave the main thread exactly once, whether or not the start hook failed.
                pekkoRpcActor.mainThreadValidator.exitMainThread();
            }
            return StartedState.STARTED;
        }

        /** Stopping an already stopped actor is a no-op. */
        @Override
        public State stop() {
            return STOPPED;
        }

        /**
         * Stops the actor right away with a successful result; the endpoint's stop hook is not
         * invoked from this state.
         */
        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            pekkoRpcActor.stop(RpcEndpointTerminationResult.success());
            return TerminatingState.TERMINATING;
        }
    }

    /**
     * Final state, entered via {@code State.finishTermination()}. It overrides nothing, so
     * {@code start}, {@code stop} and {@code terminate} are all rejected by the interface defaults.
     *
     * <p>NOTE: the decompiler widened this type from package-private to public.
     */
    public enum TerminatedState implements State {
        TERMINATED
    }

    /**
     * State between a terminate request and the actor actually stopping. RPC messages are still
     * processed; {@code start} and {@code stop} are rejected by the interface defaults.
     *
     * <p>NOTE: the decompiler widened this type from package-private to public.
     */
    public enum TerminatingState implements State {
        TERMINATING;

        /** A repeated terminate request is ignored. */
        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader classLoader) {
            return this;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PekkoRpcActor(T t, CompletableFuture<Boolean> completableFuture, int i, long j, boolean z, ClassLoader classLoader, Map<String, String> map) {
        this.loggingContext = map;
        Preconditions.checkArgument(j > 0, "Maximum framesize must be positive.");
        this.rpcEndpoint = (T) ((RpcEndpoint) Preconditions.checkNotNull(t, "rpc endpoint"));
        this.flinkClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.forceSerialization = z;
        this.mainThreadValidator = new MainThreadValidatorUtil(t);
        this.terminationFuture = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        this.version = i;
        this.maximumFramesize = j;
        this.rpcEndpointStopped = new AtomicBoolean(false);
        this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(new RpcException(String.format("RpcEndpoint %s has not been properly stopped.", t.getEndpointId())));
        this.state = StoppedState.STOPPED;
    }

    /**
     * Actor shutdown hook: completes the termination future with the recorded termination result
     * and moves the state machine to its final state.
     */
    @Override
    public void postStop() throws Exception {
        super.postStop();

        final RpcEndpointTerminationResult result = rpcEndpointTerminationResult;
        if (!result.isSuccess()) {
            final Throwable cause = result.getFailureCause();
            log.info("The RpcEndpoint {} failed.", rpcEndpoint.getEndpointId(), cause);
            terminationFuture.completeExceptionally(cause);
        } else {
            log.debug("The RpcEndpoint {} terminated successfully.", rpcEndpoint.getEndpointId());
            terminationFuture.complete(null);
        }

        state = state.finishTermination();
    }

    /**
     * Routes handshake messages and control messages to their dedicated handlers and every other
     * message to {@code handleMessage}.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                .match(ControlMessages.class, this::handleControlMessage)
                .matchAny(this::handleMessage)
                .build();
    }

    /**
     * Handles any message that is neither a handshake nor a control message.
     *
     * <p>While the current state is running, the message is dispatched via
     * {@code handleRpcMessage} inside the main-thread validator. Otherwise it is discarded and the
     * sender (if any) receives an {@link EndpointNotStartedException}. The logging context is
     * active for the whole call and released afterwards, including on failure.
     *
     * @param obj the received message
     */
    private void handleMessage(Object obj) {
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(this.loggingContext)) {
            if (!this.state.isRunning()) {
                this.log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", this.rpcEndpoint.getClass().getName(), obj);
                sendErrorIfSender(new EndpointNotStartedException(String.format("Discard message %s, because the rpc endpoint %s has not been started yet.", obj, getSelf().path())));
                return;
            }

            this.mainThreadValidator.enterMainThread();
            try {
                handleRpcMessage(obj);
            } finally {
                this.mainThreadValidator.exitMainThread();
            }
        }
    }

    /**
     * Applies a lifecycle control message to the state machine.
     *
     * <p>{@code START}, {@code STOP} and {@code TERMINATE} replace the current state with the
     * result of the corresponding transition; any other value is reported through
     * {@code handleUnknownControlMessage}. The logging context is active while the transition
     * runs and is released on every path, including when the transition throws.
     *
     * <p>An exception thrown by a transition is recorded as the termination failure and then
     * rethrown.
     *
     * @param controlMessages the control message to apply
     */
    private void handleControlMessage(ControlMessages controlMessages) {
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(this.loggingContext)) {
            switch (controlMessages) {
                case START:
                    this.state = this.state.start(this, this.flinkClassLoader);
                    break;
                case STOP:
                    this.state = this.state.stop();
                    break;
                case TERMINATE:
                    this.state = this.state.terminate(this, this.flinkClassLoader);
                    break;
                default:
                    handleUnknownControlMessage(controlMessages);
                    break;
            }
        } catch (Exception e) {
            this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
            throw e;
        }
    }

    /**
     * Logs a warning for an unrecognised control message and reports it to the sender, if any,
     * as an {@link UnknownMessageException}.
     */
    private void handleUnknownControlMessage(ControlMessages controlMessages) {
        final String description = String.format("Received unknown control message %s. Dropping this message!", controlMessages);
        log.warn(description);

        final UnknownMessageException error = new UnknownMessageException(description);
        sendErrorIfSender(error);
    }

    /**
     * Dispatches an RPC message by type: {@link RunAsync}, {@link CallAsync} or
     * {@link RpcInvocation}. Any other message is logged, dropped and reported to the sender (if
     * any) as an {@link UnknownMessageException}.
     *
     * <p>NOTE: the decompiler widened this method from protected to public.
     *
     * @param obj the message to dispatch
     */
    public void handleRpcMessage(Object obj) {
        if (obj instanceof RunAsync) {
            handleRunAsync((RunAsync) obj);
        } else if (obj instanceof CallAsync) {
            handleCallAsync((CallAsync) obj);
        } else if (obj instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) obj);
        } else {
            final Class<?> messageClass = obj.getClass();
            log.warn("Received message of unknown type {} with value {}. Dropping this message!", messageClass.getName(), obj);
            sendErrorIfSender(new UnknownMessageException("Received unknown message " + obj + " of type " + messageClass.getSimpleName() + '.'));
        }
    }

    /**
     * Answers a remote handshake.
     *
     * <p>The sender receives a {@link HandshakeException} if the remote version differs from this
     * actor's version or if the endpoint does not implement the requested gateway; otherwise it
     * receives a success status carrying {@code HandshakeSuccessMessage.INSTANCE}. The logging
     * context is active for the whole call and released afterwards, including on failure.
     *
     * @param remoteHandshakeMessage handshake request carrying the remote version and gateway type
     */
    private void handleHandshakeMessage(RemoteHandshakeMessage remoteHandshakeMessage) {
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(this.loggingContext)) {
            if (!isCompatibleVersion(remoteHandshakeMessage.getVersion())) {
                sendErrorIfSender(new HandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.", remoteHandshakeMessage.getVersion(), getVersion())));
            } else if (isGatewaySupported(remoteHandshakeMessage.getRpcGateway())) {
                getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
            } else {
                sendErrorIfSender(new HandshakeException(String.format("The rpc endpoint does not support the gateway %s.", remoteHandshakeMessage.getRpcGateway().getSimpleName())));
            }
        }
    }

    /** Returns whether the endpoint's runtime class is assignable to the gateway type {@code cls}. */
    private boolean isGatewaySupported(Class<?> cls) {
        final Class<?> endpointClass = rpcEndpoint.getClass();
        return cls.isAssignableFrom(endpointClass);
    }

    /** Returns whether the remote version {@code i} equals this actor's version. */
    private boolean isCompatibleVersion(int i) {
        final int localVersion = getVersion();
        return localVersion == i;
    }

    /** Returns the version this actor was created with. */
    private int getVersion() {
        return version;
    }

    /**
     * Resolves the endpoint method named by the invocation, calls it reflectively under the Flink
     * class loader and replies to the sender.
     *
     * <ul>
     *   <li>Unknown method: the sender receives an {@link RpcConnectionException}.
     *   <li>{@code void} method: invoked, no reply is sent on success.
     *   <li>Other methods: the result is sent back, asynchronously if it is a
     *       {@link CompletableFuture}.
     *   <li>Exception thrown by a non-void method: its target exception is sent back as failure.
     *   <li>Any other throwable: logged and sent back as failure.
     * </ul>
     *
     * @param rpcInvocation the invocation describing method name, parameter types and arguments
     */
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        final Method rpcMethod;
        try {
            rpcMethod = lookupRpcMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes());
        } catch (NoSuchMethodException e) {
            final String notFound = "Could not find rpc method for rpc invocation.";
            log.error(notFound, e);
            getSender().tell(new Status.Failure(new RpcConnectionException(notFound, e)), getSelf());
            return;
        }

        try {
            rpcMethod.setAccessible(true);

            if (Void.TYPE.equals(rpcMethod.getReturnType())) {
                // Nothing to send back for void methods.
                ClassLoadingUtils.runWithContextClassLoader(() -> {
                    return rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                }, flinkClassLoader);
                return;
            }

            try {
                final Object result = ClassLoadingUtils.runWithContextClassLoader(() -> {
                    return rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                }, flinkClassLoader);
                final String methodName = rpcMethod.getName();
                final boolean isLocal = rpcMethod.getAnnotation(Local.class) != null;

                if (result instanceof CompletableFuture) {
                    sendAsyncResponse((CompletableFuture) result, methodName, isLocal);
                } else {
                    sendSyncResponse(result, methodName, isLocal);
                }
            } catch (InvocationTargetException invocationFailure) {
                log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, invocationFailure);
                getSender().tell(new Status.Failure(invocationFailure.getTargetException()), getSelf());
            }
        } catch (Throwable th) {
            log.error("Error while executing remote procedure call {}.", rpcMethod, th);
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Replies to the current sender with an already available result.
     *
     * <p>The result is serialized (and size-checked) when the sender is remote, or when
     * serialization is forced and the method is not marked local; otherwise it is sent as is.
     *
     * @param obj the result to send
     * @param str name of the invoked method, used in error messages
     * @param z whether the invoked method is marked as local
     */
    private void sendSyncResponse(Object obj, String str, boolean z) {
        final boolean mustSerialize = isRemoteSender(getSender()) || (forceSerialization && !z);
        if (!mustSerialize) {
            getSender().tell(new Status.Success(obj), getSelf());
            return;
        }

        final Either<RpcSerializedValue, RpcException> serialized = serializeRemoteResultAndVerifySize(obj, str);
        if (serialized.isRight()) {
            getSender().tell(new Status.Failure(serialized.right()), getSelf());
        } else {
            getSender().tell(new Status.Success(serialized.left()), getSelf());
        }
    }

    /**
     * Replies to the current sender once {@code completableFuture} completes.
     *
     * <p>The outcome is transferred into a Scala promise whose future is piped to the sender. A
     * failed future fails the promise. A successful one is either wrapped in
     * {@code Status.Success} unserialized (local sender, no forced serialization or local
     * method), or serialized and size-checked, in which case the serialized value itself
     * completes the promise.
     *
     * @param completableFuture the pending result
     * @param str name of the invoked method, used in error messages
     * @param z whether the invoked method is marked as local
     */
    private void sendAsyncResponse(CompletableFuture<?> completableFuture, String str, boolean z) {
        // Capture the sender now; the callback below may run after this message was handled.
        final ActorRef replyTo = getSender();
        final Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();

        FutureUtils.assertNoException(completableFuture.handle((value, failure) -> {
            if (failure != null) {
                promise.failure(failure);
            } else if (isRemoteSender(replyTo) || (forceSerialization && !z)) {
                final Either<RpcSerializedValue, RpcException> serialized = serializeRemoteResultAndVerifySize(value, str);
                if (serialized.isLeft()) {
                    promise.success(serialized.left());
                } else {
                    promise.failure(serialized.right());
                }
            } else {
                promise.success(new Status.Success(value));
            }
            return null;
        }));

        Patterns.pipe(promise.future(), getContext().dispatcher()).to(replyTo);
    }

    /** Returns whether the address of {@code actorRef} lacks local scope. */
    private boolean isRemoteSender(ActorRef actorRef) {
        final boolean local = actorRef.path().address().hasLocalScope();
        return !local;
    }

    /**
     * Serializes an RPC result and checks it against the maximum frame size.
     *
     * @param obj the result to serialize
     * @param str name of the invoked method, used in error messages
     * @return left: the serialized value if its length does not exceed the maximum frame size;
     *     right: an {@link RpcException} if it is too large or serialization failed
     */
    private Either<RpcSerializedValue, RpcException> serializeRemoteResultAndVerifySize(Object obj, String str) {
        try {
            final RpcSerializedValue serialized = RpcSerializedValue.valueOf(obj);
            final long length = serialized.getSerializedDataLength();
            if (length <= maximumFramesize) {
                return Either.Left(serialized);
            }
            return Either.Right(new RpcException("The method " + str + "'s result size " + length + " exceeds the maximum size " + maximumFramesize + " ."));
        } catch (IOException e) {
            return Either.Right(new RpcException("Failed to serialize the result for RPC call : " + str + '.', e));
        }
    }

    /**
     * Runs the callable carried by {@code callAsync} under the Flink class loader and sends its
     * result to the sender as success; any throwable is sent back as failure instead.
     */
    private void handleCallAsync(CallAsync callAsync) {
        try {
            final Object result = ClassLoadingUtils.runWithContextClassLoader(() -> {
                return callAsync.getCallable().call();
            }, flinkClassLoader);
            getSender().tell(new Status.Success(result), getSelf());
        } catch (Throwable th) {
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Runs the runnable carried by {@code runAsync}, either immediately or after a delay.
     *
     * <p>A non-zero target time that still lies in the future (relative to
     * {@code System.nanoTime()}) causes an equivalent message to be scheduled to this actor for
     * the remaining delay. Otherwise the runnable is executed right away under the Flink class
     * loader; failures are logged and only rethrown if {@code ExceptionUtils} deems them fatal.
     */
    private void handleRunAsync(RunAsync runAsync) {
        final long targetNanos = runAsync.getTimeNanos();
        // A target time of 0 means "run now" and skips the clock read entirely.
        final long delayNanos = (targetNanos == 0) ? 0 : targetNanos - System.nanoTime();

        if (delayNanos > 0) {
            final FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
            final Object selfMessage = envelopeSelfMessage(new RunAsync(runAsync.getRunnable(), targetNanos));
            getContext().system().scheduler().scheduleOnce(delay, getSelf(), selfMessage, getContext().dispatcher(), ActorRef.noSender());
            return;
        }

        try {
            ClassLoadingUtils.runWithContextClassLoader(() -> {
                runAsync.getRunnable().run();
            }, flinkClassLoader);
        } catch (Throwable th) {
            log.error("Caught exception while executing runnable in main thread.", th);
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
        }
    }

    /**
     * Looks up a public method on the endpoint's runtime class.
     *
     * @param str method name
     * @param clsArr parameter types of the method
     * @throws NoSuchMethodException if the endpoint class has no matching public method
     */
    private Method lookupRpcMethod(String str, Class<?>[] clsArr) throws NoSuchMethodException {
        final Class<?> endpointClass = rpcEndpoint.getClass();
        return endpointClass.getMethod(str, clsArr);
    }

    /**
     * Sends {@code th} to the current sender as a failure, unless the message had no sender.
     *
     * <p>NOTE: the decompiler widened this method from protected to public.
     *
     * @param th the error to report
     */
    public void sendErrorIfSender(Throwable th) {
        final boolean hasSender = !getSender().equals(ActorRef.noSender());
        if (hasSender) {
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Hook applied to messages this actor schedules to itself (see {@code handleRunAsync}).
     * This implementation returns the message unchanged; being protected, it can be overridden
     * by subclasses.
     *
     * @param obj the message about to be sent to this actor
     * @return the object to actually send
     */
    protected Object envelopeSelfMessage(Object obj) {
        return obj;
    }

    /**
     * Records the termination result and asks the actor context to stop this actor. Only the
     * first call has any effect; later calls are ignored.
     *
     * <p>NOTE: the decompiler widened this method from private to public.
     *
     * @param rpcEndpointTerminationResult outcome to report when the actor has stopped
     */
    public void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
        final boolean firstCall = rpcEndpointStopped.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.rpcEndpointTerminationResult = rpcEndpointTerminationResult;
        getContext().stop(getSelf());
    }
}
