/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>>
extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
    private final T rpcEndpoint;
    private final MainThreadValidatorUtil mainThreadValidator;
    private final CompletableFuture<Void> terminationFuture;

    AkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> terminationFuture) {
        this.rpcEndpoint = (RpcEndpoint)Preconditions.checkNotNull(rpcEndpoint, (String)"rpc endpoint");
        this.mainThreadValidator = new MainThreadValidatorUtil((RpcEndpoint<?>)rpcEndpoint);
        this.terminationFuture = (CompletableFuture)Preconditions.checkNotNull(terminationFuture);
    }

    public void postStop() throws Exception {
        super.postStop();
        this.terminationFuture.complete(null);
    }

    public void onReceive(Object message) {
        if (message.equals((Object)Processing.START)) {
            this.getContext().become((Procedure)new Procedure<Object>(){

                public void apply(Object msg) throws Exception {
                    if (msg.equals((Object)Processing.STOP)) {
                        AkkaRpcActor.this.getContext().unbecome();
                    } else {
                        AkkaRpcActor.this.handleMessage(msg);
                    }
                }
            });
        } else {
            LOG.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", (Object)this.rpcEndpoint.getClass().getName(), (Object)message.getClass().getName());
            if (!this.getSender().equals((Object)ActorRef.noSender())) {
                this.getSender().tell((Object)new Status.Failure((Throwable)new AkkaRpcException("Discard message, because the rpc endpoint has not been started yet.")), this.getSelf());
            }
        }
    }

    private void handleMessage(Object message) {
        this.mainThreadValidator.enterMainThread();
        try {
            if (message instanceof RunAsync) {
                this.handleRunAsync((RunAsync)message);
            } else if (message instanceof CallAsync) {
                this.handleCallAsync((CallAsync)message);
            } else if (message instanceof RpcInvocation) {
                this.handleRpcInvocation((RpcInvocation)message);
            } else {
                LOG.warn("Received message of unknown type {} with value {}. Dropping this message!", (Object)message.getClass().getName(), message);
            }
        }
        finally {
            this.mainThreadValidator.exitMainThread();
        }
    }

    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        block14: {
            RpcConnectionException rpcException;
            Method rpcMethod = null;
            try {
                String methodName = rpcInvocation.getMethodName();
                Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
                rpcMethod = this.lookupRpcMethod(methodName, parameterTypes);
            }
            catch (ClassNotFoundException e) {
                LOG.error("Could not load method arguments.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not load method arguments.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            catch (IOException e) {
                LOG.error("Could not deserialize rpc invocation message.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            catch (NoSuchMethodException e) {
                LOG.error("Could not find rpc method for rpc invocation.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            if (rpcMethod != null) {
                try {
                    Object result;
                    if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                        rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                        break block14;
                    }
                    try {
                        result = rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                    }
                    catch (InvocationTargetException e) {
                        LOG.trace("Reporting back error thrown in remote procedure {}", (Object)rpcMethod, (Object)e);
                        this.getSender().tell((Object)new Status.Failure(e.getTargetException()), this.getSelf());
                        return;
                    }
                    if (result instanceof org.apache.flink.runtime.concurrent.Future) {
                        final org.apache.flink.runtime.concurrent.Future future = (org.apache.flink.runtime.concurrent.Future)result;
                        if (future instanceof FlinkFuture) {
                            FlinkFuture flinkFuture = (FlinkFuture)future;
                            Patterns.pipe(flinkFuture.getScalaFuture(), (ExecutionContext)this.getContext().dispatcher()).to(this.getSender());
                        } else {
                            Patterns.pipe((Future)Futures.future((Callable)new Callable<Object>(){

                                @Override
                                public Object call() throws Exception {
                                    return future.get();
                                }
                            }, (ExecutionContext)this.getContext().dispatcher()), (ExecutionContext)this.getContext().dispatcher());
                        }
                    } else {
                        this.getSender().tell((Object)new Status.Success(result), this.getSelf());
                    }
                }
                catch (Throwable e) {
                    LOG.error("Error while executing remote procedure call {}.", (Object)rpcMethod, (Object)e);
                    this.getSender().tell((Object)new Status.Failure(e), this.getSelf());
                }
            }
        }
    }

    private void handleCallAsync(CallAsync callAsync) {
        if (callAsync.getCallable() == null) {
            String result = "Received a " + callAsync.getClass().getName() + " message with an empty callable field. This indicates that this message has been serialized prior to sending the message. The " + callAsync.getClass().getName() + " is only supported with local communication.";
            LOG.warn(result);
            this.getSender().tell((Object)new Status.Failure((Throwable)new Exception(result)), this.getSelf());
        } else {
            try {
                Object result = callAsync.getCallable().call();
                this.getSender().tell((Object)new Status.Success(result), this.getSelf());
            }
            catch (Throwable e) {
                this.getSender().tell((Object)new Status.Failure(e), this.getSelf());
            }
        }
    }

    private void handleRunAsync(RunAsync runAsync) {
        if (runAsync.getRunnable() == null) {
            LOG.warn("Received a {} message with an empty runnable field. This indicates that this message has been serialized prior to sending the message. The {} is only supported with local communication.", (Object)runAsync.getClass().getName(), (Object)runAsync.getClass().getName());
        } else {
            long delayNanos;
            long timeToRun = runAsync.getTimeNanos();
            if (timeToRun == 0L || (delayNanos = timeToRun - System.nanoTime()) <= 0L) {
                try {
                    runAsync.getRunnable().run();
                }
                catch (Throwable t) {
                    LOG.error("Caught exception while executing runnable in main thread.", t);
                    ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                }
            } else {
                FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
                RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
                this.getContext().system().scheduler().scheduleOnce(delay, this.getSelf(), (Object)message, (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
            }
        }
    }

    private Method lookupRpcMethod(String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        return this.rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
    }
}

