/*
 * Decompiled with CFR 0.152.
 */
package org.deeplearning4j.scaleout.actor.core.actor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Futures;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import akka.routing.RoundRobinPool;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.deeplearning4j.nn.conf.Configuration;
import org.deeplearning4j.nn.conf.DeepLearningConfigurable;
import org.deeplearning4j.scaleout.actor.core.actor.WorkerActor;
import org.deeplearning4j.scaleout.actor.core.protocol.Ack;
import org.deeplearning4j.scaleout.actor.util.ActorRefUtils;
import org.deeplearning4j.scaleout.api.statetracker.StateTracker;
import org.deeplearning4j.scaleout.api.workrouter.WorkRouter;
import org.deeplearning4j.scaleout.job.Job;
import org.deeplearning4j.scaleout.messages.DoneMessage;
import org.deeplearning4j.scaleout.messages.MoreWorkMessage;
import org.deeplearning4j.scaleout.perform.WorkerPerformer;
import org.deeplearning4j.scaleout.perform.WorkerPerformerFactory;
import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

/**
 * Actor that coordinates a training run.
 *
 * <p>On construction it starts the worker pool (see {@link #setup(Configuration)}), subscribes
 * itself to the {@link #MASTER} and {@link #FINISH} pub-sub topics and schedules two periodic
 * tasks on the actor system's scheduler:
 * <ul>
 *   <li>a heartbeat, every {@link #secondsPoll} seconds, that asks for the next batch when the
 *       {@link WorkRouter} says work should be sent and clears jobs listed as recently cleared;</li>
 *   <li>a sweep, every minute, that removes workers whose last heartbeat is at least
 *       {@value #STALE_WORKER_SECONDS} seconds old.</li>
 * </ul>
 *
 * <p>NOTE(review): both periodic tasks are plain {@link Runnable}s executed on the dispatcher, i.e.
 * outside {@link #onReceive(Object)}; confirm the state tracker and work router tolerate being
 * called from there.
 */
public class MasterActor extends UntypedActor implements DeepLearningConfigurable {
    /** Configuration handed to the constructor. */
    protected Configuration conf;
    /** Akka logger bound to this actor. */
    protected LoggingAdapter log = Logging.getLogger(this.getContext().system(), this);
    /** Reference to the batch actor; stored but not used inside this class. */
    protected ActorRef batchActor;
    /** Shared view of jobs, workers, updates and heartbeats. */
    protected StateTracker stateTracker;
    /** Distributed pub-sub mediator used for all topic traffic. */
    protected final ActorRef mediator = DistributedPubSubExtension.get(this.getContext().system()).mediator();

    // NOTE(review): these four topic names are public and non-final; left that way so existing
    // callers that read (or assign) them keep compiling.
    public static String BROADCAST = "broadcast";
    public static String MASTER = "result";
    public static String SHUTDOWN = "shutdown";
    public static String FINISH = "finish";
    public static final String NAME_SPACE = "org.deeplearning4j.scaleout.actor.core.actor";
    /** Configuration key for the heartbeat interval in seconds. */
    public static final String POLL_FOR_WORK = "org.deeplearning4j.scaleout.actor.core.actor.poll";

    /** Configuration key for the number of passes given to {@code runPreTrainIterations}. */
    private static final String NUM_PASSES = "org.deeplearning4j.numpasses";
    /** Configuration key naming the {@link WorkerPerformerFactory} implementation class. */
    private static final String WORKER_PERFORMER = "org.deeplearning4j.scaleout.perform.workerperformer";
    /** Topic to which subscribe/unsubscribe acknowledgements are re-published. */
    private static final String TOPICS = "topics";
    /** Topic on which requests for more work are published. */
    private static final String BATCH = "batch";
    /** Age, in seconds, at which a worker's heartbeat is considered stale. */
    private static final long STALE_WORKER_SECONDS = 120L;

