package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.placement.ContainerPlacementMessage;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.util.BoundedLinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/ContainerManager.class */
public class ContainerManager {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
    private static final String ANY_HOST = "ANY_HOST";
    private static final String LAST_SEEN = "LAST_SEEN";
    private static final String FORCE_RESTART_LAST_SEEN = "FORCE_RESTART_LAST_SEEN";
    private static final int UUID_CACHE_SIZE = 20000;
    private final ContainerPlacementMetadataStore containerPlacementMetadataStore;
    private final ClusterResourceManager clusterResourceManager;
    private final SamzaApplicationState samzaApplicationState;
    private final boolean hostAffinityEnabled;
    private final ConcurrentHashMap<String, ContainerPlacementMetadata> actions;
    private final BoundedLinkedHashSet<UUID> placementRequestsCache;
    private final Optional<StandbyContainerManager> standbyContainerManager;
    private final LocalityManager localityManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.samza.clustermanager.ContainerManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/samza/clustermanager/ContainerManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$samza$container$placement$ContainerPlacementMessage$StatusCode = new int[ContainerPlacementMessage.StatusCode.values().length];

        static {
            try {
                $SwitchMap$org$apache$samza$container$placement$ContainerPlacementMessage$StatusCode[ContainerPlacementMessage.StatusCode.ACCEPTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$samza$container$placement$ContainerPlacementMessage$StatusCode[ContainerPlacementMessage.StatusCode.IN_PROGRESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore, SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, boolean z, boolean z2, LocalityManager localityManager, FaultDomainManager faultDomainManager, Config config) {
        Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
        Preconditions.checkNotNull(faultDomainManager, "Fault domain manager cannot be null");
        this.samzaApplicationState = samzaApplicationState;
        this.clusterResourceManager = clusterResourceManager;
        this.actions = new ConcurrentHashMap<>();
        this.placementRequestsCache = new BoundedLinkedHashSet<>(UUID_CACHE_SIZE);
        this.hostAffinityEnabled = z;
        this.containerPlacementMetadataStore = containerPlacementMetadataStore;
        this.localityManager = localityManager;
        if (z2) {
            this.standbyContainerManager = Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager, config, faultDomainManager));
        } else {
            this.standbyContainerManager = Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean handleContainerLaunch(SamzaResourceRequest samzaResourceRequest, String str, SamzaResource samzaResource, ResourceRequestState resourceRequestState, ContainerAllocator containerAllocator) {
        if (hasActiveContainerPlacementAction(samzaResourceRequest.getProcessorId())) {
            String processorId = samzaResourceRequest.getProcessorId();
            ContainerPlacementMetadata containerPlacementMetadata = getPlacementActionMetadata(processorId).get();
            ContainerPlacementMetadata.ContainerStatus containerStatus = containerPlacementMetadata.getContainerStatus();
            if (this.samzaApplicationState.runningProcessors.containsKey(processorId) && containerStatus == ContainerPlacementMetadata.ContainerStatus.RUNNING) {
                LOG.debug("Requesting running container to shutdown due to existing ContainerPlacement action {}", containerPlacementMetadata);
                containerPlacementMetadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS);
                updateContainerPlacementActionStatus(containerPlacementMetadata, ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Active container stop in progress");
                this.clusterResourceManager.stopStreamProcessor(this.samzaApplicationState.runningProcessors.get(processorId));
                return false;
            }
            if (containerStatus == ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {
                LOG.info("Waiting for running container to shutdown due to existing ContainerPlacement action {}", containerPlacementMetadata);
                return false;
            }
            if (containerStatus == ContainerPlacementMetadata.ContainerStatus.STOP_FAILED) {
                LOG.info("Shutdown on running container failed for action {}", containerPlacementMetadata);
                markContainerPlacementActionFailed(containerPlacementMetadata, String.format("failed to stop container on current host %s", containerPlacementMetadata.getSourceHost()));
                resourceRequestState.cancelResourceRequest(samzaResourceRequest);
                return true;
            }
            if (containerStatus == ContainerPlacementMetadata.ContainerStatus.STOPPED) {
                if (!this.standbyContainerManager.isPresent() || this.standbyContainerManager.get().checkStandbyConstraints(samzaResourceRequest.getProcessorId(), samzaResource.getHost())) {
                    LOG.info("Status updated for ContainerPlacement action: ", containerPlacementMetadata);
                    containerAllocator.runStreamProcessor(samzaResourceRequest, str);
                    return true;
                }
                LOG.info("Starting container {} on host {} does not meet standby constraints, falling back to source host placement metadata: {}", new Object[]{samzaResourceRequest.getProcessorId(), str, containerPlacementMetadata});
                this.standbyContainerManager.get().releaseUnstartableContainer(samzaResourceRequest, samzaResource, str, resourceRequestState);
                containerAllocator.requestResource(processorId, containerPlacementMetadata.getSourceHost());
                markContainerPlacementActionFailed(containerPlacementMetadata, String.format("allocated resource %s does not meet standby constraints now, falling back to source host", samzaResource));
                return true;
            }
        }
        if (this.standbyContainerManager.isPresent()) {
            this.standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(samzaResourceRequest, str, samzaResource, containerAllocator, resourceRequestState);
            return true;
        }
        containerAllocator.runStreamProcessor(samzaResourceRequest, str);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleContainerStop(String str, String str2, String str3, int i, Duration duration, ContainerAllocator containerAllocator) {
        if (hasActiveContainerPlacementAction(str)) {
            ContainerPlacementMetadata containerPlacementMetadata = getPlacementActionMetadata(str).get();
            LOG.info("Setting the container state with Processor ID: {} to be stopped because of existing ContainerPlacement action: {}", str, containerPlacementMetadata);
            containerPlacementMetadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOPPED);
        } else if (this.standbyContainerManager.isPresent()) {
            this.standbyContainerManager.get().handleContainerStop(str, str2, str3, i, containerAllocator, duration);
        } else {
            containerAllocator.requestResourceWithDelay(str, str3, duration);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleContainerLaunchFail(String str, String str2, String str3, ContainerAllocator containerAllocator) {
        if (str != null && hasActiveContainerPlacementAction(str)) {
            ContainerPlacementMetadata containerPlacementMetadata = getPlacementActionMetadata(str).get();
            String sourceHost = this.hostAffinityEnabled ? containerPlacementMetadata.getSourceHost() : "ANY_HOST";
            markContainerPlacementActionFailed(containerPlacementMetadata, String.format("failed to start container on destination host %s, attempting to start on source host %s", str3, sourceHost));
            containerAllocator.requestResource(str, sourceHost);
            return;
        }
        if (str != null && this.standbyContainerManager.isPresent()) {
            this.standbyContainerManager.get().handleContainerLaunchFail(str, str2, containerAllocator);
        } else if (str == null) {
            LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. Ignoring invalid/redundant notification.", str2, str3);
        } else {
            LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}", new Object[]{str, str2, str3});
            containerAllocator.requestResource(str, "ANY_HOST");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleContainerStopFail(String str, String str2, String str3, ContainerAllocator containerAllocator) {
        if (str != null && hasActiveContainerPlacementAction(str)) {
            getPlacementActionMetadata(str).get().setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_FAILED);
        } else if (str == null || !this.standbyContainerManager.isPresent()) {
            LOG.warn("Did not find a running Processor ID for Container ID: {} on host: {}. Ignoring invalid/redundant notification.", str2, str3);
        } else {
            this.standbyContainerManager.get().handleContainerStopFail(str, str2, containerAllocator);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleContainerLaunchSuccess(String str, String str2) {
        if (hasActiveContainerPlacementAction(str)) {
            ContainerPlacementMetadata containerPlacementMetadata = getPlacementActionMetadata(str).get();
            containerPlacementMetadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.RUNNING);
            updateContainerPlacementActionStatus(containerPlacementMetadata, ContainerPlacementMessage.StatusCode.SUCCEEDED, "Successfully completed the container placement action started container on host " + str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void handleExpiredRequest(String str, String str2, SamzaResourceRequest samzaResourceRequest, ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
        boolean hasAllocatedResource = containerAllocator.hasAllocatedResource("ANY_HOST");
        if (hasActiveContainerPlacementAction(str)) {
            resourceRequestState.cancelResourceRequest(samzaResourceRequest);
            markContainerPlacementActionFailed(getPlacementActionMetadata(str).get(), "failed the ContainerPlacement action because request for resources to ClusterManager expired");
            return;
        }
        if (this.hostAffinityEnabled) {
            if (this.standbyContainerManager.isPresent()) {
                this.standbyContainerManager.get().handleExpiredResourceRequest(str, samzaResourceRequest, Optional.ofNullable(containerAllocator.peekAllocatedResource("ANY_HOST")), containerAllocator, resourceRequestState);
                return;
            }
            if (hasAllocatedResource) {
                LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", str, str2);
                containerAllocator.runStreamProcessor(samzaResourceRequest, "ANY_HOST");
            } else {
                LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.", str, str2);
                resourceRequestState.cancelResourceRequest(samzaResourceRequest);
                containerAllocator.requestResource(str, "ANY_HOST");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleExpiredResource(SamzaResourceRequest samzaResourceRequest, SamzaResource samzaResource, String str, ResourceRequestState resourceRequestState, ContainerAllocator containerAllocator) {
        LOG.info("Allocated resource {} has expired for Processor ID: {} request: {}. Re-requesting resource again", new Object[]{samzaResource, samzaResourceRequest.getProcessorId(), samzaResourceRequest});
        resourceRequestState.releaseUnstartableContainer(samzaResource, str);
        resourceRequestState.cancelResourceRequest(samzaResourceRequest);
        SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(samzaResourceRequest.getProcessorId(), samzaResourceRequest.getPreferredHost());
        if (hasActiveContainerPlacementAction(resourceRequest.getProcessorId())) {
            getPlacementActionMetadata(samzaResourceRequest.getProcessorId()).get().recordResourceRequest(resourceRequest);
        }
        containerAllocator.issueResourceRequest(resourceRequest);
    }

    public void registerContainerPlacementAction(ContainerPlacementRequestMessage containerPlacementRequestMessage, ContainerAllocator containerAllocator) {
        String processorId = containerPlacementRequestMessage.getProcessorId();
        String destinationHost = containerPlacementRequestMessage.getDestinationHost();
        if (deQueueAction(containerPlacementRequestMessage)) {
            LOG.info("ContainerPlacement action is de-queued metadata: {}", containerPlacementRequestMessage);
            Pair<ContainerPlacementMessage.StatusCode, String> validatePlacementAction = validatePlacementAction(containerPlacementRequestMessage);
            this.placementRequestsCache.put(containerPlacementRequestMessage.getUuid());
            this.containerPlacementMetadataStore.deleteContainerPlacementRequestMessage(containerPlacementRequestMessage.getUuid());
            if (validatePlacementAction.getKey() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
                LOG.info("Status updated for ContainerPlacement action request: {} response: {}", containerPlacementRequestMessage, validatePlacementAction.getValue());
                writeContainerPlacementResponseMessage(containerPlacementRequestMessage, (ContainerPlacementMessage.StatusCode) validatePlacementAction.getKey(), (String) validatePlacementAction.getValue());
                return;
            }
            if (destinationHost.equals(FORCE_RESTART_LAST_SEEN)) {
                LOG.info("Issuing a force restart for Processor ID: {} for ContainerPlacement action request {}", processorId, containerPlacementRequestMessage);
                this.clusterResourceManager.stopStreamProcessor(this.samzaApplicationState.runningProcessors.get(processorId));
                writeContainerPlacementResponseMessage(containerPlacementRequestMessage, ContainerPlacementMessage.StatusCode.SUCCEEDED, "Successfully issued a stop container request falling back to normal restart path");
                return;
            }
            if (destinationHost.equals(LAST_SEEN)) {
                String sourceHostForContainer = getSourceHostForContainer(containerPlacementRequestMessage);
                LOG.info("Changing the requested host for placement action to {} because requested host is LAST_SEEN", sourceHostForContainer);
                destinationHost = sourceHostForContainer;
            }
            if (!this.hostAffinityEnabled) {
                LOG.info("Changing the requested host for placement action to {} because host affinity is disabled", "ANY_HOST");
                destinationHost = "ANY_HOST";
            }
            ContainerPlacementMetadata containerPlacementMetadata = new ContainerPlacementMetadata(containerPlacementRequestMessage, getSourceHostForContainer(containerPlacementRequestMessage));
            this.actions.put(processorId, containerPlacementMetadata);
            if (this.samzaApplicationState.failedProcessors.containsKey(containerPlacementRequestMessage.getProcessorId())) {
                containerPlacementMetadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOPPED);
            }
            SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(processorId, destinationHost);
            containerPlacementMetadata.recordResourceRequest(resourceRequest);
            this.actions.put(processorId, containerPlacementMetadata);
            updateContainerPlacementActionStatus(containerPlacementMetadata, ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources requested");
            containerAllocator.issueResourceRequest(resourceRequest);
        }
    }

    @VisibleForTesting
    ContainerPlacementMetadata registerContainerPlacementActionForTest(ContainerPlacementRequestMessage containerPlacementRequestMessage, ContainerAllocator containerAllocator) {
        registerContainerPlacementAction(containerPlacementRequestMessage, containerAllocator);
        if (hasActiveContainerPlacementAction(containerPlacementRequestMessage.getProcessorId())) {
            return getPlacementActionMetadata(containerPlacementRequestMessage.getProcessorId()).get();
        }
        return null;
    }

    public Optional<Duration> getActionExpiryTimeout(SamzaResourceRequest samzaResourceRequest) {
        for (ContainerPlacementMetadata containerPlacementMetadata : this.actions.values()) {
            if (containerPlacementMetadata.containsResourceRequest(samzaResourceRequest) && containerPlacementMetadata.getActionStatus() == ContainerPlacementMessage.StatusCode.IN_PROGRESS) {
                return containerPlacementMetadata.getRequestActionExpiryTimeout();
            }
        }
        return Optional.empty();
    }

    private void markContainerPlacementActionFailed(ContainerPlacementMetadata containerPlacementMetadata, String str) {
        this.samzaApplicationState.failedContainerPlacementActions.incrementAndGet();
        updateContainerPlacementActionStatus(containerPlacementMetadata, ContainerPlacementMessage.StatusCode.FAILED, str);
    }

    private boolean hasActiveContainerPlacementAction(String str) {
        Optional<ContainerPlacementMetadata> placementActionMetadata = getPlacementActionMetadata(str);
        if (!placementActionMetadata.isPresent()) {
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$samza$container$placement$ContainerPlacementMessage$StatusCode[placementActionMetadata.get().getActionStatus().ordinal()]) {
            case 1:
            case CoordinatorStreamMessage.KEY_INDEX /* 2 */:
                return true;
            default:
                return false;
        }
    }

    private Optional<ContainerPlacementMetadata> getPlacementActionMetadata(String str) {
        return Optional.ofNullable(this.actions.get(str));
    }

    private void updateContainerPlacementActionStatus(ContainerPlacementMetadata containerPlacementMetadata, ContainerPlacementMessage.StatusCode statusCode, String str) {
        containerPlacementMetadata.setActionStatus(statusCode, str);
        writeContainerPlacementResponseMessage(containerPlacementMetadata.getRequestMessage(), statusCode, str);
        LOG.info("Status updated for ContainerPlacement action: {}", containerPlacementMetadata);
    }

    private void writeContainerPlacementResponseMessage(ContainerPlacementRequestMessage containerPlacementRequestMessage, ContainerPlacementMessage.StatusCode statusCode, String str) {
        this.containerPlacementMetadataStore.writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage.fromContainerPlacementRequestMessage(containerPlacementRequestMessage, statusCode, str, System.currentTimeMillis()));
    }

    private String getSourceHostForContainer(ContainerPlacementRequestMessage containerPlacementRequestMessage) {
        String str;
        String processorId = containerPlacementRequestMessage.getProcessorId();
        if (this.samzaApplicationState.runningProcessors.containsKey(processorId)) {
            SamzaResource samzaResource = this.samzaApplicationState.runningProcessors.get(processorId);
            LOG.info("Processor ID: {} matched a running container with containerId ID: {} is running on host: {} for ContainerPlacement action: {}", new Object[]{processorId, samzaResource.getContainerId(), samzaResource.getHost(), containerPlacementRequestMessage});
            str = samzaResource.getHost();
        } else {
            str = (String) Optional.ofNullable(this.localityManager.readLocality().getProcessorLocality(processorId)).map((v0) -> {
                return v0.host();
            }).orElse(null);
            LOG.info("Processor ID: {} is not running and was last seen on host: {} for ContainerPlacement action: {}", new Object[]{processorId, str, containerPlacementRequestMessage});
        }
        return str;
    }

    private boolean deQueueAction(ContainerPlacementRequestMessage containerPlacementRequestMessage) {
        if (checkIfActiveOrStandbyContainerHasActivePlacementAction(containerPlacementRequestMessage)) {
            return false;
        }
        if (this.samzaApplicationState.failedProcessors.containsKey(containerPlacementRequestMessage.getProcessorId())) {
            LOG.info("ContainerPlacement request: {} is de-queued, container with Processor ID: {} has exhausted all retries and is in failed state", containerPlacementRequestMessage, containerPlacementRequestMessage.getProcessorId());
            return true;
        }
        if (this.samzaApplicationState.runningProcessors.containsKey(containerPlacementRequestMessage.getProcessorId()) && !this.samzaApplicationState.pendingProcessors.containsKey(containerPlacementRequestMessage.getProcessorId())) {
            return true;
        }
        LOG.info("ContainerPlacement request: {} is en-queued because container is pending start", containerPlacementRequestMessage);
        return false;
    }

    private Pair<ContainerPlacementMessage.StatusCode, String> validatePlacementAction(ContainerPlacementRequestMessage containerPlacementRequestMessage) {
        String str = ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: %s";
        Boolean bool = false;
        String str2 = null;
        boolean containsKey = this.samzaApplicationState.runningProcessors.containsKey(containerPlacementRequestMessage.getProcessorId());
        boolean containsKey2 = this.samzaApplicationState.pendingProcessors.containsKey(containerPlacementRequestMessage.getProcessorId());
        boolean containsKey3 = this.samzaApplicationState.failedProcessors.containsKey(containerPlacementRequestMessage.getProcessorId());
        if (!containsKey && !containsKey2 && !containsKey3) {
            str2 = String.format(str, "invalid processor id neither in running, pending or failed processors");
            bool = true;
        } else if (this.placementRequestsCache.containsKey(containerPlacementRequestMessage.getUuid())) {
            str2 = String.format(str, "duplicate UUID of the request, please retry");
            bool = true;
        } else if (this.standbyContainerManager.isPresent() && !this.standbyContainerManager.get().checkStandbyConstraints(containerPlacementRequestMessage.getProcessorId(), containerPlacementRequestMessage.getDestinationHost())) {
            str2 = String.format(str, "destination host does not meet standby constraints");
            bool = true;
        }
        return bool.booleanValue() ? new ImmutablePair(ContainerPlacementMessage.StatusCode.BAD_REQUEST, str2) : new ImmutablePair(ContainerPlacementMessage.StatusCode.ACCEPTED, "Request is accepted");
    }

    private boolean checkIfActiveOrStandbyContainerHasActivePlacementAction(ContainerPlacementRequestMessage containerPlacementRequestMessage) {
        String processorId = containerPlacementRequestMessage.getProcessorId();
        if (hasActiveContainerPlacementAction(processorId)) {
            LOG.info("ContainerPlacement request: {} is en-queued because container has an in-progress placement action", containerPlacementRequestMessage);
            return true;
        }
        if (!this.standbyContainerManager.isPresent()) {
            return false;
        }
        if (StandbyTaskUtil.isStandbyContainer(processorId) && hasActiveContainerPlacementAction(StandbyTaskUtil.getActiveContainerId(processorId))) {
            LOG.info("ContainerPlacement request: {} is en-queued because its active container has an in-progress placement action", containerPlacementRequestMessage);
            return true;
        }
        if (StandbyTaskUtil.isStandbyContainer(processorId)) {
            return false;
        }
        Iterator<String> it = this.standbyContainerManager.get().getStandbyList(processorId).iterator();
        while (it.hasNext()) {
            if (hasActiveContainerPlacementAction(it.next())) {
                LOG.info("ContainerPlacement request: {} is en-queued because one of its standby replica has an in-progress placement action", containerPlacementRequestMessage);
                return true;
            }
        }
        return false;
    }
}
