/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.akka;

import akka.actor.ActorNotFound;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.Address;
import akka.pattern.AskableActorRef$;
import akka.util.Timeout$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.RemoteAddressExtension$;
import org.apache.flink.runtime.akka.RemoteAddressExtensionImplementation;
import org.apache.flink.util.NetUtils;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxesRunTime;

public final class AkkaUtils$ {
    /** Singleton instance of this object; assigned by the private constructor. */
    public static final AkkaUtils$ MODULE$;
    /** SLF4J logger obtained for this class in the constructor. */
    private final Logger LOG;
    /** Finite timeout of 21474835 seconds, created in the constructor. */
    private final FiniteDuration INF_TIMEOUT;

    static {
        // Instantiating the class populates MODULE$ as a side effect of the constructor.
        new AkkaUtils$();
    }

    /**
     * Accessor for the logger held by this object.
     *
     * @return the value of the {@code LOG} field
     */
    public Logger LOG() {
        return LOG;
    }

    /**
     * Accessor for the long-running timeout constant held by this object.
     *
     * @return the value of the {@code INF_TIMEOUT} field
     */
    public FiniteDuration INF_TIMEOUT() {
        return INF_TIMEOUT;
    }

    /**
     * Creates an actor system from the given configuration without a listening address
     * ({@code None} is passed to {@code getAkkaConfig}).
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @return the newly created actor system
     */
    public ActorSystem createLocalActorSystem(Configuration configuration) {
        return this.createActorSystem(
                this.getAkkaConfig(configuration, (Option<Tuple2<String, Object>>)None$.MODULE$));
    }

    /**
     * Creates an actor system from the given configuration and optional listening address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param listeningAddress optional (hostname, port) pair forwarded to {@code getAkkaConfig}
     * @return the newly created actor system
     */
    public ActorSystem createActorSystem(Configuration configuration, Option<Tuple2<String, Object>> listeningAddress) {
        return this.createActorSystem(this.getAkkaConfig(configuration, listeningAddress));
    }

    /**
     * Creates an actor system named {@code "flink"} from a ready-made Akka config.
     *
     * <p>Before creating the system, Netty's default internal logger factory is set to the
     * SLF4J-backed implementation.
     *
     * @param akkaConfig Akka configuration to start the actor system with
     * @return the newly created actor system
     */
    public ActorSystem createActorSystem(Config akkaConfig) {
        Slf4JLoggerFactory nettyLoggerFactory = new Slf4JLoggerFactory();
        InternalLoggerFactory.setDefaultFactory(nettyLoggerFactory);

        ActorSystem actorSystem = ActorSystem$.MODULE$.create("flink", akkaConfig);
        return actorSystem;
    }

    /**
     * Creates an actor system using the config returned by {@code getDefaultAkkaConfig()}.
     *
     * @return the newly created actor system
     */
    public ActorSystem createDefaultActorSystem() {
        Config defaultAkkaConfig = this.getDefaultAkkaConfig();
        return this.createActorSystem(defaultAkkaConfig);
    }

    /**
     * Builds the Akka config for the given Flink configuration.
     *
     * <p>The basic config is always computed. When {@code listeningAddress} is a {@code Some}
     * holding a non-null (hostname, port) pair, the hostname is resolved, rendered as a quoted
     * URL-style address string and used to build a remote config that falls back to the basic
     * config. When it is {@code None}, the basic config is returned unchanged.
     *
     * @param configuration Flink configuration to read Akka settings from
     * @param listeningAddress optional (hostname, port) pair
     * @return the resulting Akka config
     * @throws UnknownHostException if the hostname cannot be resolved
     * @throws MatchError if {@code listeningAddress} is neither a {@code Some} with a non-null
     *         pair nor {@code None}
     */
    public Config getAkkaConfig(Configuration configuration, Option<Tuple2<String, Object>> listeningAddress) throws UnknownHostException {
        Config basicConfig = this.getBasicAkkaConfig(configuration);

        if (listeningAddress instanceof Some) {
            Tuple2 hostAndPort = (Tuple2)((Some)listeningAddress).x();
            if (hostAndPort != null) {
                String hostname = (String)hostAndPort._1();
                int port = hostAndPort._2$mcI$sp();

                InetAddress resolved = InetAddress.getByName(hostname);
                // The host is wrapped in double quotes before being placed into the config text.
                String quotedHost = "\"" + NetUtils.ipAddressToUrlString((InetAddress)resolved) + "\"";

                Config remoteConfig = this.getRemoteAkkaConfig(configuration, quotedHost, port);
                return remoteConfig.withFallback((ConfigMergeable)basicConfig);
            }
        }

        if (None$.MODULE$.equals(listeningAddress)) {
            return basicConfig;
        }

        throw new MatchError(listeningAddress);
    }

