package org.apache.samza.clustermanager;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.ShellCommandBuilder;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/ContainerAllocator.class */
/**
 * Background task that matches pending {@link SamzaResourceRequest}s against resources that have
 * been allocated by the {@link ClusterResourceManager}.
 *
 * <p>Each pass of {@link #run()} tries to assign allocated resources to pending requests, moves
 * delayed requests that are due into the pending queue, releases extra resources and then sleeps
 * for the configured allocator sleep interval. Launch and expiry decisions are delegated to the
 * {@link ContainerManager} supplied at construction.
 *
 * <p>The loop is terminated either by {@link #stop()} (the running flag is {@code volatile}, so it
 * may be flipped from another thread) or by interrupting the thread executing {@link #run()}.
 */
public class ContainerAllocator implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerAllocator.class);
    /** Loop guard for {@link #run()}; cleared by {@link #stop()}. */
    private volatile boolean isRunning = true;
    /** When false, every request is treated as a request for {@code ResourceRequestState.ANY_HOST}. */
    private final boolean hostAffinityEnabled;
    private final TaskConfig taskConfig;
    private final Config config;
    protected final ClusterResourceManager clusterResourceManager;
    /** Pause between two allocator passes, in milliseconds. */
    protected final int allocatorSleepIntervalMs;
    protected final int containerMemoryMb;
    protected final int containerNumCpuCores;
    protected final SamzaApplicationState state;
    protected final ResourceRequestState resourceRequestState;
    /** Request expiry timeout (ms) used when the container manager supplies no override. */
    private final int configuredRequestExpiryTimeout;
    private final ContainerManager containerManager;

    /**
     * Creates an allocator and reads its sizing and timing parameters from the cluster manager config.
     *
     * @param clusterResourceManager resource manager used to launch processors and check resource expiry
     * @param config job configuration; also handed to the command builder of every launched processor
     * @param samzaApplicationState shared application state whose counters and processor maps are updated
     * @param hostAffinityEnabled whether preferred hosts of requests should be honoured
     * @param containerManager delegate that decides how launches, expired requests and expired
     *     resources are handled
     */
    public ContainerAllocator(ClusterResourceManager clusterResourceManager, Config config, SamzaApplicationState samzaApplicationState, boolean hostAffinityEnabled, ContainerManager containerManager) {
        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.clusterResourceManager = clusterResourceManager;
        this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
        this.resourceRequestState = new ResourceRequestState(hostAffinityEnabled, this.clusterResourceManager);
        this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
        this.containerNumCpuCores = clusterManagerConfig.getNumCores();
        this.taskConfig = new TaskConfig(config);
        this.state = samzaApplicationState;
        this.config = config;
        this.hostAffinityEnabled = hostAffinityEnabled;
        this.containerManager = containerManager;
        this.configuredRequestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
    }

    /**
     * Runs the allocation loop until {@link #stop()} is called or the executing thread is interrupted.
     *
     * <p>Unexpected exceptions thrown by a pass are logged and the loop continues with the next pass.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning) {
            try {
                assignResourceRequests();
                this.resourceRequestState.sendPendingDelayedResourceRequests();
                this.resourceRequestState.releaseExtraResources();
                Thread.sleep(this.allocatorSleepIntervalMs);
            } catch (InterruptedException e) {
                LOG.warn("Got InterruptedException in AllocatorThread.", e);
                Thread.currentThread().interrupt();
                // The interrupt flag is set again above, so staying in the loop would make every
                // following Thread.sleep throw immediately and turn this loop into a busy spin that
                // logs a stack trace per iteration. Treat the interrupt as a request to terminate.
                return;
            } catch (Exception e) {
                LOG.error("Got unknown Exception in AllocatorThread.", e);
            }
        }
    }

    /**
     * Processes pending requests from the head of the queue for as long as progress can be made.
     *
     * <p>For the request at the head of the queue:
     * <ul>
     *   <li>if a resource is allocated on the request's host, the launch is delegated to the
     *       container manager; a {@code false} result ends this pass;</li>
     *   <li>otherwise, if the request has not expired yet, this pass ends;</li>
     *   <li>otherwise expiry metrics are updated and the container manager handles the expired
     *       request; with host affinity disabled this pass ends afterwards.</li>
     * </ul>
     */
    void assignResourceRequests() {
        // Peek once per iteration and reuse the result: peeking twice (presence check, then get())
        // would allow the queue to change between the two calls and make get() fail.
        Optional<SamzaResourceRequest> nextRequest = peekReadyPendingRequest();
        while (nextRequest.isPresent()) {
            SamzaResourceRequest request = nextRequest.get();
            String processorId = request.getProcessorId();
            String preferredHost = this.hostAffinityEnabled ? request.getPreferredHost() : ResourceRequestState.ANY_HOST;
            Instant requestTimestamp = request.getRequestTimestamp();
            LOG.info("Handling assignment for Processor ID: {} with request {}", processorId, request);
            if (hasAllocatedResource(preferredHost)) {
                LOG.info("Found an available container for Processor ID: {} on the host: {}", processorId, preferredHost);
                if (this.hostAffinityEnabled) {
                    this.state.matchedResourceRequests.incrementAndGet();
                }
                if (!this.containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), this.resourceRequestState, this)) {
                    return;
                }
            } else {
                LOG.info("Did not find any allocated containers for running Processor ID: {} on the host: {}.", processorId, preferredHost);
                if (!isRequestExpired(request)) {
                    LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet.Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost, requestTimestamp, System.currentTimeMillis(), getRequestTimeout(request).toMillis());
                    return;
                }
                updateExpiryMetrics(request);
                this.containerManager.handleExpiredRequest(processorId, preferredHost, request, this, this.resourceRequestState);
                if (!this.hostAffinityEnabled) {
                    LOG.info("Waiting for resources to get allocated for request {}, no retries will be issued since host affinity is disabled", request);
                    return;
                }
            }
            nextRequest = peekReadyPendingRequest();
        }
    }

    /**
     * Launches the processor named by {@code samzaResourceRequest} on the resource currently
     * allocated for {@code preferredHost}.
     *
     * <p>If the resource manager reports the resource as expired, handling is delegated to the
     * container manager and nothing is launched. Otherwise the request state is updated, the
     * processor is removed from the failed processors, recorded as pending and launched.
     *
     * <p>Declared {@code public} in this source; the decompiler noted that the access modifier was
     * changed from {@code protected}.
     *
     * @param samzaResourceRequest the request being satisfied
     * @param preferredHost host whose allocated resource should be used
     * @throws SamzaException if no resource is allocated for {@code preferredHost}
     */
    public void runStreamProcessor(SamzaResourceRequest samzaResourceRequest, String preferredHost) {
        String processorId = samzaResourceRequest.getProcessorId();
        CommandBuilder commandBuilder = getCommandBuilder(processorId);
        SamzaResource resource = peekAllocatedResource(preferredHost);
        if (resource == null) {
            throw new SamzaException("Expected resource for Processor ID: " + processorId + " was unavailable on host: " + preferredHost);
        }
        if (this.clusterResourceManager.isResourceExpired(resource)) {
            this.containerManager.handleExpiredResource(samzaResourceRequest, resource, preferredHost, this.resourceRequestState, this);
            return;
        }
        this.resourceRequestState.updateStateAfterAssignment(samzaResourceRequest, preferredHost, resource);
        LOG.info("Found Container ID: {} for Processor ID: {} on host: {} for request creation time: {}.", resource.getContainerId(), processorId, preferredHost, samzaResourceRequest.getRequestTimestamp());
        this.state.failedProcessors.remove(processorId);
        this.state.pendingProcessors.put(processorId, resource);
        this.clusterResourceManager.launchStreamProcessor(resource, commandBuilder);
    }

    /**
     * Issues one resource request per entry of the given map.
     *
     * <p>The host of an entry is replaced by {@code ResourceRequestState.ANY_HOST} when host
     * affinity is disabled or when the entry has no host ({@code null} value).
     *
     * @param processorToHostMapping processor ID mapped to its preferred host (value may be null)
     */
    public void requestResources(Map<String, String> processorToHostMapping) {
        for (Map.Entry<String, String> entry : processorToHostMapping.entrySet()) {
            String processorId = entry.getKey();
            String preferredHost = entry.getValue();
            if (!this.hostAffinityEnabled) {
                preferredHost = ResourceRequestState.ANY_HOST;
            } else if (preferredHost == null) {
                LOG.info("No preferred host mapping found for Processor ID: {}. Requesting resource on ANY_HOST", processorId);
                preferredHost = ResourceRequestState.ANY_HOST;
            }
            requestResource(processorId, preferredHost);
        }
    }

    /** Returns whether {@link #peekReadyPendingRequest()} currently yields a request. */
    protected final boolean hasReadyPendingRequest() {
        return peekReadyPendingRequest().isPresent();
    }

    /**
     * Returns the request that the request state currently exposes via {@code peekPendingRequest()},
     * or an empty optional when that call returns {@code null}. The request is not removed.
     */
    protected final Optional<SamzaResourceRequest> peekReadyPendingRequest() {
        return Optional.ofNullable(this.resourceRequestState.peekPendingRequest());
    }

    /**
     * Issues a request without delay.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     */
    public final void requestResource(String processorId, String preferredHost) {
        requestResourceWithDelay(processorId, preferredHost, Duration.ZERO);
    }

    /**
     * Issues a request without delay, carrying the given fault domains.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param faultDomains fault domains passed on to the created request
     */
    public final void requestResource(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
        requestResourceWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
    }

    /**
     * Creates a request whose timestamp is now plus {@code delay} and issues it.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param delay amount added to the current time to form the request timestamp
     */
    public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay) {
        issueResourceRequest(getResourceRequestWithDelay(processorId, preferredHost, delay));
    }

    /**
     * Creates a request whose timestamp is now plus {@code delay}, carrying the given fault
     * domains, and issues it.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param delay amount added to the current time to form the request timestamp
     * @param faultDomains fault domains passed on to the created request
     */
    public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
        issueResourceRequest(getResourceRequestWithDelay(processorId, preferredHost, delay, faultDomains));
    }

    /**
     * Builds (but does not issue) a request timestamped with the current time.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @return the new request
     */
    public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost) {
        return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO);
    }

    /**
     * Builds (but does not issue) a request timestamped with the current time and carrying the
     * given fault domains.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param faultDomains fault domains passed on to the created request
     * @return the new request
     */
    public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
        return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
    }

    /**
     * Builds (but does not issue) a request using the configured CPU core count and memory size,
     * timestamped with now plus {@code delay}.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param delay amount added to the current time to form the request timestamp
     * @return the new request
     */
    public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay) {
        return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
    }

    /**
     * Builds (but does not issue) a request using the configured CPU core count and memory size,
     * timestamped with now plus {@code delay} and carrying the given fault domains.
     *
     * @param processorId ID of the processor the resource is requested for
     * @param preferredHost host the resource is requested on
     * @param delay amount added to the current time to form the request timestamp
     * @param faultDomains fault domains passed on to the created request
     * @return the new request
     */
    public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
        return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay), faultDomains);
    }

    /**
     * Registers the request with the request state and updates the request counters: the total,
     * either the any-host or the preferred-host counter, and the fault-domain-aware counter when
     * the request carries at least one fault domain.
     *
     * @param samzaResourceRequest the request to register
     */
    public final void issueResourceRequest(SamzaResourceRequest samzaResourceRequest) {
        this.resourceRequestState.addResourceRequest(samzaResourceRequest);
        this.state.containerRequests.incrementAndGet();
        if (ResourceRequestState.ANY_HOST.equals(samzaResourceRequest.getPreferredHost())) {
            this.state.anyHostRequests.incrementAndGet();
        } else {
            this.state.preferredHostRequests.incrementAndGet();
        }
        if (!samzaResourceRequest.getFaultDomains().isEmpty()) {
            this.state.faultDomainAwareContainerRequests.incrementAndGet();
        }
    }

    /**
     * Returns whether {@link #peekAllocatedResource(String)} yields a resource for the host.
     *
     * <p>Declared {@code public} in this source; the decompiler noted that the access modifier was
     * changed from {@code protected}.
     *
     * @param host host to look up
     */
    public boolean hasAllocatedResource(String host) {
        return peekAllocatedResource(host) != null;
    }

    /**
     * Returns the resource the request state exposes for the host via {@code peekResource}, which
     * callers in this class treat as {@code null} when none is available.
     *
     * <p>Declared {@code public} in this source; the decompiler noted that the access modifier was
     * changed from {@code protected}.
     *
     * @param host host to look up
     */
    public SamzaResource peekAllocatedResource(String host) {
        return this.resourceRequestState.peekResource(host);
    }

    /**
     * Instantiates the command builder class named in the task config (defaulting to
     * {@link ShellCommandBuilder}) and configures it with the job config, the processor ID and the
     * URL of the job model manager's server.
     */
    private CommandBuilder getCommandBuilder(String processorId) {
        String commandBuilderClass = this.taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
        CommandBuilder commandBuilder = (CommandBuilder) ReflectionUtil.getObj(commandBuilderClass, CommandBuilder.class);
        commandBuilder.setConfig(this.config).setId(processorId).setUrl(this.state.jobModelManager.server().getUrl());
        return commandBuilder;
    }

    /**
     * Hands an allocated resource to the request state.
     *
     * @param samzaResource the resource to add
     */
    public final void addResource(SamzaResource samzaResource) {
        this.resourceRequestState.addResource(samzaResource);
    }

    /**
     * Asks the request state to release a resource.
     *
     * @param resourceIdentifier identifier forwarded unchanged to
     *     {@code ResourceRequestState.releaseResource} (presumably a container ID - confirm against
     *     that method)
     */
    public final void releaseResource(String resourceIdentifier) {
        this.resourceRequestState.releaseResource(resourceIdentifier);
    }

    /** Clears the running flag so that {@link #run()} exits once its current pass completes. */
    public void stop() {
        this.isRunning = false;
    }

    /**
     * Returns whether more time than the request's timeout has elapsed since its timestamp.
     * Logs the details when the request has expired.
     *
     * @param samzaResourceRequest the request to check
     * @return {@code true} if the request has expired
     */
    protected boolean isRequestExpired(SamzaResourceRequest samzaResourceRequest) {
        long nowMs = Instant.now().toEpochMilli();
        // Resolve the timeout once so the logged value is the one the decision was based on.
        long timeoutMs = getRequestTimeout(samzaResourceRequest).toMillis();
        boolean expired = nowMs - samzaResourceRequest.getRequestTimestamp().toEpochMilli() > timeoutMs;
        if (expired) {
            LOG.info("Request for Processor ID: {} on host: {} with creation time: {} has expired at current time: {} after timeout: {} ms.", samzaResourceRequest.getProcessorId(), samzaResourceRequest.getPreferredHost(), samzaResourceRequest.getRequestTimestamp(), nowMs, timeoutMs);
        }
        return expired;
    }

    /**
     * Returns the timeout the container manager supplies for the request, falling back to the
     * configured request expiry timeout when it supplies none.
     */
    private Duration getRequestTimeout(SamzaResourceRequest samzaResourceRequest) {
        return this.containerManager.getActionExpiryTimeout(samzaResourceRequest).orElse(Duration.ofMillis(this.configuredRequestExpiryTimeout));
    }

    /**
     * Increments the expired any-host or expired preferred-host counter for the request, plus the
     * expired fault-domain-aware counter when the request carries at least one fault domain.
     */
    private void updateExpiryMetrics(SamzaResourceRequest samzaResourceRequest) {
        if (ResourceRequestState.ANY_HOST.equals(samzaResourceRequest.getPreferredHost())) {
            this.state.expiredAnyHostRequests.incrementAndGet();
        } else {
            this.state.expiredPreferredHostRequests.incrementAndGet();
        }
        if (!samzaResourceRequest.getFaultDomains().isEmpty()) {
            this.state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
        }
    }
}
