package org.deeplearning4j.scaleout.actor.core.actor;

import akka.actor.ActorRef;
import akka.actor.AddressFromURIString;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.japi.Function;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.canova.api.conf.Configuration;
import org.deeplearning4j.nn.conf.DeepLearningConfigurable;
import org.deeplearning4j.scaleout.actor.core.protocol.Ack;
import org.deeplearning4j.scaleout.actor.core.protocol.ClearWorker;
import org.deeplearning4j.scaleout.api.statetracker.StateTracker;
import org.deeplearning4j.scaleout.job.Job;
import org.deeplearning4j.scaleout.perform.WorkerPerformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/deeplearning4j/scaleout/actor/core/actor/WorkerActor.class */
public class WorkerActor extends UntypedActor implements DeepLearningConfigurable {
    protected Job currentJob;
    protected String id;
    Cluster cluster;
    protected ActorRef clusterClient;
    protected String masterPath;
    protected StateTracker tracker;
    protected AtomicBoolean isWorking;
    protected Configuration conf;
    protected ActorRef mediator;
    protected Cancellable heartbeat;
    protected static final Logger log = LoggerFactory.getLogger(WorkerActor.class);
    protected WorkerPerformer workerPerformer;

    public WorkerActor(Configuration configuration, StateTracker stateTracker, WorkerPerformer workerPerformer) throws Exception {
        this(configuration, null, stateTracker, workerPerformer);
    }