    /**
     * Builds the Akka config for a fresh, empty Flink {@code Configuration} and a listening
     * address consisting of an empty hostname and port {@code 0}.
     *
     * @return the resulting Akka config
     */
    public Config getDefaultAkkaConfig() {
        Configuration emptyConfiguration = new Configuration();
        Tuple2 emptyHostZeroPort = new Tuple2((Object)"", (Object)BoxesRunTime.boxToInteger((int)0));
        Option<Tuple2<String, Object>> listeningAddress =
                (Option<Tuple2<String, Object>>)new Some((Object)emptyHostZeroPort);
        return this.getAkkaConfig(emptyConfiguration, listeningAddress);
    }

    /**
     * Builds the basic Akka config (logging, dispatcher throughput, supervisor strategy).
     *
     * <p>Reads {@code akka.throughput}, {@code akka.log.lifecycle.events} and
     * {@code akka.jvm-exit-on-fatal-error} from the Flink configuration, takes the log level from
     * {@code getLogLevel()}, interpolates these values into a margin-stripped config template and
     * parses the result.
     *
     * @param configuration Flink configuration to read Akka settings from
     * @return the parsed basic Akka config
     */
    private Config getBasicAkkaConfig(Configuration configuration) {
        int throughput = configuration.getInteger("akka.throughput", ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT);
        boolean logLifecycle = configuration.getBoolean("akka.log.lifecycle.events", ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS);

        String jvmExitSwitch;
        if (configuration.getBoolean("akka.jvm-exit-on-fatal-error", false)) {
            jvmExitSwitch = "on";
        } else {
            jvmExitSwitch = "off";
        }

        String lifecycleSwitch;
        if (logLifecycle) {
            lifecycleSwitch = "on";
        } else {
            lifecycleSwitch = "off";
        }

        String akkaLogLevel = this.getLogLevel();

        // Literal segments of the template; one interpolated value goes between each pair.
        String[] templateParts = new String[]{
            "\n        |akka {\n        | daemonic = on\n        |\n        | loggers = [\"akka.event.slf4j.Slf4jLogger\"]\n        | logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"\n        | log-config-on-start = off\n        |\n        | jvm-exit-on-fatal-error = ",
            "\n        |\n        | serialize-messages = off\n        |\n        | loglevel = ",
            "\n        | stdout-loglevel = OFF\n        |\n        | log-dead-letters = ",
            "\n        | log-dead-letters-during-shutdown = ",
            "\n        |\n        | actor {\n        |   guardian-supervisor-strategy = \"akka.actor.StoppingSupervisorStrategy\"\n        |   default-dispatcher {\n        |     throughput = ",
            "\n        |\n        |     fork-join-executor {\n        |       parallelism-factor = 2.0\n        |     }\n        |   }\n        | }\n        |}\n      "
        };
        // Both dead-letter settings receive the lifecycle-events switch.
        Object[] templateArgs = new Object[]{
            jvmExitSwitch,
            akkaLogLevel,
            lifecycleSwitch,
            lifecycleSwitch,
            BoxesRunTime.boxToInteger((int)throughput)
        };

        StringContext template = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])templateParts));
        String interpolated = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)templateArgs));
        String configText = new StringOps(Predef$.MODULE$.augmentString(interpolated)).stripMargin();
        return ConfigFactory.parseString((String)configText);
    }

    /**
     * Builds the remoting part of the Akka config (remote actor provider, failure detectors,
     * Netty TCP transport).
     *
     * <p>Several settings default to values derived from {@code akka.ask.timeout}: the startup
     * timeout, the watch heartbeat pause and the TCP connection timeout default to the ask
     * timeout itself, and the watch heartbeat interval defaults to a tenth of it. When
     * {@code hostname} is non-null and non-empty, an extra fragment setting the Netty TCP
     * hostname is appended before parsing.
     *
     * @param configuration Flink configuration to read Akka settings from
     * @param hostname value placed verbatim after {@code hostname = }; may be null or empty
     * @param port value for the Netty TCP {@code port} setting
     * @return the parsed remote Akka config
     */
    private Config getRemoteAkkaConfig(Configuration configuration, String hostname, int port) {
        Duration askTimeout = Duration$.MODULE$.apply(configuration.getString("akka.ask.timeout", ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT));
        String startup = configuration.getString("akka.startup-timeout", askTimeout.toString());
        String transportInterval = configuration.getString("akka.transport.heartbeat.interval", ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL);
        String transportPause = configuration.getString("akka.transport.heartbeat.pause", ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE);
        double transportFailureThreshold = configuration.getDouble("akka.transport.threshold", ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD);
        String watchInterval = configuration.getString("akka.watch.heartbeat.interval", askTimeout.$div(10.0).toString());
        String watchPause = configuration.getString("akka.watch.heartbeat.pause", askTimeout.toString());
        double watchFailureThreshold = configuration.getDouble("akka.watch.threshold", ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD);
        String tcpTimeout = configuration.getString("akka.tcp.timeout", askTimeout.toString());
        String frameSize = configuration.getString("akka.framesize", ConfigConstants.DEFAULT_AKKA_FRAMESIZE);
        boolean logLifecycle = configuration.getBoolean("akka.log.lifecycle.events", ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS);

        String lifecycleSwitch;
        if (logLifecycle) {
            lifecycleSwitch = "on";
        } else {
            lifecycleSwitch = "off";
        }

        // Literal segments of the template; one interpolated value goes between each pair.
        String[] remoteParts = new String[]{
            "\n         |akka {\n         |  actor {\n         |    provider = \"akka.remote.RemoteActorRefProvider\"\n         |  }\n         |\n         |  remote {\n         |    startup-timeout = ",
            "\n         |\n         |    transport-failure-detector{\n         |      acceptable-heartbeat-pause = ",
            "\n         |      heartbeat-interval = ",
            "\n         |      threshold = ",
            "\n         |    }\n         |\n         |    watch-failure-detector{\n         |      heartbeat-interval = ",
            "\n         |      acceptable-heartbeat-pause = ",
            "\n         |      threshold = ",
            "\n         |    }\n         |\n         |    netty {\n         |      tcp {\n         |        transport-class = \"akka.remote.transport.netty.NettyTransport\"\n         |        port = ",
            "\n         |        connection-timeout = ",
            "\n         |        maximum-frame-size = ",
            "\n         |        tcp-nodelay = on\n         |      }\n         |    }\n         |\n         |    log-remote-lifecycle-events = ",
            "\n         |  }\n         |}\n       "
        };
        // Order matters: the transport detector lists pause before interval, the watch
        // detector lists interval before pause.
        Object[] remoteArgs = new Object[]{
            startup,
            transportPause,
            transportInterval,
            BoxesRunTime.boxToDouble((double)transportFailureThreshold),
            watchInterval,
            watchPause,
            BoxesRunTime.boxToDouble((double)watchFailureThreshold),
            BoxesRunTime.boxToInteger((int)port),
            tcpTimeout,
            frameSize,
            lifecycleSwitch
        };
        StringContext remoteTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])remoteParts));
        String remoteInterpolated = remoteTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)remoteArgs));
        String remoteText = new StringOps(Predef$.MODULE$.augmentString(remoteInterpolated)).stripMargin();

        String hostnameText;
        if (hostname != null && !hostname.isEmpty()) {
            String[] hostnameParts = new String[]{
                "\n           |akka {\n           |  remote {\n           |    netty {\n           |      tcp {\n           |        hostname = ",
                "\n           |      }\n           |    }\n           |  }\n           |}\n         "
            };
            StringContext hostnameTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])hostnameParts));
            String hostnameInterpolated = hostnameTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{hostname}));
            hostnameText = new StringOps(Predef$.MODULE$.augmentString(hostnameInterpolated)).stripMargin();
        } else {
            hostnameText = "";
        }

        return ConfigFactory.parseString((String)(remoteText + hostnameText));
    }

    /**
     * Maps the most verbose level enabled on this object's logger to an Akka log level name.
     *
     * <p>Levels are checked from most to least verbose (trace, debug, info, warn, error); the
     * first enabled one wins. Warn is reported as {@code "WARNING"}.
     *
     * @return one of {@code "TRACE"}, {@code "DEBUG"}, {@code "INFO"}, {@code "WARNING"},
     *         {@code "ERROR"}, or {@code "OFF"} when none of the levels is enabled
     */
    public String getLogLevel() {
        if (this.LOG().isTraceEnabled()) {
            return "TRACE";
        }
        if (this.LOG().isDebugEnabled()) {
            return "DEBUG";
        }
        if (this.LOG().isInfoEnabled()) {
            return "INFO";
        }
        if (this.LOG().isWarnEnabled()) {
            return "WARNING";
        }
        if (this.LOG().isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Asynchronously resolves the child actor named {@code child} below {@code parent}.
     *
     * @param parent actor whose path is extended with the child name
     * @param child name of the child actor
     * @param system actor system used to create the actor selection
     * @param timeout timeout passed to {@code resolveOne}
     * @return future of the resolved child actor reference
     */
    public Future<ActorRef> getChild(ActorRef parent, String child, ActorSystem system, FiniteDuration timeout) {
        akka.actor.ActorSelection childSelection = system.actorSelection(parent.path().$div(child));
        return childSelection.resolveOne(Timeout$.MODULE$.durationToTimeout(timeout));
    }

    /**
     * Asynchronously resolves the actor at the given path.
     *
     * @param path actor path string handed to {@code actorSelection}
     * @param system actor system used to create the actor selection
     * @param timeout timeout passed to {@code resolveOne}
     * @return future of the resolved actor reference
     */
    public Future<ActorRef> getActorRefFuture(String path, ActorSystem system, FiniteDuration timeout) {
        akka.actor.ActorSelection selection = system.actorSelection(path);
        return selection.resolveOne(Timeout$.MODULE$.durationToTimeout(timeout));
    }

    /**
     * Resolves the actor at the given path, blocking for at most {@code timeout}.
     *
     * <p>{@code ActorNotFound} and {@code TimeoutException} failures are wrapped in an
     * {@code IOException} describing the actor as unreachable; an {@code IOException} is wrapped
     * in another {@code IOException} naming the path. Any other throwable is rethrown unchanged.
     *
     * @param path actor path string to resolve
     * @param system actor system used for the lookup
     * @param timeout used both for the lookup and as the maximum time to wait for its result
     * @return the resolved actor reference
     * @throws IOException if the actor was not found, the lookup timed out, or an
     *         {@code IOException} occurred while resolving
     */
    public ActorRef getActorRef(String path, ActorSystem system, FiniteDuration timeout) throws IOException {
        try {
            return (ActorRef)Await$.MODULE$.result(this.getActorRefFuture(path, system, timeout), (Duration)timeout);
        }
        catch (Throwable failure) {
            if (failure instanceof ActorNotFound || failure instanceof TimeoutException) {
                String unreachable = "Actor at " + path + " not reachable. "
                        + "Please make sure that the actor is running and its port is reachable.";
                throw new IOException(unreachable, failure);
            }
            if (failure instanceof IOException) {
                throw new IOException("Could not connect to the actor at " + path + "", (IOException)failure);
            }
            throw failure;
        }
    }

    /**
     * Runs {@code body} as a future on the given execution context and, if it fails, runs it
     * again up to {@code tries} more times.
     *
     * <p>The recovery function applies to every non-null throwable: while {@code tries > 0} it
     * calls this method again with {@code tries - 1}; otherwise it returns a future failed with
     * the last throwable.
     *
     * @param body computation to execute
     * @param tries number of additional attempts after the first one fails
     * @param executionContext context on which the future and its recovery run
     * @return future of the first successful result, or of the last failure
     */
    public <T> Future<T> retry(Function0<T> body, int tries, ExecutionContext executionContext) {
        return Future$.MODULE$.apply(body, executionContext).recoverWith((PartialFunction)new Serializable(body, tries, executionContext){
            public static final long serialVersionUID = 0L;
            // Captured arguments of the enclosing retry call.
            private final Function0 body$1;
            private final int tries$1;
            private final ExecutionContext executionContext$1;

            // Retries while attempts remain, otherwise propagates the failure.
            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 != null) {
                    A1 A12 = A1;
                    object = this.tries$1 > 0 ? AkkaUtils$.MODULE$.retry(this.body$1, this.tries$1 - 1, this.executionContext$1) : Future$.MODULE$.failed(A12);
                } else {
                    // A null throwable is delegated to the fallback function.
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            // Defined for every non-null throwable.
            public final boolean isDefinedAt(Throwable x1) {
                Throwable throwable = x1;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.body$1 = body$1;
                this.tries$1 = tries$1;
                this.executionContext$1 = executionContext$1;
            }
        }, executionContext);
    }

    /**
     * Variant of {@code retry} taking a {@code Callable}: wraps it in a {@code Function0} whose
     * {@code apply()} invokes {@code callable.call()} and delegates to the {@code Function0}
     * overload.
     *
     * @param callable computation to execute
     * @param tries number of additional attempts, forwarded unchanged
     * @param executionContext context on which the future runs, forwarded unchanged
     * @return the future returned by the {@code Function0} overload
     */
    public <T> Future<T> retry(Callable<T> callable, int tries, ExecutionContext executionContext) {
        return this.retry((Function0<T>)new Serializable(callable){
            public static final long serialVersionUID = 0L;
            // Captured callable of the enclosing retry call.
            private final Callable callable$1;

            public final T apply() {
                return (T)this.callable$1.call();
            }
            {
                this.callable$1 = callable$1;
            }
        }, tries, executionContext);
    }

    /**
     * Sends {@code message} to {@code target} using the ask pattern and, if the ask fails, sends
     * it again up to {@code tries} more times.
     *
     * <p>The recovery function applies to every non-null throwable: while {@code tries > 0} it
     * calls this method again with {@code tries - 1}; otherwise it returns a future failed with
     * the last throwable.
     *
     * @param target actor to ask
     * @param message message to send
     * @param tries number of additional attempts after the first one fails
     * @param executionContext context on which the recovery runs
     * @param timeout timeout applied to each individual ask
     * @return future of the first successful response, or of the last failure
     */
    public Future<Object> retry(ActorRef target, Object message, int tries, ExecutionContext executionContext, FiniteDuration timeout) {
        return AskableActorRef$.MODULE$.$qmark$extension(akka.pattern.package$.MODULE$.ask(target), message, Timeout$.MODULE$.durationToTimeout(timeout)).recoverWith((PartialFunction)new Serializable(target, message, tries, executionContext, timeout){
            public static final long serialVersionUID = 0L;
            // Captured arguments of the enclosing retry call.
            private final ActorRef target$1;
            private final Object message$1;
            private final int tries$2;
            private final ExecutionContext executionContext$2;
            private final FiniteDuration timeout$1;

            // Re-asks while attempts remain, otherwise propagates the failure.
            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 != null) {
                    A1 A12 = A1;
                    object = this.tries$2 > 0 ? AkkaUtils$.MODULE$.retry(this.target$1, this.message$1, this.tries$2 - 1, this.executionContext$2, this.timeout$1) : Future$.MODULE$.failed(A12);
                } else {
                    // A null throwable is delegated to the fallback function.
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            // Defined for every non-null throwable.
            public final boolean isDefinedAt(Throwable x2) {
                Throwable throwable = x2;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.target$1 = target$1;
                this.message$1 = message$1;
                this.tries$2 = tries$2;
                this.executionContext$2 = executionContext$2;
                this.timeout$1 = timeout$1;
            }
        }, executionContext);
    }

    /**
     * Reads {@code akka.ask.timeout} from the configuration (falling back to
     * {@code ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT}) and converts it to a millisecond-based
     * finite duration.
     *
     * @param config Flink configuration to read the timeout from
     * @return the ask timeout in milliseconds as a {@code FiniteDuration}
     */
    public FiniteDuration getTimeout(Configuration config) {
        String configured = config.getString("akka.ask.timeout", ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
        long millis = Duration$.MODULE$.apply(configured).toMillis();
        return new FiniteDuration(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Converts {@code ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT} to a millisecond-based finite
     * duration.
     *
     * @return the default ask timeout in milliseconds as a {@code FiniteDuration}
     */
    public FiniteDuration getDefaultTimeout() {
        long millis = Duration$.MODULE$.apply(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis();
        return new FiniteDuration(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Reads {@code akka.lookup.timeout} from the configuration (falling back to
     * {@code ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT}) and converts it to a
     * millisecond-based finite duration.
     *
     * @param config Flink configuration to read the timeout from
     * @return the lookup timeout in milliseconds as a {@code FiniteDuration}
     */
    public FiniteDuration getLookupTimeout(Configuration config) {
        String configured = config.getString("akka.lookup.timeout", ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT);
        long millis = Duration$.MODULE$.apply(configured).toMillis();
        return new FiniteDuration(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Converts {@code ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT} to a millisecond-based finite
     * duration.
     *
     * @return the default lookup timeout in milliseconds as a {@code FiniteDuration}
     */
    public FiniteDuration getDefaultLookupTimeout() {
        long millis = Duration$.MODULE$.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis();
        return new FiniteDuration(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the address reported by the {@code RemoteAddressExtension} of the actor system.
     *
     * @param system actor system to look the extension up on
     * @return the address exposed by the extension
     */
    public Address getAddress(ActorSystem system) {
        RemoteAddressExtensionImplementation extension =
                (RemoteAddressExtensionImplementation)RemoteAddressExtension$.MODULE$.apply(system);
        return extension.address();
    }

    /**
     * Renders the actor's path as a string that includes the actor system's address.
     *
     * @param system actor system whose address is used
     * @param actor actor whose path is rendered
     * @return the actor path rendered with the system address
     */
    public String getAkkaURL(ActorSystem system, ActorRef actor) {
        final Address systemAddress = this.getAddress(system);
        final akka.actor.ActorPath actorPath = actor.path();
        return actorPath.toStringWithAddress(systemAddress);
    }

    /**
     * Concatenates the string form of the actor system's address with the given path.
     *
     * @param system actor system whose address is used
     * @param path text appended verbatim after the address
     * @return the address string followed by {@code path}
     */
    public String getAkkaURL(ActorSystem system, String path) {
        return this.getAddress(system).toString() + path;
    }

    /**
     * Extracts the host and port from an Akka URL.
     *
     * <p>Everything after the {@code "://"} separator is re-parsed as an {@code http} URL so
     * that {@link URL} can split out host and port. The URL is rejected when the separator is
     * missing, when fewer than four characters follow the separator's start, or when no host or
     * no explicit port is present.
     *
     * @param akkaURL Akka URL, e.g. {@code akka.tcp://flink@host:6123/user/jobmanager}
     * @return socket address built from the URL's host and port
     * @throws Exception if no host/port can be extracted; the underlying
     *         {@link MalformedURLException} is attached as the cause
     */
    public InetSocketAddress getInetSockeAddressFromAkkaURL(String akkaURL) throws Exception {
        try {
            int protocolEnd = akkaURL.indexOf("://");
            if (protocolEnd == -1 || protocolEnd >= akkaURL.length() - 4) {
                throw new MalformedURLException();
            }
            // Swap the Akka protocol for http so java.net.URL accepts and parses the authority.
            URL url = new URL("http://" + akkaURL.substring(protocolEnd + 3));
            if (url.getHost() == null || url.getPort() == -1) {
                throw new MalformedURLException();
            }
            return new InetSocketAddress(url.getHost(), url.getPort());
        }
        catch (MalformedURLException malformedURLException) {
            // Keep the original failure as the cause so the parse error is not lost.
            throw new Exception("Could not retrieve InetSocketAddress from Akka URL " + akkaURL, malformedURLException);
        }
    }

    /**
     * Private constructor, invoked once from the static initializer. Publishes the instance as
     * {@code MODULE$} and initializes the logger and the {@code INF_TIMEOUT} constant.
     */
    private AkkaUtils$() {
        // Publish the singleton before initializing the remaining fields.
        MODULE$ = this;
        this.LOG = LoggerFactory.getLogger(this.getClass());
        // NOTE(review): 21474835 seconds looks like an upper bound imposed by Akka on timeouts — confirm.
        this.INF_TIMEOUT = new package.DurationInt(package$.MODULE$.DurationInt(21474835)).seconds();
    }
}