    /** Heartbeat interval in seconds; overwritten by {@link #setup(Configuration)}. */
    protected int secondsPoll = 1;
    /** Handle of the periodic heartbeat task. */
    protected Cancellable forceNextPhase;
    /** Handle of the periodic stale-worker sweep. */
    protected Cancellable clearStateWorkers;
    protected WorkRouter workRouter;
    /** Set once the "done" path of {@link #doDoneOrNextPhase()} has run. */
    protected AtomicBoolean doneCalled = new AtomicBoolean(false);

    /**
     * Creates the master, starts the workers and schedules the periodic tasks.
     *
     * @param conf         configuration; read for the performer factory class, the poll interval
     *                     and the number of passes
     * @param batchActor   batch actor reference (only stored)
     * @param stateTracker tracker shared with the workers
     * @param router       decides when work is sent and applies updates
     */
    public MasterActor(Configuration conf, ActorRef batchActor, final StateTracker stateTracker, WorkRouter router) {
        this.conf = conf;
        this.batchActor = batchActor;
        this.workRouter = router;
        this.stateTracker = stateTracker;
        // setup() reads the poll interval, so it has to run before the heartbeat is scheduled.
        this.setup(conf);
        stateTracker.runPreTrainIterations(conf.getInt(NUM_PASSES, 1));
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MASTER, this.getSelf()), this.getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(FINISH, this.getSelf()), this.getSelf());

        ExecutionContext dispatcher = this.context().dispatcher();
        this.forceNextPhase = this.context().system().scheduler().schedule(
                Duration.create((long) this.secondsPoll, TimeUnit.SECONDS),
                Duration.create((long) this.secondsPoll, TimeUnit.SECONDS),
                new Runnable() {
                    @Override
                    public void run() {
                        MasterActor.this.heartbeat();
                    }
                },
                dispatcher);
        this.clearStateWorkers = this.context().system().scheduler().schedule(
                Duration.create(1L, TimeUnit.MINUTES),
                Duration.create(1L, TimeUnit.MINUTES),
                new Runnable() {
                    @Override
                    public void run() {
                        MasterActor.this.removeStaleWorkers();
                    }
                },
                dispatcher);
    }

    /**
     * One heartbeat tick: logs the worker count, requests the next batch when the router says so,
     * then clears every current job whose worker id is listed in {@code recentlyCleared()}.
     */
    private void heartbeat() {
        this.log.info("Heart beat on " + this.stateTracker.workers().size() + " workers");
        if (this.stateTracker.isDone()) {
            return;
        }
        if (this.workRouter.sendWork()) {
            this.nextBatch();
        }
        try {
            Collection<Job> cleared = new HashSet<Job>();
            for (Job job : this.stateTracker.currentJobs()) {
                if (!this.stateTracker.recentlyCleared().contains(job.workerId())) {
                    continue;
                }
                this.stateTracker.clearJob(job.workerId());
                cleared.add(job);
                this.log.info("Found job that wasn't clear " + job.workerId());
            }
            this.stateTracker.currentJobs().removeAll(cleared);
            if (this.stateTracker.currentJobs().isEmpty()) {
                this.stateTracker.recentlyCleared().clear();
            }
        }
        catch (Exception e) {
            // Best effort: report through the actor's logger and try again on the next tick.
            this.log.error(e, "Failed to clear finished jobs during heartbeat");
        }
    }

    /**
     * Removes every worker whose last recorded heartbeat is at least
     * {@value #STALE_WORKER_SECONDS} seconds old.
     */
    private void removeStaleWorkers() {
        if (this.stateTracker.isDone()) {
            return;
        }
        try {
            long now = System.currentTimeMillis();
            Map<String, Long> heartbeats = this.stateTracker.getHeartBeats();
            // Iterate over a copy of the keys: removeWorker() may modify the heartbeat map.
            for (String workerId : new HashSet<String>(heartbeats.keySet())) {
                Long lastChecked = heartbeats.get(workerId);
                if (lastChecked == null) {
                    // Entry vanished between the snapshot and the lookup; nothing to remove.
                    continue;
                }
                long idleSeconds = TimeUnit.MILLISECONDS.toSeconds(now - lastChecked);
                if (idleSeconds < STALE_WORKER_SECONDS) {
                    continue;
                }
                this.log.info("Removing stale worker " + workerId);
                this.stateTracker.removeWorker(workerId);
            }
        }
        catch (Exception e) {
            // Log instead of rethrowing: an exception escaping a periodically scheduled Runnable
            // can abort the repetition, which would silently disable the sweep.
            this.log.error(e, "Failed to remove stale workers");
        }
    }

    /**
     * Starts the worker pool and reads the heartbeat interval.
     *
     * <p>Instantiates the {@link WorkerPerformerFactory} named by the
     * {@code org.deeplearning4j.scaleout.perform.workerperformer} key, creates a performer from it,
     * sets {@link #secondsPoll} from {@link #POLL_FOR_WORK} (default 10) and registers a
     * round-robin pool of {@link WorkerActor}s, sized to the number of available processors,
     * under the name {@code "worker"}.
     *
     * @param conf configuration to read
     * @throws IllegalStateException if the performer factory class is not configured
     * @throws RuntimeException      wrapping any failure to load or start the workers
     */
    public void setup(Configuration conf) {
        this.log.info("Starting workers");
        ActorSystem system = this.context().system();
        RoundRobinPool pool = new RoundRobinPool(Runtime.getRuntime().availableProcessors());
        String performerFactoryClazz = conf.get(WORKER_PERFORMER);
        if (performerFactoryClazz == null) {
            throw new IllegalStateException("Missing configuration value for " + WORKER_PERFORMER);
        }
        try {
            WorkerPerformerFactory factory = this.newPerformerFactory(performerFactoryClazz);
            WorkerPerformer performer = factory.create(conf);
            this.secondsPoll = conf.getInt(POLL_FOR_WORK, 10);
            Props workerProps = pool.props(WorkerActor.propsFor(conf, this.stateTracker, performer));
            workerProps = ClusterSingletonManager.defaultProps(workerProps, "master", PoisonPill.getInstance(), "master");
            system.actorOf(workerProps, "worker");
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Instantiates the performer factory, preferring a public {@code (StateTracker)} constructor
     * and falling back to the no-argument constructor when that one does not exist.
     */
    private WorkerPerformerFactory newPerformerFactory(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        try {
            Constructor<?> withTracker = clazz.getConstructor(StateTracker.class);
            return (WorkerPerformerFactory) withTracker.newInstance(this.stateTracker);
        }
        catch (NoSuchMethodException noTrackerConstructor) {
            return (WorkerPerformerFactory) clazz.getDeclaredConstructor().newInstance();
        }
    }

    /**
     * Message dispatch.
     * <ul>
     *   <li>Subscribe/unsubscribe acknowledgements are re-published on {@code "topics"} and logged.</li>
     *   <li>{@link DoneMessage} triggers {@link #doDoneOrNextPhase()}.</li>
     *   <li>Any {@link String} is answered with an {@link Ack}.</li>
     *   <li>{@link MoreWorkMessage} is forwarded to the {@code "batch"} topic.</li>
     *   <li>Everything else is reported as unhandled.</li>
     * </ul>
     */
    @Override
    public void onReceive(Object message) throws Exception {
        boolean subscribed = message instanceof DistributedPubSubMediator.SubscribeAck;
        if (subscribed || message instanceof DistributedPubSubMediator.UnsubscribeAck) {
            // No cast: an UnsubscribeAck is not a SubscribeAck, and only toString() is needed.
            this.mediator.tell(new DistributedPubSubMediator.Publish(TOPICS, message), this.getSelf());
            this.log.info((subscribed ? "Subscribed " : "Unsubscribed ") + message);
        } else if (message instanceof DoneMessage) {
            this.log.info("Received done message");
            this.doDoneOrNextPhase();
        } else if (message instanceof String) {
            this.getSender().tell(Ack.getInstance(), this.getSelf());
        } else if (message instanceof MoreWorkMessage) {
            this.log.info("Prompted for more work, starting pipeline");
            this.mediator.tell(new DistributedPubSubMediator.Publish(BATCH, MoreWorkMessage.getInstance()), this.getSelf());
        } else {
            this.unhandled(message);
        }
    }

    /**
     * Advances the run once no jobs are in flight.
     *
     * <p>If there are worker updates, the router applies them and a {@link MoreWorkMessage} is
     * published on the {@code "batch"} topic. If there are neither jobs nor updates, the state
     * tracker is finished and shut down and the actor system is shut down. While jobs are still
     * in flight nothing happens.
     *
     * @throws RuntimeException wrapping any failure to read the current jobs
     */
    protected void nextBatch() {
        boolean hasUpdates = !this.stateTracker.workerUpdates().isEmpty();
        boolean noJobs;
        try {
            noJobs = this.stateTracker.currentJobs().isEmpty();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (!noJobs) {
            return;
        }
        if (hasUpdates) {
            this.workRouter.update();
            ExecutionContext dispatcher = this.context().dispatcher();
            Future<Void> request = Futures.future(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    MasterActor.this.mediator.tell(
                            new DistributedPubSubMediator.Publish(BATCH, MoreWorkMessage.getInstance()),
                            MasterActor.this.getSelf());
                    MasterActor.this.log.info("Requesting more work...");
                    return null;
                }
            }, dispatcher);
            ActorRefUtils.throwExceptionIfExists(request, dispatcher);
        } else {
            this.stateTracker.finish();
            this.stateTracker.shutdown();
            this.context().system().shutdown();
            this.log.info("Current jobs is empty and no more updates; terminating");
        }
    }

    /**
     * Handles a {@link DoneMessage}: applies pending updates and, the first time no jobs are left,
     * calls {@link #nextBatch()} and finishes the state tracker.
     *
     * <p>NOTE(review): when {@link #nextBatch()} takes its termination path,
     * {@code stateTracker.finish()} is called there and again here; confirm it is idempotent.
     *
     * @throws Exception if reading the current jobs fails
     */
    protected void doDoneOrNextPhase() throws Exception {
        if (!this.stateTracker.workerUpdates().isEmpty()) {
            this.workRouter.update();
        }
        if (this.stateTracker.currentJobs().isEmpty()) {
            // Single atomic check-and-set so the completion path runs at most once.
            if (!this.doneCalled.compareAndSet(false, true)) {
                return;
            }
            this.nextBatch();
            this.stateTracker.finish();
            this.log.info("Done training!");
        }
    }

    /** Logs the cause after the actor has been restarted. */
    @Override
    public void aroundPostRestart(Throwable reason) {
        super.aroundPostRestart(reason);
        // The {} placeholder is required for the argument to appear in the log line.
        this.log.info("Restarted because of {}", reason);
    }

    /** Logs the cause and the message being processed (if any) before the restart. */
    @Override
    public void aroundPreRestart(Throwable reason, Option<Object> message) {
        super.aroundPreRestart(reason, message);
        this.log.info("Restarted because of {} with message {}", reason, message);
    }

    /** Registers this actor with the pub-sub mediator and logs its path. */
    @Override
    public void preStart() throws Exception {
        super.preStart();
        this.mediator.tell(new DistributedPubSubMediator.Put(this.getSelf()), this.getSelf());
        ActorRef self = this.self();
        this.log.info("Setup master with path " + self.path());
        this.log.info("Pre start on master " + self.path().toString());
    }

    /** Cancels both periodic tasks when the actor stops. */
    @Override
    public void postStop() throws Exception {
        super.postStop();
        this.log.info("Post stop on master");
        if (this.clearStateWorkers != null) {
            this.clearStateWorkers.cancel();
        }
        if (this.forceNextPhase != null) {
            this.forceNextPhase.cancel();
        }
    }

    /**
     * Supervision for children of this actor: every failure is logged with its cause and the
     * child is resumed.
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable cause) {
                // Pass the throwable as the cause so its stack trace is logged.
                MasterActor.this.log.error(cause, "Problem with processing");
                return SupervisorStrategy.resume();
            }
        });
    }
}