    public WorkerActor(Configuration configuration, ActorRef actorRef, StateTracker stateTracker, WorkerPerformer workerPerformer) throws Exception {
        this.cluster = Cluster.get(getContext().system());
        this.isWorking = new AtomicBoolean(false);
        this.mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
        this.tracker = stateTracker;
        this.workerPerformer = workerPerformer;
        this.mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.BROADCAST, getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.SHUTDOWN, getSelf()), getSelf());
        this.id = generateId();
        this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, register()), getSelf());
        this.clusterClient = actorRef;
        stateTracker.availableForWork(this.id);
        this.masterPath = configuration.get("org.deeplearning4j.scaleout.masterpath", "");
        log.info("Registered with master " + this.id + " at master " + configuration.get("org.deeplearning4j.scaleout.masterpath"));
        heartbeat();
        stateTracker.addWorker(this.id);
        setup(configuration);
        this.mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.BROADCAST, getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(this.id, getSelf()), getSelf());
        heartbeat();
        stateTracker.addWorker(this.id);
    }

    public static Props propsFor(Configuration configuration, StateTracker stateTracker, WorkerPerformer workerPerformer) {
        return Props.create(WorkerActor.class, new Object[]{configuration, stateTracker, workerPerformer});
    }

    public void onReceive(Object obj) throws Exception {
        if ((obj instanceof DistributedPubSubMediator.SubscribeAck) || (obj instanceof DistributedPubSubMediator.UnsubscribeAck)) {
            this.mediator.tell(new DistributedPubSubMediator.Publish("topics", obj), getSelf());
            log.info("Subscribed to " + ((DistributedPubSubMediator.SubscribeAck) obj).toString());
        } else if (obj instanceof Ack) {
            log.info("Ack from master on worker " + this.id);
        } else {
            unhandled(obj);
        }
    }

    public void aroundPostStop() {
        super.aroundPostStop();
        this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, new ClearWorker(this.id)), getSelf());
        this.heartbeat.cancel();
    }

    protected void heartbeat() throws Exception {
        this.heartbeat = context().system().scheduler().schedule(Duration.apply(1L, TimeUnit.SECONDS), Duration.apply(1L, TimeUnit.SECONDS), new Runnable() { // from class: org.deeplearning4j.scaleout.actor.core.actor.WorkerActor.1
            @Override // java.lang.Runnable
            public void run() {
                if (!WorkerActor.this.tracker.isDone()) {
                    WorkerActor.this.tracker.addWorker(WorkerActor.this.id);
                }
                try {
                    WorkerActor.this.checkJobAvailable();
                    if (WorkerActor.this.getCurrentJob() != null) {
                        Job currentJob = WorkerActor.this.getCurrentJob();
                        if (currentJob.getWork() == null) {
                            WorkerActor.this.tracker.clearJob(WorkerActor.this.id);
                            WorkerActor.this.tracker.enableWorker(WorkerActor.this.id);
                            WorkerActor.log.warn("Work for worker " + WorkerActor.this.id + " was null");
                            return;
                        }
                        String workerId = currentJob.workerId();
                        if (workerId == null || workerId.isEmpty()) {
                            currentJob.setWorkerId(workerId);
                        }
                        WorkerActor.log.info("Confirmation from " + currentJob.workerId() + " on work");
                        long currentTimeMillis = System.currentTimeMillis();
                        WorkerActor.this.workerPerformer.perform(currentJob);
                        WorkerActor.log.info("Job took " + Math.abs(System.currentTimeMillis() - currentTimeMillis) + " milliseconds");
                        WorkerActor.this.tracker.addUpdate(workerId, currentJob);
                        WorkerActor.this.tracker.clearJob(workerId);
                        WorkerActor.this.setCurrentJob(null);
                    } else if ((WorkerActor.this.getCurrentJob() == null || (!WorkerActor.this.isWorking.get() && WorkerActor.this.tracker.jobFor(WorkerActor.this.id) != null)) && WorkerActor.this.tracker.jobFor(WorkerActor.this.id) != null) {
                        WorkerActor.this.tracker.clearJob(WorkerActor.this.id);
                        WorkerActor.log.info("Clearing stale job... " + WorkerActor.this.id);
                    }
                } catch (HazelcastInstanceNotActiveException e) {
                    WorkerActor.log.warn("Hazel cast shut down...exiting");
                } catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
            }
        }, context().dispatcher());
    }

    public synchronized void setCurrentJob(Job job) {
        this.currentJob = job;
    }

    public synchronized Job getCurrentJob() {
        return this.currentJob;
    }

    public WorkerState register() {
        return new WorkerState(this.id);
    }

    public String generateId() {
        return System.getProperty("akka.remote.netty.tcp.hostname", "localhost") + "-" + UUID.randomUUID().toString();
    }

    public void postStop() throws Exception {
        super.postStop();
        try {
            this.tracker.removeWorker(this.id);
        } catch (Exception e) {
            log.info("Tracker already shut down");
        }
        log.info("Post stop on worker actor");
        this.cluster.unsubscribe(getSelf());
    }

    public void preStart() throws Exception {
        super.preStart();
        this.cluster.subscribe(getSelf(), new Class[]{ClusterEvent.MemberEvent.class});
        log.info("Pre start on worker");
    }

    protected void checkJobAvailable() throws Exception {
        Job jobFor = this.tracker.jobFor(this.id);
        if (jobFor == null) {
            if (this.isWorking.get() || jobFor == null) {
                return;
            }
            this.tracker.clearJob(this.id);
            log.info("Clearing stale job " + this.id);
            return;
        }
        if (this.tracker.needsReplicate(this.id)) {
            try {
                log.info("Updating worker " + this.id);
                setCurrentJob((Job) this.tracker.getCurrent());
                this.tracker.doneReplicating(this.id);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (jobFor == null || getCurrentJob() != null) {
            return;
        }
        log.info("Assigning job for worker " + this.id);
        setCurrentJob(jobFor);
    }

    public void setup(Configuration configuration) {
        this.conf = configuration;
        String str = configuration.get("org.deeplearning4j.scaleout.masterurl");
        if (str != null) {
            this.masterPath = configuration.get("org.deeplearning4j.scaleout.masterpath");
            Cluster.get(context().system()).join(AddressFromURIString.apply(str));
            this.mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
        }
    }

    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>() { // from class: org.deeplearning4j.scaleout.actor.core.actor.WorkerActor.2
            public SupervisorStrategy.Directive apply(Throwable th) {
                WorkerActor.log.error("Problem with processing", th);
                WorkerActor.this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, new ClearWorker(WorkerActor.this.id)), WorkerActor.this.getSelf());
                return SupervisorStrategy.restart();
            }
        });
    }
}
