package org.apache.samza.clustermanager.container.placement;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.clustermanager.ContainerProcessManager;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.class */
/**
 * Background task that periodically polls the {@link ContainerPlacementMetadataStore} for
 * {@link ContainerPlacementRequestMessage}s and dispatches them.
 *
 * <p>On every polling cycle each stored request is inspected:
 * <ul>
 *   <li>requests whose deployment id equals the run id of this application are registered with the
 *       {@link ContainerProcessManager};</li>
 *   <li>all other requests are deleted from the metadata store by their UUID.</li>
 * </ul>
 *
 * <p>The loop ends when {@link #stop()} is called, when the metadata store reports that it is no
 * longer running, or when the polling thread is interrupted.
 */
public class ContainerPlacementRequestAllocator implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerPlacementRequestAllocator.class);

    /** Pause between two polling cycles used when the caller does not supply one, in milliseconds. */
    private static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS = 5000;

    /** Receives the placement requests that belong to the current run. */
    private final ContainerProcessManager containerProcessManager;

    /** Source of the placement requests; also used to delete requests that do not match the run id. */
    private final ContainerPlacementMetadataStore containerPlacementMetadataStore;

    /** Cleared by {@link #stop()}; volatile so the polling thread observes the change. */
    private volatile boolean isRunning;

    /** Run id taken from the application config; compared against each request's deployment id. */
    private final String appRunId;

    /** Pause between two polling cycles, in milliseconds. */
    private final int containerPlacementHandlerSleepMs;

    /**
     * Creates an allocator that polls with the default interval of
     * {@value #DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS} ms.
     *
     * @param containerPlacementMetadataStore store the placement requests are read from; must not be null
     * @param containerProcessManager manager the matching requests are registered with; must not be null
     * @param applicationConfig config supplying the run id of the application; must not be null
     * @throws NullPointerException if any argument is null
     */
    public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager containerProcessManager, ApplicationConfig applicationConfig) {
        this(containerPlacementMetadataStore, containerProcessManager, applicationConfig,
            DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
    }

    /**
     * Creates an allocator with an explicit polling interval.
     *
     * @param containerPlacementMetadataStore store the placement requests are read from; must not be null
     * @param containerProcessManager manager the matching requests are registered with; must not be null
     * @param applicationConfig config supplying the run id of the application; must not be null
     * @param containerPlacementHandlerSleepMs pause between two polling cycles, in milliseconds
     * @throws NullPointerException if any of the object arguments is null
     */
    @VisibleForTesting
    public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager containerProcessManager, ApplicationConfig applicationConfig, int containerPlacementHandlerSleepMs) {
        Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
        Preconditions.checkNotNull(containerProcessManager, "ContainerProcessManager cannot be null");
        // Fail with a descriptive message instead of a bare NPE from getRunId() below.
        Preconditions.checkNotNull(applicationConfig, "applicationConfig cannot be null");
        this.containerProcessManager = containerProcessManager;
        this.containerPlacementMetadataStore = containerPlacementMetadataStore;
        this.isRunning = true;
        this.appRunId = applicationConfig.getRunId();
        this.containerPlacementHandlerSleepMs = containerPlacementHandlerSleepMs;
    }

    /**
     * Polls the metadata store until the allocator is stopped, the store stops running, or the
     * thread is interrupted.
     *
     * <p>Exceptions other than {@link InterruptedException} are logged and the loop continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning && this.containerPlacementMetadataStore.isRunning()) {
            try {
                for (ContainerPlacementRequestMessage containerPlacementRequestMessage : this.containerPlacementMetadataStore.readAllContainerPlacementRequestMessages()) {
                    if (containerPlacementRequestMessage.getDeploymentId().equals(this.appRunId)) {
                        LOG.debug("Received a container placement message {}", containerPlacementRequestMessage);
                        this.containerProcessManager.registerContainerPlacementAction(containerPlacementRequestMessage);
                    } else {
                        // Deployment id differs from this run's id: remove the request from the store.
                        this.containerPlacementMetadataStore.deleteAllContainerPlacementMessages(containerPlacementRequestMessage.getUuid());
                    }
                }
                Thread.sleep(this.containerPlacementHandlerSleepMs);
            } catch (InterruptedException e) {
                LOG.warn("Got InterruptedException in ContainerPlacementRequestAllocator thread.", e);
                Thread.currentThread().interrupt();
                // With the interrupt flag restored, every further Thread.sleep would throw at once and
                // the loop would spin without pausing, so treat the interrupt as a request to terminate.
                break;
            } catch (Exception e2) {
                // NOTE(review): this path skips the sleep above, so a persistently failing store is
                // retried immediately on the next iteration.
                LOG.error("Got an exception while reading ContainerPlacementRequestMessage in ContainerPlacementRequestAllocator thread", e2);
            }
        }
    }

    /**
     * Requests termination of the polling loop. The flag is checked at the top of each loop
     * iteration, so a cycle that is already in progress (including its sleep) completes first.
     * Calling this on an allocator that is already stopped only logs a warning.
     */
    public void stop() {
        if (this.isRunning) {
            this.isRunning = false;
        } else {
            LOG.warn("ContainerPlacementRequestAllocator already stopped");
        }
    }
}
