/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.server;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredProject;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.TaskAttemptSummary;
import io.digdag.core.session.TaskStateCode;
import io.digdag.server.ServerConfig;
import io.digdag.spi.ImmutableNotification;
import io.digdag.spi.Notification;
import io.digdag.spi.NotificationException;
import io.digdag.spi.Notifier;
import io.digdag.util.DurationParam;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowExecutionTimeoutEnforcer {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecutionTimeoutEnforcer.class);
    private static final Duration DEFAULT_ATTEMPT_TTL = Duration.ofDays(7L);
    private static final Duration DEFAULT_TASK_TTL = Duration.ofDays(1L);
    private static final Duration DEFAULT_REAPING_INTERVAL = Duration.ofSeconds(5L);
    private static final TaskStateCode[] TASK_TTL_ENFORCED_STATE_CODES = new TaskStateCode[]{TaskStateCode.RETRY_WAITING, TaskStateCode.GROUP_RETRY_WAITING, TaskStateCode.RUNNING};
    private final ScheduledExecutorService scheduledExecutorService;
    private final SessionStoreManager ssm;
    private final Notifier notifier;
    private final ProjectStoreManager psm;
    private final TransactionManager tm;
    private final Duration attemptTTL;
    private final Duration reapingInterval;
    private final Duration taskTTL;

    @Inject
    public WorkflowExecutionTimeoutEnforcer(ServerConfig serverConfig, SessionStoreManager ssm, TransactionManager tm, Config systemConfig, Notifier notifier, ProjectStoreManager psm) {
        this.attemptTTL = (Duration)systemConfig.getOptional("executor.attempt_ttl", DurationParam.class).transform(DurationParam::getDuration).or((Object)DEFAULT_ATTEMPT_TTL);
        this.taskTTL = (Duration)systemConfig.getOptional("executor.task_ttl", DurationParam.class).transform(DurationParam::getDuration).or((Object)DEFAULT_TASK_TTL);
        this.reapingInterval = (Duration)systemConfig.getOptional("executor.ttl_reaping_interval", DurationParam.class).transform(DurationParam::getDuration).or((Object)DEFAULT_REAPING_INTERVAL);
        this.ssm = ssm;
        this.notifier = notifier;
        this.psm = psm;
        this.tm = tm;
        this.scheduledExecutorService = serverConfig.getExecutorEnabled() ? Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("attempt-timeout-enforcer-%d").build()) : null;
    }

    private void run() {
        try {
            this.enforceAttemptTTLs();
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception when enforcing attempt TTLs. Ignoring. Loop will be retried.", t);
        }
        try {
            this.enforceTaskTTLs();
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception when enforcing task TTLs. Ignoring. Loop will be retried.", t);
        }
    }

    private void enforceAttemptTTLs() {
        List expiredAttempts = (List)this.tm.begin(() -> {
            Instant creationDeadline = this.ssm.getStoreTime().minus(this.attemptTTL);
            return this.ssm.findActiveAttemptsCreatedBefore(creationDeadline, 0L, 100);
        });
        for (StoredSessionAttempt attempt : expiredAttempts) {
            try {
                boolean canceled = (Boolean)this.tm.begin(() -> {
                    AttemptStateFlags stateFlags;
                    try {
                        stateFlags = this.ssm.getAttemptStateFlags(attempt.getId());
                    }
                    catch (ResourceNotFoundException e) {
                        logger.debug("Session Attempt not found, ignoring: {}", (Object)attempt, (Object)e);
                        return null;
                    }
                    if (stateFlags.isCancelRequested()) {
                        logger.debug("Session Attempt already canceled, ignoring: {}", (Object)attempt);
                        return null;
                    }
                    logger.info("Session Attempt timed out, canceling: {}", (Object)attempt);
                    return this.ssm.requestCancelAttempt(attempt.getId());
                });
                if (!canceled) continue;
                this.sendTimeoutNotification("Workflow execution timeout", attempt.getId());
            }
            catch (Throwable t) {
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception when enforcing attempt TTLs of attempt {}. Ignoring. Loop continues.", (Object)attempt.getId(), (Object)t);
            }
        }
    }

    private void enforceTaskTTLs() {
        List expiredTasks = (List)this.tm.begin(() -> {
            Instant startDeadline = this.ssm.getStoreTime().minus(this.taskTTL);
            return this.ssm.findTasksStartedBeforeWithState(TASK_TTL_ENFORCED_STATE_CODES, startDeadline, 0L, 100);
        });
        Map<Long, List<TaskAttemptSummary>> attempts = expiredTasks.stream().collect(Collectors.groupingBy(TaskAttemptSummary::getAttemptId));
        for (Map.Entry<Long, List<TaskAttemptSummary>> entry : attempts.entrySet()) {
            long attemptId = entry.getKey();
            try {
                boolean canceled = (Boolean)this.tm.begin(() -> {
                    logger.info("Task(s) timed out, canceling Session Attempt: {}, tasks={}", (Object)attemptId, entry.getValue());
                    return this.ssm.requestCancelAttempt(attemptId);
                });
                if (!canceled) continue;
                String taskIds = entry.getValue().stream().mapToLong(TaskAttemptSummary::getId).mapToObj(Long::toString).collect(Collectors.joining(","));
                this.sendTimeoutNotification("Task execution timeout: " + taskIds, attemptId);
            }
            catch (Throwable t) {
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception when enforcing task TTLs of attempt {}. Ignoring. Loop continues.", (Object)entry.getKey(), (Object)t);
            }
        }
    }

    private void sendTimeoutNotification(String message, long attemptId) {
        StoredProject project;
        StoredSessionAttemptWithSession attempt;
        try {
            attempt = (StoredSessionAttemptWithSession)this.tm.begin(() -> this.ssm.getAttemptWithSessionById(attemptId), ResourceNotFoundException.class);
        }
        catch (ResourceNotFoundException e) {
            logger.error("Session attempt not found, ignoring: {}", (Object)attemptId);
            return;
        }
        int projectId = attempt.getSession().getProjectId();
        try {
            project = (StoredProject)this.tm.begin(() -> this.psm.getProjectByIdInternal(projectId), ResourceNotFoundException.class);
        }
        catch (ResourceNotFoundException e) {
            logger.error("Project not found, ignoring: {}", (Object)attemptId);
            return;
        }
        Optional wfId = attempt.getWorkflowDefinitionId();
        Optional workflow = Optional.absent();
        if (wfId.isPresent()) {
            try {
                workflow = Optional.of((Object)this.tm.begin(() -> this.psm.getWorkflowDetailsById(((Long)wfId.get()).longValue()), ResourceNotFoundException.class));
            }
            catch (ResourceNotFoundException e) {
                workflow = Optional.absent();
            }
        }
        ImmutableNotification notification = Notification.builder((Instant)Instant.now(), (String)message).attemptId(attempt.getId()).projectId(projectId).projectName(project.getName()).revision(workflow.transform(wf -> wf.getRevisionName())).sessionId(attempt.getSessionId()).siteId(attempt.getSiteId()).workflowName(workflow.transform(wf -> wf.getName())).workflowDefinitionId(wfId).build();
        try {
            this.notifier.sendNotification((Notification)notification);
        }
        catch (NotificationException e) {
            logger.error("Failed to send execution timeout notification for attempt: {}", (Object)attemptId, (Object)e);
        }
    }

    @PostConstruct
    public void start() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.scheduleAtFixedRate(this::run, this.reapingInterval.toNanos(), this.reapingInterval.toNanos(), TimeUnit.NANOSECONDS);
        }
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }
}

