package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/clusterframework/FlinkResourceManager.class */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {
    // Exit code constant; not referenced inside this class.
    // NOTE(review): presumably used by subclasses' fatalError implementations - confirm.
    protected static final int EXIT_CODE_FATAL_ERROR = -13;
    // Base name for the resource manager actor; not referenced by name inside this class.
    public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    // Configuration handed to the constructor; never null.
    protected final Configuration config;
    // Timeout applied to the registration request sent to the JobManager.
    private final FiniteDuration messageTimeout;
    // Service that reports the leader address; started in preStart(), stopped in postStop().
    private final LeaderRetrievalService leaderRetriever;
    // Workers recorded as started, keyed by their resource ID.
    private final Map<ResourceID, WorkerType> startedWorkers = new HashMap();
    // Actors that receive the messages sent by sendInfoMessage(String).
    private final Set<ActorRef> infoMessageListeners;
    // JobManager this actor is currently associated with; null while not associated.
    private ActorRef jobManager;
    // Session ID of the leader being followed; returned by getLeaderSessionID().
    private UUID leaderSessionID;
    // Target number of workers used by the pool check.
    private int designatedPoolSize;

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkResourceManager(int i, Configuration configuration, LeaderRetrievalService leaderRetrievalService) {
        FiniteDuration finiteDuration;
        this.config = (Configuration) Objects.requireNonNull(configuration);
        this.leaderRetriever = (LeaderRetrievalService) Objects.requireNonNull(leaderRetrievalService);
        try {
            finiteDuration = AkkaUtils.getLookupTimeout(this.config);
        } catch (Exception e) {
            finiteDuration = new FiniteDuration(Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
        }
        this.messageTimeout = finiteDuration;
        this.designatedPoolSize = i;
        this.infoMessageListeners = new HashSet();
    }

    /**
     * Starts the leader retrieval service and then runs {@link #initialize()}.
     *
     * <p>The listener forwards leader changes and retrieval errors to this actor as
     * messages. Any failure during startup is likewise sent to this actor as a
     * {@code FatalErrorOccurred} message.
     */
    public void preStart() {
        try {
            final LeaderRetrievalListener leaderListener = new LeaderRetrievalListener() { // from class: org.apache.flink.runtime.clusterframework.FlinkResourceManager.1
                @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
                public void notifyLeaderAddress(String leaderAddress, UUID sessionId) {
                    final NewLeaderAvailable notification = new NewLeaderAvailable(leaderAddress, sessionId);
                    FlinkResourceManager.this.self().tell(notification, ActorRef.noSender());
                }

                @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
                public void handleError(Exception retrievalFailure) {
                    final FatalErrorOccurred fatal = new FatalErrorOccurred("Leader retrieval service failed", retrievalFailure);
                    FlinkResourceManager.this.self().tell(fatal, ActorRef.noSender());
                }
            };
            this.leaderRetriever.start(leaderListener);
            initialize();
        } catch (Throwable startupFailure) {
            final FatalErrorOccurred fatal = new FatalErrorOccurred("Error during startup of ResourceManager actor", startupFailure);
            self().tell(fatal, ActorRef.noSender());
        }
    }

    /**
     * Stops the leader retrieval service. A failure while stopping is logged and not
     * propagated.
     */
    public void postStop() {
        try {
            leaderRetriever.stop();
        } catch (Throwable stopFailure) {
            LOG.error("Could not cleanly shut down leader retrieval service", stopFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Dispatches an incoming actor message according to its runtime type.
     *
     * <p>Messages of an unknown type are logged and discarded. Any {@link Throwable}
     * raised while handling a message is passed to {@code fatalError}.
     *
     * @param obj the received message
     */
    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    public void handleMessage(Object obj) {
        try {
            if (obj instanceof CheckAndAllocateContainers) {
                checkWorkersPool();
            } else if (obj instanceof SetWorkerPoolSize) {
                final SetWorkerPoolSize resize = (SetWorkerPoolSize) obj;
                adjustDesignatedNumberOfWorkers(resize.numberOfWorkers());
            } else if (obj instanceof RemoveResource) {
                final RemoveResource removal = (RemoveResource) obj;
                removeRegisteredResource(removal.resourceId());
            } else if (obj instanceof NotifyResourceStarted) {
                final ActorRef notifier = sender();
                final ResourceID startedId = ((NotifyResourceStarted) obj).getResourceID();
                handleResourceStarted(notifier, startedId);
            } else if (obj instanceof NewLeaderAvailable) {
                final NewLeaderAvailable leaderChange = (NewLeaderAvailable) obj;
                final String address = leaderChange.leaderAddress();
                final UUID sessionId = leaderChange.leaderSessionId();
                newJobManagerLeaderAvailable(address, sessionId);
            } else if (obj instanceof TriggerRegistrationAtJobManager) {
                final TriggerRegistrationAtJobManager trigger = (TriggerRegistrationAtJobManager) obj;
                triggerConnectingToJobManager(trigger.jobManagerAddress());
            } else if (obj instanceof RegisterResourceManagerSuccessful) {
                final RegisterResourceManagerSuccessful registered = (RegisterResourceManagerSuccessful) obj;
                jobManagerLeaderConnected(registered.jobManager(), registered.currentlyRegisteredTaskManagers());
            } else if (obj instanceof StopCluster) {
                final StopCluster stop = (StopCluster) obj;
                shutdownCluster(stop.finalStatus(), stop.message());
                // Confirm to the requester once the shutdown call has returned.
                final ActorRef requester = sender();
                final Object confirmation = decorateMessage(StopClusterSuccessful.getInstance());
                requester.tell(confirmation, ActorRef.noSender());
            } else if (obj instanceof RegisterInfoMessageListener) {
                // Listeners are only accepted while a JobManager is associated; the
                // confirmation is sent with that JobManager as the sender.
                if (this.jobManager != null) {
                    final ActorRef listener = sender();
                    this.infoMessageListeners.add(listener);
                    listener.tell(decorateMessage(RegisterInfoMessageListenerSuccessful.get()), this.jobManager);
                }
            } else if (obj instanceof UnRegisterInfoMessageListener) {
                this.infoMessageListeners.remove(sender());
            } else if (obj instanceof FatalErrorOccurred) {
                final FatalErrorOccurred fatal = (FatalErrorOccurred) obj;
                fatalError(fatal.message(), fatal.error());
            } else {
                this.LOG.error("Discarding unknown message: {}", obj);
            }
        } catch (Throwable processingFailure) {
            fatalError("Error processing actor message", processingFailure);
        }
    }

    /**
     * Returns the leader session ID currently held by this actor.
     *
     * @return the session ID, or null when none is set
     */
    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    protected final UUID getLeaderSessionID() {
        return leaderSessionID;
    }

    /**
     * Returns the designated worker pool size.
     *
     * @return the number of workers this resource manager aims for
     */
    public int getDesignatedWorkerPoolSize() {
        return designatedPoolSize;
    }

    /**
     * Returns how many workers are currently recorded as started.
     *
     * @return the size of the started-workers map
     */
    public int getNumberOfStartedTaskManagers() {
        return startedWorkers.size();
    }

    /**
     * Returns the workers currently recorded as started.
     *
     * @return the live value view of the started-workers map (not a copy)
     */
    public Collection<WorkerType> getStartedTaskManagers() {
        return startedWorkers.values();
    }

    /**
     * Tells whether a worker with the given resource ID is recorded as started.
     *
     * @param resourceID the resource ID to look up
     * @return true if the started-workers map contains the ID
     */
    public boolean isStarted(ResourceID resourceID) {
        return startedWorkers.containsKey(resourceID);
    }

    /**
     * Returns all workers currently recorded as started.
     *
     * @return the live value view of the started-workers map (not a copy)
     */
    public Collection<WorkerType> allStartedWorkers() {
        return startedWorkers.values();
    }

    /**
     * Handles the notification that a resource has started and acknowledges it.
     *
     * <p>A resource that is not yet recorded is passed to {@code workerStarted}; it is
     * recorded only if that call returns a worker. The acknowledgement is sent in every
     * case, including a null resource ID.
     *
     * @param actorRef the actor to acknowledge
     * @param resourceID the ID of the started resource; may be null
     */
    private void handleResourceStarted(ActorRef actorRef, ResourceID resourceID) {
        if (resourceID != null) {
            final boolean alreadyKnown = this.startedWorkers.get(resourceID) != null;
            if (!alreadyKnown) {
                final WorkerType worker = workerStarted(resourceID);
                if (worker == null) {
                    this.LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
                } else {
                    this.startedWorkers.put(resourceID, worker);
                    this.LOG.info("TaskManager {} has started.", resourceID);
                }
            } else {
                this.LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
            }
        }
        final Object ack = decorateMessage(Acknowledge.get());
        actorRef.tell(ack, self());
    }

    /**
     * Removes the worker with the given ID from the bookkeeping and releases it.
     * Logs a warning if no such worker is recorded.
     *
     * @param resourceID the ID of the resource to remove
     */
    private void removeRegisteredResource(ResourceID resourceID) {
        final WorkerType released = this.startedWorkers.remove(resourceID);
        if (released == null) {
            this.LOG.warn("Resource {} could not be released", resourceID);
            return;
        }
        releaseStartedWorker(released);
    }

    /**
     * Reacts to a leader change: drops the current association, then starts
     * registering at the new leader when both address and session ID are present.
     *
     * @param str the address of the new leader; may be null
     * @param uuid the session ID of the new leader; may be null
     */
    private void newJobManagerLeaderAvailable(String str, UUID uuid) {
        this.LOG.debug("Received new leading JobManager {}. Connecting.", str);
        jobManagerLostLeadership();

        final boolean hasNewLeader = uuid != null && str != null;
        if (hasNewLeader) {
            this.leaderSessionID = uuid;
            triggerConnectingToJobManager(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends a registration request to the JobManager at the given address.
     *
     * <p>The reply is ignored if a JobManager has been associated by the time it
     * arrives. A successful reply is forwarded to this actor. A failed or unexpected
     * reply is logged and followed by a new registration trigger for the same address.
     *
     * @param str the address of the JobManager to register at
     */
    public void triggerConnectingToJobManager(String str) {
        this.LOG.info("Trying to associate with JobManager leader " + str);
        final Object registration = decorateMessage(new RegisterResourceManager(self()));
        final Object retryTrigger = decorateMessage(new TriggerRegistrationAtJobManager(str));

        final OnComplete<Object> onReply = new OnComplete<Object>() { // from class: org.apache.flink.runtime.clusterframework.FlinkResourceManager.2
            public void onComplete(Throwable failure, Object reply) {
                // Already associated in the meantime: nothing left to do.
                if (FlinkResourceManager.this.jobManager != null) {
                    return;
                }
                final boolean registered = (reply instanceof JobManagerMessages.LeaderSessionMessage)
                        && (((JobManagerMessages.LeaderSessionMessage) reply).message() instanceof RegisterResourceManagerSuccessful);
                if (registered) {
                    FlinkResourceManager.this.self().tell(reply, ActorRef.noSender());
                    return;
                }
                if (reply == null) {
                    FlinkResourceManager.this.LOG.error("Resource manager could not register at JobManager", failure);
                } else {
                    FlinkResourceManager.this.LOG.error("Invalid response type to registration at JobManager: {}", reply);
                }
                // Both failure paths retry the registration.
                FlinkResourceManager.this.self().tell(retryTrigger, ActorRef.noSender());
            }
        };

        Patterns.ask(context().actorSelection(str), registration, new Timeout(this.messageTimeout))
                .onComplete(onReply, context().dispatcher());
    }

    /**
     * Drops the association with the current JobManager, if there is one: clears the
     * JobManager reference, the leader session ID and all info message listeners.
     */
    private void jobManagerLostLeadership() {
        if (this.jobManager == null) {
            return;
        }
        this.LOG.info("Associated JobManager {} lost leader status", this.jobManager);
        this.jobManager = null;
        this.leaderSessionID = null;
        this.infoMessageListeners.clear();
    }

    /**
     * Completes the association with a JobManager after successful registration.
     *
     * <p>Being associated already is treated as a fatal error. Otherwise the
     * TaskManagers reported by the JobManager are offered to
     * {@code reacceptRegisteredWorkers}, and the returned workers are recorded as started.
     * If consolidation fails, every reported ID not yet recorded is passed to
     * {@code releasePendingWorker}. The worker pool is checked at the end.
     *
     * @param actorRef the JobManager that confirmed the registration
     * @param collection the resource IDs registered at that JobManager
     */
    private void jobManagerLeaderConnected(ActorRef actorRef, Collection<ResourceID> collection) {
        if (this.jobManager != null) {
            final String problem = "Attempting to associate with new JobManager leader " + actorRef + " without previously disassociating from current leader " + this.jobManager;
            fatalError(problem, new Exception(problem));
            return;
        }

        this.LOG.info("Resource Manager associating with leading JobManager {} - leader session {}", actorRef, this.leaderSessionID);
        this.jobManager = actorRef;

        if (collection.size() > 0) {
            this.LOG.info("Received TaskManagers that were registered at the leader JobManager. Trying to consolidate.");

            // IDs not yet taken over; these are the ones released if consolidation fails.
            final Set<ResourceID> unhandled = new HashSet<>(collection.size());
            unhandled.addAll(collection);

            try {
                final Collection<WorkerType> reaccepted = reacceptRegisteredWorkers(collection);
                this.LOG.info("Consolidated {} TaskManagers", reaccepted.size());
                for (WorkerType worker : reaccepted) {
                    final ResourceID workerId = worker.getResourceID();
                    this.startedWorkers.put(workerId, worker);
                    unhandled.remove(workerId);
                }
            } catch (Throwable consolidationFailure) {
                this.LOG.error("Error during consolidation of known TaskManagers", consolidationFailure);
                for (ResourceID leftover : unhandled) {
                    releasePendingWorker(leftover);
                }
            }
        }

        checkWorkersPool();
    }

    /**
     * Logs the shutdown request and delegates to {@code shutdownApplication}.
     *
     * @param applicationStatus the final status to report
     * @param str the message accompanying the shutdown
     */
    private void shutdownCluster(ApplicationStatus applicationStatus, String str) {
        LOG.info("Shutting down cluster with status {} : {}", applicationStatus, str);
        shutdownApplication(applicationStatus, str);
    }

    /**
     * Requests new workers when the designated pool size exceeds the number of workers
     * already accounted for (started, requested, or pending registration).
     *
     * @throws IllegalStateException if either pending counter is negative
     */
    private void checkWorkersPool() {
        final int pendingRequests = getNumWorkerRequestsPending();
        final int pendingRegistrations = getNumWorkersPendingRegistration();
        Preconditions.checkState(pendingRequests >= 0, "Number of pending workers should never be below 0.");
        Preconditions.checkState(pendingRegistrations >= 0, "Number of pending workers pending registration should never be below 0.");

        final int accountedFor = (this.startedWorkers.size() + pendingRequests) + pendingRegistrations;
        final int missing = this.designatedPoolSize - accountedFor;
        if (missing > 0) {
            requestNewWorkers(missing);
        }
    }

    /**
     * Sets a new designated pool size and re-checks the worker pool. Negative sizes
     * are ignored with a warning.
     *
     * @param i the new designated number of workers
     */
    private void adjustDesignatedNumberOfWorkers(int i) {
        if (i >= 0) {
            this.LOG.info("Adjusting designated worker pool size to {}", Integer.valueOf(i));
            this.designatedPoolSize = i;
            checkWorkersPool();
        } else {
            this.LOG.warn("Ignoring invalid designated worker pool size: " + i);
        }
    }

    /**
     * Sends this actor a {@code CheckAndAllocateContainers} message, so that the worker
     * pool check runs as part of regular message handling.
     */
    public void triggerCheckWorkers() {
        final Object checkRequest = decorateMessage(CheckAndAllocateContainers.get());
        self().tell(checkRequest, ActorRef.noSender());
    }

    /**
     * Removes a failed worker from the bookkeeping and reports the removal to the
     * associated JobManager.
     *
     * <p>Nothing happens if the worker is not recorded as started. If no JobManager is
     * associated (before the first registration, or after a leadership loss), the
     * worker is still removed, but the notification is skipped and a warning is logged
     * instead of failing with a {@link NullPointerException}.
     *
     * @param resourceID the ID of the failed worker
     * @param str a message describing the failure
     */
    public void notifyWorkerFailed(ResourceID resourceID, String str) {
        if (this.startedWorkers.remove(resourceID) == null) {
            return;
        }
        final ActorRef leader = this.jobManager;
        if (leader == null) {
            this.LOG.warn("Worker {} failed ({}), but no JobManager is associated to be notified", resourceID, str);
            return;
        }
        leader.tell(decorateMessage(new ResourceRemoved(resourceID, str)), self());
    }

    /**
     * Framework-specific initialization. Called from {@code preStart()} after the leader
     * retrieval service has been started; a failure is reported as a fatal error message.
     *
     * @throws Exception if initialization fails
     */
    protected abstract void initialize() throws Exception;

    /**
     * Shuts the application down. Called when a {@code StopCluster} message is handled.
     *
     * @param applicationStatus the final status carried by the stop request
     * @param str the message carried by the stop request
     */
    protected abstract void shutdownApplication(ApplicationStatus applicationStatus, String str);

    /**
     * Handles an unrecoverable error. Called for {@code FatalErrorOccurred} messages, for
     * any throwable raised during message handling, and when a new leader connects
     * while another one is still associated.
     *
     * @param str a description of the error
     * @param th the cause
     */
    protected abstract void fatalError(String str, Throwable th);

    /**
     * Requests additional workers. Called by the pool check with the (positive) number
     * of workers missing to reach the designated pool size.
     *
     * @param i the number of workers to request
     */
    protected abstract void requestNewWorkers(int i);

    /**
     * Releases a worker that has not been recorded as started. Called for each reported
     * resource ID that was not taken over when consolidation fails.
     *
     * @param resourceID the ID of the resource to release
     */
    protected abstract void releasePendingWorker(ResourceID resourceID);

    /**
     * Releases a worker that was recorded as started. Called after the worker has been
     * removed from the bookkeeping in response to a {@code RemoveResource} message.
     *
     * @param workertype the worker to release
     */
    protected abstract void releaseStartedWorker(WorkerType workertype);

    /**
     * Called when a resource reports that it has started and is not yet recorded.
     *
     * @param resourceID the ID of the started resource
     * @return the worker to record for this ID, or null if the resource was not started
     *         by this resource manager (it is then not recorded)
     */
    protected abstract WorkerType workerStarted(ResourceID resourceID);

    /**
     * Called after registering at a JobManager that already has TaskManagers registered.
     *
     * @param collection the resource IDs registered at the JobManager
     * @return the workers to record as started
     */
    protected abstract Collection<WorkerType> reacceptRegisteredWorkers(Collection<ResourceID> collection);

    /**
     * Returns the number of pending worker requests, as counted by the pool check.
     *
     * @return the number of pending requests; must not be negative
     */
    protected abstract int getNumWorkerRequestsPending();

    /**
     * Returns the number of workers pending registration, as counted by the pool check.
     *
     * @return the number of workers pending registration; must not be negative
     */
    protected abstract int getNumWorkersPendingRegistration();

    /**
     * Sends an {@code InfoMessage} with the given text to every registered info message
     * listener, with this actor as the sender.
     *
     * @param str the text of the message
     */
    protected void sendInfoMessage(String str) {
        for (ActorRef listener : this.infoMessageListeners) {
            final Object info = decorateMessage(new InfoMessage(str));
            listener.tell(info, self());
        }
    }

    /**
     * Starts a resource manager actor under a generated name of the form
     * {@code resourcemanager-<random UUID>}.
     *
     * @param configuration the configuration for the resource manager
     * @param actorSystem the actor system to start the actor in
     * @param leaderRetrievalService the service used to find the leader
     * @param cls the resource manager class to instantiate
     * @return the reference to the started actor
     */
    public static ActorRef startResourceManagerActors(Configuration configuration, ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, Class<? extends FlinkResourceManager<?>> cls) {
        final String generatedName = "resourcemanager-" + UUID.randomUUID();
        return startResourceManagerActors(configuration, actorSystem, leaderRetrievalService, cls, generatedName);
    }

    /**
     * Starts a resource manager actor under the given name.
     *
     * @param configuration the configuration for the resource manager
     * @param actorSystem the actor system to start the actor in
     * @param leaderRetrievalService the service used to find the leader
     * @param cls the resource manager class to instantiate
     * @param str the name of the actor
     * @return the reference to the started actor
     */
    public static ActorRef startResourceManagerActors(Configuration configuration, ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, Class<? extends FlinkResourceManager<?>> cls, String str) {
        final Props managerProps = getResourceManagerProps(cls, configuration, leaderRetrievalService);
        return actorSystem.actorOf(managerProps, str);
    }

    /**
     * Creates the {@code Props} for a resource manager actor of the given class, with the
     * configuration and the leader retrieval service as creation arguments.
     *
     * @param cls the resource manager class
     * @param configuration the configuration, first creation argument
     * @param leaderRetrievalService the leader retrieval service, second creation argument
     * @return the props for creating the actor
     */
    public static Props getResourceManagerProps(Class<? extends FlinkResourceManager> cls, Configuration configuration, LeaderRetrievalService leaderRetrievalService) {
        final Object[] creationArgs = {configuration, leaderRetrievalService};
        return Props.create(cls, creationArgs);
    }
}
