package org.apache.samza.clustermanager;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.JobModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/StandbyContainerManager.class */
public class StandbyContainerManager {
    private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class);
    private final SamzaApplicationState samzaApplicationState;
    private final LocalityManager localityManager;
    private ClusterResourceManager clusterResourceManager;
    private final FaultDomainManager faultDomainManager;
    private final boolean isFaultDomainAwareStandbyEnabled;
    // Failover bookkeeping, keyed by the resource ID of the failed active container.
    private final Map<String, FailoverMetadata> failovers = new ConcurrentHashMap<>();
    // For every container ID in the job model, the list produced by StandbyTaskUtil.getStandbyContainerConstraints.
    private final Map<String, List<String>> standbyContainerConstraints = new HashMap<>();

    /* loaded from: input_file:org/apache/samza/clustermanager/StandbyContainerManager$FailoverMetadata.class */
    /**
     * Bookkeeping for the failover of one active container: which standby resources were selected
     * for it and which resource requests were issued on its behalf.
     *
     * <p>All accessors of the two internal collections synchronize on this instance.
     */
    public class FailoverMetadata {
        public final String activeContainerID;
        public final String activeContainerResourceID;
        // Container ID of each selected standby resource -> host that resource was on.
        private final Map<String, String> selectedStandbyContainers = new HashMap<>();
        // Resource requests issued as part of this failover.
        private final Set<SamzaResourceRequest> resourceRequests = new HashSet<>();

        /**
         * @param str  ID of the active container being failed over
         * @param str2 resource ID of that active container
         */
        public FailoverMetadata(String str, String str2) {
            this.activeContainerID = str;
            this.activeContainerResourceID = str2;
        }

        /**
         * @param str container ID of a standby resource
         * @return true if that standby resource was already selected in this failover
         */
        public synchronized boolean isStandbyResourceUsed(String str) {
            return this.selectedStandbyContainers.containsKey(str);
        }

        /**
         * @param str container ID of a standby resource
         * @return the host recorded for that standby resource, or null if none was recorded
         */
        public synchronized String getStandbyContainerHostname(String str) {
            return this.selectedStandbyContainers.get(str);
        }

        /**
         * Records a standby resource as selected for this failover.
         *
         * @param str  container ID of the standby resource
         * @param str2 host of the standby resource
         */
        public synchronized void updateStandbyContainer(String str, String str2) {
            this.selectedStandbyContainers.put(str, str2);
        }

        /** Records a resource request issued as part of this failover. */
        public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
            this.resourceRequests.add(samzaResourceRequest);
        }

        /** @return true if the given request was recorded for this failover */
        public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
            return this.resourceRequests.contains(samzaResourceRequest);
        }

        /**
         * Checks whether a host was already used in this failover, i.e. it is the host of a selected
         * standby or the preferred host of a recorded resource request.
         *
         * @param str host name to check
         * @return true if the host was already used
         */
        public synchronized boolean isStandbyHostUsed(String str) {
            // Synchronized like every other accessor: both collections are plain, non-thread-safe ones.
            return this.selectedStandbyContainers.containsValue(str)
                || this.resourceRequests.stream().anyMatch(request -> request.getPreferredHost().equals(str));
        }

        @Override
        public synchronized String toString() {
            return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: " + this.activeContainerResourceID + " selectedStandbyContainers:" + this.selectedStandbyContainers + " resourceRequests: " + this.resourceRequests + "]";
        }
    }

    /**
     * Creates the manager and precomputes, for every container in the job model, the list returned by
     * {@code StandbyTaskUtil.getStandbyContainerConstraints}.
     *
     * @param samzaApplicationState  application state; its job model manager supplies the job model
     * @param clusterResourceManager used to stop stream processors during failover
     * @param localityManager        used to read the last known locality of standby containers
     * @param config                 job config; read for the fault-domain-aware standby flag
     * @param faultDomainManager     used for fault domain lookups
     */
    public StandbyContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, LocalityManager localityManager, Config config, FaultDomainManager faultDomainManager) {
        this.samzaApplicationState = samzaApplicationState;
        this.clusterResourceManager = clusterResourceManager;
        this.localityManager = localityManager;
        this.faultDomainManager = faultDomainManager;

        JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
        for (String containerId : jobModel.getContainers().keySet()) {
            this.standbyContainerConstraints.put(containerId, StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel));
        }

        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.isFaultDomainAwareStandbyEnabled = clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
        log.info("Populated standbyContainerConstraints map {}", this.standbyContainerConstraints);
    }

    /**
     * Handles the stop of a container.
     *
     * <p>Standby containers are delegated to {@code handleStandbyContainerStop}. For an active container,
     * the exit statuses PREEMPTED, DISK_FAIL and ABORTED trigger a standby-aware allocation. Any other
     * status re-requests the container on the given host after the given delay and records that request
     * against the container's failover metadata.
     *
     * @param str                container ID of the stopped container
     * @param str2               resource ID of the stopped container
     * @param str3               host to prefer when re-requesting the container
     * @param i                  exit status, compared against {@code SamzaResourceStatus} constants
     * @param containerAllocator allocator used to build and issue resource requests
     * @param duration           delay applied to the re-request
     */
    public void handleContainerStop(String str, String str2, String str3, int i, ContainerAllocator containerAllocator, Duration duration) {
        if (StandbyTaskUtil.isStandbyContainer(str)) {
            handleStandbyContainerStop(str, str2, str3, containerAllocator, duration);
            return;
        }

        boolean needsStandbyAwareAllocation = i == SamzaResourceStatus.PREEMPTED
            || i == SamzaResourceStatus.DISK_FAIL
            || i == SamzaResourceStatus.ABORTED;
        if (needsStandbyAwareAllocation) {
            initiateStandbyAwareAllocation(str, str2, containerAllocator);
            return;
        }

        log.info("Requesting resource for active-container {} on host {}", str, str3);
        SamzaResourceRequest delayedRequest = containerAllocator.getResourceRequestWithDelay(str, str3, duration);
        FailoverMetadata failover = registerActiveContainerFailure(str, str2);
        failover.recordResourceRequest(delayedRequest);
        containerAllocator.issueResourceRequest(delayedRequest);
    }

    /**
     * Handles a container launch failure.
     *
     * <p>An active container goes through standby-aware allocation. A standby container is re-requested
     * on any host with no delay, passing the host of its active container (if known) as the host to avoid.
     *
     * @param str                container ID of the container that failed to launch
     * @param str2               resource ID of that container
     * @param containerAllocator allocator used to issue resource requests
     */
    public void handleContainerLaunchFail(String str, String str2, ContainerAllocator containerAllocator) {
        if (!StandbyTaskUtil.isStandbyContainer(str)) {
            initiateStandbyAwareAllocation(str, str2, containerAllocator);
            return;
        }
        // The message has two placeholders; previously only the container ID was supplied, leaving a literal "{}".
        log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", str, ResourceRequestState.ANY_HOST);
        requestResource(containerAllocator, str, ResourceRequestState.ANY_HOST, Duration.ZERO, getActiveContainerHost(str).orElse(null));
    }

    /**
     * Handles a failed attempt to stop a container. Only standby containers are expected here.
     *
     * <p>If the standby resource had been selected for a failover, that failover continues by running
     * standby-aware allocation again for the corresponding active container.
     *
     * @param str                container ID whose stop failed
     * @param str2               resource ID whose stop failed
     * @param containerAllocator allocator used to issue resource requests
     * @throws SamzaException if the container is not a standby container
     */
    public void handleContainerStopFail(String str, String str2, ContainerAllocator containerAllocator) {
        if (!StandbyTaskUtil.isStandbyContainer(str)) {
            throw new SamzaException("Invalid State. Received stop container fail for container Id: " + str);
        }
        log.info("Handling stop fail for standby-container {}, continuing the failover (if present)", str);

        Optional<FailoverMetadata> failover = checkIfUsedForFailover(str2);
        if (failover.isPresent()) {
            FailoverMetadata metadata = failover.get();
            initiateStandbyAwareAllocation(metadata.activeContainerID, metadata.activeContainerResourceID, containerAllocator);
        }
    }

    /**
     * Computes the fault domains a container may be placed in: all known fault domains minus those of
     * the host to avoid.
     *
     * @param str host whose fault domains must be excluded; may be null, in which case nothing is excluded
     * @return a new mutable set of allowed fault domains
     */
    public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String str) {
        // Work on a copy so the removal never mutates (or fails on) the set handed out by the fault domain manager.
        Set<FaultDomain> allowedFaultDomains = new HashSet<>(this.faultDomainManager.getAllFaultDomains());
        Optional.ofNullable(str)
            .map(this.faultDomainManager::getFaultDomainsForHost)
            .ifPresent(allowedFaultDomains::removeAll);
        return allowedFaultDomains;
    }

    /**
     * Handles the stop of a standby container.
     *
     * <p>If the stopped resource was not selected for any failover, the standby is simply re-requested
     * on the given host. Otherwise the active container is requested on the host recorded for the standby
     * (and that request is recorded in the failover metadata), and the standby itself is re-requested on
     * any host.
     *
     * @param str                container ID of the stopped standby container
     * @param str2               resource ID of the stopped standby container
     * @param str3               host to prefer when the stop is not part of a failover
     * @param containerAllocator allocator used to build and issue resource requests
     * @param duration           delay applied to the preferred-host requests
     */
    private void handleStandbyContainerStop(String str, String str2, String str3, ContainerAllocator containerAllocator, Duration duration) {
        Optional<FailoverMetadata> failover = checkIfUsedForFailover(str2);
        if (failover.isPresent()) {
            FailoverMetadata metadata = failover.get();
            String activeContainerId = metadata.activeContainerID;
            String standbyHost = metadata.getStandbyContainerHostname(str2);
            log.info("Requesting resource for active container {} on host {}, and backup container {} on any host", activeContainerId, standbyHost, str);

            SamzaResourceRequest activeRequest = containerAllocator.getResourceRequestWithDelay(activeContainerId, standbyHost, duration);
            metadata.recordResourceRequest(activeRequest);
            containerAllocator.issueResourceRequest(activeRequest);
            requestResource(containerAllocator, str, ResourceRequestState.ANY_HOST, Duration.ZERO, standbyHost);
        } else {
            log.info("Issuing request for standby container {} on host {}, since this is not for a failover", str, str3);
            requestResource(containerAllocator, str, str3, duration, getActiveContainerHost(str).orElse(null));
        }
    }

    /**
     * Starts (or continues) a failover for an active container.
     *
     * <ul>
     *   <li>If no standby host can be selected, the container is requested on any host.</li>
     *   <li>If a running standby of this container sits on the selected host, it is recorded in the
     *       failover metadata and stopped. More than one such standby on the same host is treated as an
     *       invalid state, reported only after all of them have been stopped.</li>
     *   <li>Otherwise the active container is requested directly on the selected host and the request is
     *       recorded in the failover metadata.</li>
     * </ul>
     *
     * @param str                ID of the active container
     * @param str2               resource ID of the active container
     * @param containerAllocator allocator used to build and issue resource requests
     * @throws SamzaException if multiple standby containers were found running on the selected host
     */
    private void initiateStandbyAwareAllocation(String str, String str2, ContainerAllocator containerAllocator) {
        String standbyHost = selectStandbyHost(str, str2);
        if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
            log.info("No standby container found for active container {}, making a resource-request for placing {} on {}, active's resourceID: {}", str, str, ResourceRequestState.ANY_HOST, str2);
            this.samzaApplicationState.failoversToAnyHost.incrementAndGet();
            containerAllocator.requestResource(str, ResourceRequestState.ANY_HOST);
            return;
        }

        // Collect the running standbys of this container that live on the selected host.
        List<String> standbyIds = this.standbyContainerConstraints.get(str);
        Map<String, SamzaResource> standbysOnHost = new HashMap<>();
        this.samzaApplicationState.runningProcessors.forEach((processorId, resource) -> {
            if (standbyIds.contains(processorId) && resource.getHost().equals(standbyHost)) {
                standbysOnHost.put(processorId, resource);
            }
        });

        if (standbysOnHost.isEmpty()) {
            log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}, active's resourceID: {}", standbyHost, str, standbyHost, str2);
            FailoverMetadata failover = registerActiveContainerFailure(str, str2);
            SamzaResourceRequest request = containerAllocator.getResourceRequest(str, standbyHost);
            failover.recordResourceRequest(request);
            containerAllocator.issueResourceRequest(request);
            this.samzaApplicationState.failoversToStandby.incrementAndGet();
            return;
        }

        FailoverMetadata failover = registerActiveContainerFailure(str, str2);
        for (SamzaResource standbyResource : standbysOnHost.values()) {
            log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, for active container {}", standbysOnHost.keySet(), standbysOnHost.values(), str);
            failover.updateStandbyContainer(standbyResource.getContainerId(), standbyResource.getHost());
            this.samzaApplicationState.failoversToStandby.incrementAndGet();
            this.clusterResourceManager.stopStreamProcessor(standbyResource);
        }
        // Checked only after the stop calls above, so every standby found has already been asked to stop.
        if (standbysOnHost.size() > 1) {
            throw new SamzaException("Invalid State. Multiple standby containers found running on one host:" + standbysOnHost);
        }
    }

    /**
     * Picks the host on which a failed active container should be restarted.
     *
     * <p>Preference order:
     * <ol>
     *   <li>the host of a running standby whose resource has not yet been used in this failover;</li>
     *   <li>the last known host (from the locality manager) of a standby, if that host has not yet been
     *       used in this failover;</li>
     *   <li>{@code ResourceRequestState.ANY_HOST}.</li>
     * </ol>
     *
     * @param str  ID of the active container
     * @param str2 resource ID of the active container, used to look up existing failover metadata
     * @return the selected host, or {@code ResourceRequestState.ANY_HOST} if none qualifies
     */
    private String selectStandbyHost(String str, String str2) {
        List<String> standbyIds = this.standbyContainerConstraints.get(str);
        log.info("Standby containers {} for active container {} (resourceID {})", standbyIds, str, str2);
        Optional<FailoverMetadata> failover = getFailoverMetadata(str2);

        for (String standbyId : standbyIds) {
            // Single lookup: a containsKey/get pair could see the entry vanish in between and then NPE.
            SamzaResource standbyResource = this.samzaApplicationState.runningProcessors.get(standbyId);
            if (standbyResource == null) {
                continue;
            }
            if (!failover.isPresent() || !failover.get().isStandbyResourceUsed(standbyResource.getContainerId())) {
                log.info("Returning standby container {} in running state on host {} for active container {}", standbyId, standbyResource.getHost(), str);
                return standbyResource.getHost();
            }
        }
        log.info("Did not find any running standby container for active container {}", str);

        for (String standbyId : standbyIds) {
            // Locality is re-read per standby, as before, so each lookup reflects the latest stored value.
            String lastKnownHost = Optional.ofNullable(this.localityManager.readLocality().getProcessorLocality(standbyId))
                .map(locality -> locality.host())
                .orElse(null);
            if (StringUtils.isBlank(lastKnownHost)) {
                log.info("No last known standbyHost for container {}", standbyId);
                continue;
            }
            if (!failover.isPresent() || !failover.get().isStandbyHostUsed(lastKnownHost)) {
                log.info("Returning standby host {} for active container {}", lastKnownHost, str);
                return lastKnownHost;
            }
            log.info("Not using standby host {} for active container {} because it had already been selected", lastKnownHost, str);
        }

        log.info("Did not find any standby host for active container {}, returning any-host", str);
        return ResourceRequestState.ANY_HOST;
    }

    /**
     * Returns the failover metadata registered for the given resource ID, creating and registering it
     * if none exists yet.
     *
     * @param str  ID of the active container
     * @param str2 resource ID of the active container; key into the failover map
     * @return the existing or newly created metadata
     */
    private FailoverMetadata registerActiveContainerFailure(String str, String str2) {
        // Atomic get-or-create; separate containsKey/get/put calls could let two callers register different instances.
        return this.failovers.computeIfAbsent(str2, resourceId -> new FailoverMetadata(str, resourceId));
    }

    /**
     * Finds the failover, if any, in which the given standby resource was selected.
     *
     * @param str resource ID of a standby container; may be null
     * @return the first matching failover metadata, or empty if the ID is null or no failover used it
     */
    private Optional<FailoverMetadata> checkIfUsedForFailover(String str) {
        if (str == null) {
            return Optional.empty();
        }
        Optional<FailoverMetadata> match = this.failovers.values().stream()
            .filter(metadata -> metadata.isStandbyResourceUsed(str))
            .findFirst();
        match.ifPresent(metadata ->
            log.info("Standby container with resource id {} was selected for failover of active container {}", str, metadata.activeContainerID));
        return match;
    }

    /**
     * Issues a delayed resource request for a container.
     *
     * <p>For a standby container with fault-domain-aware standby enabled, the request carries the fault
     * domains allowed once the given host is avoided; in every other case it carries an empty set.
     *
     * @param containerAllocator allocator that issues the request
     * @param str                container ID to request a resource for
     * @param str2               preferred host
     * @param duration           delay for the request
     * @param str3               host whose fault domains should be avoided; may be null
     */
    void requestResource(ContainerAllocator containerAllocator, String str, String str2, Duration duration, String str3) {
        boolean restrictFaultDomains = StandbyTaskUtil.isStandbyContainer(str) && this.isFaultDomainAwareStandbyEnabled;
        Set<FaultDomain> faultDomains;
        if (restrictFaultDomains) {
            faultDomains = getAllowedFaultDomainsGivenHostToAvoid(str3);
        } else {
            faultDomains = new HashSet<>();
        }
        containerAllocator.requestResourceWithDelay(str, str2, duration, faultDomains);
    }

    /**
     * Looks up the host of the active container corresponding to the given container.
     *
     * <p>A standby container ID is first mapped to its active container ID. Pending processors are
     * consulted before running ones.
     *
     * @param str an active or standby container ID
     * @return the host of the active container, or empty if it is neither pending nor running (or has no host)
     */
    Optional<String> getActiveContainerHost(String str) {
        String activeContainerId = StandbyTaskUtil.isStandbyContainer(str)
            ? StandbyTaskUtil.getActiveContainerId(str)
            : str;

        SamzaResource activeResource = this.samzaApplicationState.pendingProcessors.get(activeContainerId);
        if (activeResource == null) {
            activeResource = this.samzaApplicationState.runningProcessors.get(activeContainerId);
        }
        return Optional.ofNullable(activeResource).map(SamzaResource::getHost);
    }

    /**
     * Checks whether a container may be started on a host without violating its standby constraints.
     *
     * <p>For each constrained container, the pending resource is checked first and the running resource
     * second. A check fails if fault-domain-aware standby is enabled and the hosts share fault domains,
     * or if the hosts are equal. A fault-domain rejection of a standby container also increments
     * {@code failedFaultDomainAwareContainerAllocations}.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @param str  ID of the container to start
     * @param str2 candidate host
     * @return true if no constrained container is pending or running on a conflicting host
     */
    public boolean checkStandbyConstraints(String str, String str2) {
        for (String constrainedId : this.standbyContainerConstraints.get(str)) {
            SamzaResource pendingResource = this.samzaApplicationState.pendingProcessors.get(constrainedId);
            if (pendingResource != null) {
                if (this.isFaultDomainAwareStandbyEnabled && this.faultDomainManager.hasSameFaultDomains(str2, pendingResource.getHost())) {
                    log.info("Container {} cannot be started on host {} because container {} is already scheduled on this fault domain", str, str2, constrainedId);
                    if (StandbyTaskUtil.isStandbyContainer(str)) {
                        this.samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
                    }
                    return false;
                }
                if (pendingResource.getHost().equals(str2)) {
                    log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host", str, str2, constrainedId);
                    return false;
                }
            }

            SamzaResource runningResource = this.samzaApplicationState.runningProcessors.get(constrainedId);
            if (runningResource != null) {
                if (this.isFaultDomainAwareStandbyEnabled && this.faultDomainManager.hasSameFaultDomains(str2, runningResource.getHost())) {
                    log.info("Container {} cannot be started on host {} because container {} is already running on this fault domain", str, str2, constrainedId);
                    if (StandbyTaskUtil.isStandbyContainer(str)) {
                        this.samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
                    }
                    return false;
                }
                if (runningResource.getHost().equals(str2)) {
                    log.info("Container {} cannot be started on host {} because container {} is already running on this host", str, str2, constrainedId);
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Runs the requested processor on the given resource if standby constraints allow it; otherwise
     * releases the resource, cancels the request and re-requests.
     *
     * <p>On a constraint violation, a standby container is re-requested on any host, while an active
     * container goes through standby-aware allocation using the resource ID from its failover metadata
     * (or {@code "unknown-" + processorId} if there is none). Both cases increment
     * {@code failedStandbyAllocations}.
     *
     * @param samzaResourceRequest request being fulfilled
     * @param str                  preferred host associated with the request
     * @param samzaResource        resource allocated for the request
     * @param containerAllocator   allocator used to run the processor or issue new requests
     * @param resourceRequestState request state used to release the resource and cancel the request
     */
    public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest samzaResourceRequest, String str, SamzaResource samzaResource, ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
        String processorId = samzaResourceRequest.getProcessorId();

        if (checkStandbyConstraints(processorId, samzaResource.getHost())) {
            log.info("Running container {} on {} meets standby constraints, preferredHost = {}", processorId, samzaResource.getHost(), str);
            containerAllocator.runStreamProcessor(samzaResourceRequest, str);
            if (this.isFaultDomainAwareStandbyEnabled && StandbyTaskUtil.isStandbyContainer(processorId)) {
                this.samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
            }
            return;
        }

        if (StandbyTaskUtil.isStandbyContainer(processorId)) {
            log.info("Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", processorId, samzaResource.getHost());
            releaseUnstartableContainer(samzaResourceRequest, samzaResource, str, resourceRequestState);
            requestResource(containerAllocator, processorId, ResourceRequestState.ANY_HOST, Duration.ZERO, getActiveContainerHost(processorId).orElse(null));
        } else {
            log.warn("Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource", processorId, samzaResource.getHost());
            releaseUnstartableContainer(samzaResourceRequest, samzaResource, str, resourceRequestState);
            Optional<FailoverMetadata> failover = getFailoverMetadata(samzaResourceRequest);
            String lastKnownResourceId;
            if (failover.isPresent()) {
                lastKnownResourceId = failover.get().activeContainerResourceID;
            } else {
                lastKnownResourceId = "unknown-" + processorId;
            }
            initiateStandbyAwareAllocation(processorId, lastKnownResourceId, containerAllocator);
        }
        this.samzaApplicationState.failedStandbyAllocations.incrementAndGet();
    }

    /**
     * Handles an expired resource request by dispatching to the active- or standby-specific handler.
     *
     * @param str                  container ID the expired request was for
     * @param samzaResourceRequest the expired request
     * @param optional             an alternative resource, if one is available; only used for standby containers
     * @param containerAllocator   allocator used to issue new requests
     * @param resourceRequestState request state used to cancel the expired request
     */
    public void handleExpiredResourceRequest(String str, SamzaResourceRequest samzaResourceRequest, Optional<SamzaResource> optional, ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
        if (!StandbyTaskUtil.isStandbyContainer(str)) {
            handleExpiredRequestForActiveContainer(str, samzaResourceRequest, containerAllocator, resourceRequestState);
            return;
        }
        handleExpiredRequestForStandbyContainer(str, samzaResourceRequest, optional, containerAllocator, resourceRequestState);
    }

    /**
     * Returns the standby constraint list stored for a container.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @param str container ID
     * @return the stored list itself (not a copy), or null if the container ID is unknown
     */
    public List<String> getStandbyList(String str) {
        List<String> constraints = standbyContainerConstraints.get(str);
        return constraints;
    }

    /**
     * Releases a resource on which the requested container cannot be started and cancels the request.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @param samzaResourceRequest request to cancel
     * @param samzaResource        resource to release
     * @param str                  preferred host passed along with the release
     * @param resourceRequestState request state that performs the release and the cancellation
     */
    public void releaseUnstartableContainer(
            SamzaResourceRequest samzaResourceRequest,
            SamzaResource samzaResource,
            String str,
            ResourceRequestState resourceRequestState) {
        // Release first, then cancel.
        resourceRequestState.releaseUnstartableContainer(samzaResource, str);
        resourceRequestState.cancelResourceRequest(samzaResourceRequest);
    }

    /**
     * Handles an expired request for a standby container.
     *
     * <p>With an alternative resource available, the standby is started on it subject to standby
     * constraints. Otherwise the expired request is cancelled and the standby is re-requested on any host.
     *
     * @param str                  standby container ID
     * @param samzaResourceRequest the expired request
     * @param optional             alternative resource, if any
     * @param containerAllocator   allocator used to run the processor or issue new requests
     * @param resourceRequestState request state used to cancel the expired request
     */
    private void handleExpiredRequestForStandbyContainer(String str, SamzaResourceRequest samzaResourceRequest, Optional<SamzaResource> optional, ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
        if (!optional.isPresent()) {
            log.info("Handling expired request, requesting anyHost resource for standby container {}", str);
            resourceRequestState.cancelResourceRequest(samzaResourceRequest);
            requestResource(containerAllocator, str, ResourceRequestState.ANY_HOST, Duration.ZERO, getActiveContainerHost(str).orElse(null));
            return;
        }
        SamzaResource alternativeResource = optional.get();
        log.info("Handling expired request, standby container {} can be started on alternative resource {}", str, alternativeResource);
        checkStandbyConstraintsAndRunStreamProcessor(samzaResourceRequest, ResourceRequestState.ANY_HOST, alternativeResource, containerAllocator, resourceRequestState);
    }

    /**
     * Handles an expired request for an active container: cancels the request and runs standby-aware
     * allocation again.
     *
     * <p>The resource ID comes from the failover metadata that recorded the request, or is
     * {@code "unknown-" + str} if no failover recorded it.
     *
     * @param str                  active container ID
     * @param samzaResourceRequest the expired request
     * @param containerAllocator   allocator used to issue new requests
     * @param resourceRequestState request state used to cancel the expired request
     */
    private void handleExpiredRequestForActiveContainer(String str, SamzaResourceRequest samzaResourceRequest, ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
        log.info("Handling expired request for active container {}", str);
        // Look the failover up before cancelling the request, as in the original ordering.
        Optional<FailoverMetadata> failover = getFailoverMetadata(samzaResourceRequest);
        resourceRequestState.cancelResourceRequest(samzaResourceRequest);

        String lastKnownResourceId;
        if (failover.isPresent()) {
            lastKnownResourceId = failover.get().activeContainerResourceID;
        } else {
            lastKnownResourceId = "unknown-" + str;
        }
        log.info("Handling expired request for active container {}, lastKnownResourceID is {}", str, lastKnownResourceId);
        initiateStandbyAwareAllocation(str, lastKnownResourceId, containerAllocator);
    }

    /**
     * Looks up failover metadata by the resource ID of the active container.
     *
     * @param str resource ID of an active container
     * @return the registered metadata, or empty if none is registered under that ID
     */
    private Optional<FailoverMetadata> getFailoverMetadata(String str) {
        // Single lookup: containsKey followed by Optional.of(get(...)) could throw if the entry vanished in between.
        return Optional.ofNullable(this.failovers.get(str));
    }

    /**
     * Finds the failover, if any, that recorded the given resource request.
     *
     * @param samzaResourceRequest request to search for
     * @return the first failover metadata containing the request, or empty if none does
     */
    private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest samzaResourceRequest) {
        return this.failovers.values().stream()
            .filter(metadata -> metadata.containsResourceRequest(samzaResourceRequest))
            .findFirst();
    }

    /** Returns the string form of the failover map. */
    @Override
    public String toString() {
        return String.valueOf(this.failovers);
    }
}
