package io.digdag.server;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredProject;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.TaskStateCode;
import io.digdag.spi.Notification;
import io.digdag.spi.NotificationException;
import io.digdag.spi.Notifier;
import io.digdag.util.DurationParam;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/digdag/server/WorkflowExecutionTimeoutEnforcer.class */
public class WorkflowExecutionTimeoutEnforcer {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecutionTimeoutEnforcer.class);
    private static final Duration DEFAULT_ATTEMPT_TTL = Duration.ofDays(7);
    private static final Duration DEFAULT_TASK_TTL = Duration.ofDays(1);
    private static final Duration DEFAULT_REAPING_INTERVAL = Duration.ofSeconds(5);
    private static final TaskStateCode[] TASK_TTL_ENFORCED_STATE_CODES = {TaskStateCode.RETRY_WAITING, TaskStateCode.GROUP_RETRY_WAITING, TaskStateCode.RUNNING};
    private final ScheduledExecutorService scheduledExecutorService;
    private final SessionStoreManager ssm;
    private final Notifier notifier;
    private final ProjectStoreManager psm;
    private final TransactionManager tm;
    private final Duration attemptTTL;
    private final Duration reapingInterval;
    private final Duration taskTTL;

    @Inject
    public WorkflowExecutionTimeoutEnforcer(ServerConfig serverConfig, SessionStoreManager sessionStoreManager, TransactionManager transactionManager, Config config, Notifier notifier, ProjectStoreManager projectStoreManager) {
        this.attemptTTL = (Duration) config.getOptional("executor.attempt_ttl", DurationParam.class).transform((v0) -> {
            return v0.getDuration();
        }).or(DEFAULT_ATTEMPT_TTL);
        this.taskTTL = (Duration) config.getOptional("executor.task_ttl", DurationParam.class).transform((v0) -> {
            return v0.getDuration();
        }).or(DEFAULT_TASK_TTL);
        this.reapingInterval = (Duration) config.getOptional("executor.ttl_reaping_interval", DurationParam.class).transform((v0) -> {
            return v0.getDuration();
        }).or(DEFAULT_REAPING_INTERVAL);
        this.ssm = sessionStoreManager;
        this.notifier = notifier;
        this.psm = projectStoreManager;
        this.tm = transactionManager;
        if (serverConfig.getExecutorEnabled()) {
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("attempt-timeout-enforcer-%d").build());
        } else {
            this.scheduledExecutorService = null;
        }
    }

    private void run() {
        try {
            enforceAttemptTTLs();
        } catch (Throwable th) {
            logger.error("Uncaught exception when enforcing attempt TTLs. Ignoring. Loop will be retried.", th);
        }
        try {
            enforceTaskTTLs();
        } catch (Throwable th2) {
            logger.error("Uncaught exception when enforcing task TTLs. Ignoring. Loop will be retried.", th2);
        }
    }

    private void enforceAttemptTTLs() {
        for (StoredSessionAttempt storedSessionAttempt : (List) this.tm.begin(() -> {
            return this.ssm.findActiveAttemptsCreatedBefore(this.ssm.getStoreTime().minus((TemporalAmount) this.attemptTTL), 0L, 100);
        })) {
            try {
                if (((Boolean) this.tm.begin(() -> {
                    try {
                        if (this.ssm.getAttemptStateFlags(storedSessionAttempt.getId()).isCancelRequested()) {
                            logger.debug("Session Attempt already canceled, ignoring: {}", storedSessionAttempt);
                            return null;
                        }
                        logger.info("Session Attempt timed out, canceling: {}", storedSessionAttempt);
                        return Boolean.valueOf(this.ssm.requestCancelAttempt(storedSessionAttempt.getId()));
                    } catch (ResourceNotFoundException e) {
                        logger.debug("Session Attempt not found, ignoring: {}", storedSessionAttempt, e);
                        return null;
                    }
                })).booleanValue()) {
                    sendTimeoutNotification("Workflow execution timeout", storedSessionAttempt.getId());
                }
            } catch (Throwable th) {
                logger.error("Uncaught exception when enforcing attempt TTLs of attempt {}. Ignoring. Loop continues.", Long.valueOf(storedSessionAttempt.getId()), th);
            }
        }
    }

    private void enforceTaskTTLs() {
        for (Map.Entry entry : ((Map) ((List) this.tm.begin(() -> {
            return this.ssm.findTasksStartedBeforeWithState(TASK_TTL_ENFORCED_STATE_CODES, this.ssm.getStoreTime().minus((TemporalAmount) this.taskTTL), 0L, 100);
        })).stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getAttemptId();
        }))).entrySet()) {
            long longValue = ((Long) entry.getKey()).longValue();
            try {
                if (((Boolean) this.tm.begin(() -> {
                    logger.info("Task(s) timed out, canceling Session Attempt: {}, tasks={}", Long.valueOf(longValue), entry.getValue());
                    return Boolean.valueOf(this.ssm.requestCancelAttempt(longValue));
                })).booleanValue()) {
                    sendTimeoutNotification("Task execution timeout: " + ((String) ((List) entry.getValue()).stream().mapToLong((v0) -> {
                        return v0.getId();
                    }).mapToObj(Long::toString).collect(Collectors.joining(","))), longValue);
                }
            } catch (Throwable th) {
                logger.error("Uncaught exception when enforcing task TTLs of attempt {}. Ignoring. Loop continues.", entry.getKey(), th);
            }
        }
    }

    private void sendTimeoutNotification(String str, long j) {
        try {
            StoredSessionAttemptWithSession storedSessionAttemptWithSession = (StoredSessionAttemptWithSession) this.tm.begin(() -> {
                return this.ssm.getAttemptWithSessionById(j);
            }, ResourceNotFoundException.class);
            int projectId = storedSessionAttemptWithSession.getSession().getProjectId();
            try {
                StoredProject storedProject = (StoredProject) this.tm.begin(() -> {
                    return this.psm.getProjectByIdInternal(projectId);
                }, ResourceNotFoundException.class);
                Optional workflowDefinitionId = storedSessionAttemptWithSession.getWorkflowDefinitionId();
                Optional absent = Optional.absent();
                if (workflowDefinitionId.isPresent()) {
                    try {
                        absent = Optional.of(this.tm.begin(() -> {
                            return this.psm.getWorkflowDetailsById(((Long) workflowDefinitionId.get()).longValue());
                        }, ResourceNotFoundException.class));
                    } catch (ResourceNotFoundException e) {
                        absent = Optional.absent();
                    }
                }
                try {
                    this.notifier.sendNotification(Notification.builder(Instant.now(), str).attemptId(storedSessionAttemptWithSession.getId()).projectId(projectId).projectName(storedProject.getName()).revision(absent.transform(storedWorkflowDefinitionWithProject -> {
                        return storedWorkflowDefinitionWithProject.getRevisionName();
                    })).sessionId(storedSessionAttemptWithSession.getSessionId()).siteId(storedSessionAttemptWithSession.getSiteId()).workflowName(absent.transform(storedWorkflowDefinitionWithProject2 -> {
                        return storedWorkflowDefinitionWithProject2.getName();
                    })).build());
                } catch (NotificationException e2) {
                    logger.error("Failed to send execution timeout notification for attempt: {}", Long.valueOf(j), e2);
                }
            } catch (ResourceNotFoundException e3) {
                logger.error("Project not found, ignoring: {}", Long.valueOf(j));
            }
        } catch (ResourceNotFoundException e4) {
            logger.error("Session attempt not found, ignoring: {}", Long.valueOf(j));
        }
    }

    @PostConstruct
    public void start() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.scheduleAtFixedRate(this::run, this.reapingInterval.toNanos(), this.reapingInterval.toNanos(), TimeUnit.NANOSECONDS);
        }
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }
}
