/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.scheduler;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of a timer backed {@link ReactiveJobScheduler}.
 *
 * <p>This class drives the scheduling life cycle around three extension points that concrete
 * subclasses must provide: {@link #doSchedule(Duration, Job)},
 * {@link #doPeriodicSchedule(Duration, Job)} and {@link #doCancel(ScheduledJob)}. Around those it
 * persists job state through the {@link ReactiveJobRepository}, re-schedules failed jobs after a
 * back-off delay, and keeps an in-memory map of the job ids for which a timer was registered.
 */
public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler<ScheduledJob> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseTimerJobScheduler.class);

    /** Delay, in milliseconds, used when a job is scheduled again by the retry handling. */
    @ConfigProperty(name = "kogito.jobs-service.backoffRetryMillis")
    long backoffRetryMillis;

    /**
     * Tolerance, in milliseconds, added to a job expiration time before the job is considered
     * expired (see {@code isExpired}).
     */
    @ConfigProperty(name = "kogito.jobs-service.maxIntervalLimitToRetryMillis")
    long maxIntervalLimitToRetryMillis;

    /**
     * Size, in minutes, of the window starting at "now" in which a job gets a timer right away;
     * jobs expiring later are only persisted by {@link #schedule(Job)}.
     */
    @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes")
    long schedulerChunkInMinutes;

    @Inject
    JobExecutor jobExecutor;

    @Inject
    ReactiveJobRepository jobRepository;

    /** Job id mapped to the instant at which a timer was registered for it. */
    private final Map<String, ZonedDateTime> schedulerControl;

    /**
     * No-arg constructor; collaborators and configuration values are left at their defaults and are
     * expected to be populated through the annotated fields.
     */
    protected BaseTimerJobScheduler() {
        this(null, null, 0L, 0L);
    }

    /**
     * Creates a scheduler with explicit collaborators.
     *
     * <p>Note that {@code schedulerChunkInMinutes} is not set here and keeps its default value
     * unless it is populated through the annotated field.
     *
     * @param jobExecutor executor used by {@link #execute(Job)}
     * @param jobRepository repository used to load, save and delete jobs
     * @param backoffRetryMillis delay in milliseconds before a retry is scheduled
     * @param maxIntervalLimitToRetryMillis tolerance in milliseconds used by the expiration check
     */
    public BaseTimerJobScheduler(JobExecutor jobExecutor, ReactiveJobRepository jobRepository,
                                 long backoffRetryMillis, long maxIntervalLimitToRetryMillis) {
        this.jobExecutor = jobExecutor;
        this.jobRepository = jobRepository;
        this.backoffRetryMillis = backoffRetryMillis;
        this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis;
        this.schedulerControl = new ConcurrentHashMap<>();
    }

    /**
     * Schedules the given job.
     *
     * <p>If the repository already contains a job with the same id, that job is handled first
     * (see {@code handleExistingJob}); when that handling emits nothing the returned publisher
     * completes without emitting. Otherwise a job that expires inside the current scheduler chunk
     * gets a timer and is saved, and a job that expires later is only saved with status
     * {@link JobStatus#SCHEDULED}.
     *
     * @param job the job to schedule
     * @return a publisher emitting the persisted {@link ScheduledJob}
     */
    @Override
    public Publisher<ScheduledJob> schedule(Job job) {
        LOGGER.debug("Scheduling {}", job);
        return ReactiveStreams
                .fromCompletionStage(jobRepository.exists(job.getId()))
                .flatMap(exists -> Boolean.TRUE.equals(exists)
                        ? handleExistingJob(job)
                        : ReactiveStreams.of(job))
                // The element only gates the flow; the incoming job is what gets scheduled.
                .flatMap(j -> isOnCurrentSchedulerChunk(job)
                        ? doJobScheduling(job)
                        : ReactiveStreams.fromCompletionStage(
                                jobRepository.save(ScheduledJob.builder().job(job).status(JobStatus.SCHEDULED).build())))
                .buildRs();
    }

    /**
     * Registers a timer for the job and persists it with status {@link JobStatus#SCHEDULED}.
     * The stream fails with a {@link RuntimeException} when the expiration time is already in
     * the past.
     */
    private PublisherBuilder<ScheduledJob> doJobScheduling(Job job) {
        return ReactiveStreams.of(job)
                .map(current -> job.getExpirationTime())
                .map(this::calculateDelay)
                .peek(delay -> {
                    if (delay.isNegative()) {
                        throw new RuntimeException("Delay should be positive");
                    }
                })
                .flatMap(delay -> schedule(delay, job))
                .map(scheduleId -> ScheduledJob.builder()
                        .job(job)
                        .scheduledId(scheduleId)
                        .status(JobStatus.SCHEDULED)
                        .build())
                .flatMapCompletionStage(jobRepository::save);
    }

    /** Returns true when the job expires before now plus {@code schedulerChunkInMinutes}. */
    private boolean isOnCurrentSchedulerChunk(Job job) {
        return job.getExpirationTime().isBefore(DateUtil.now().plusMinutes(schedulerChunkInMinutes));
    }

    /**
     * Handles a scheduling request for a job id that is already stored.
     *
     * <ul>
     * <li>{@code SCHEDULED}: the stored job goes through the expiration check, is cancelled, and
     * the stored job is emitted so that the caller can schedule again;</li>
     * <li>{@code RETRY}: the stored job goes through the retry handling;</li>
     * <li>any other status: nothing is emitted.</li>
     * </ul>
     *
     * <p>Any failure is logged and converted into an empty stream, so a failure here never
     * propagates to the subscriber of {@link #schedule(Job)}.
     */
    private PublisherBuilder<ScheduledJob> handleExistingJob(Job job) {
        return ReactiveStreams
                .fromCompletionStage(jobRepository.get(job.getId()))
                .flatMap(existing -> {
                    switch (existing.getStatus()) {
                        case SCHEDULED:
                            return handleExpirationTime(existing)
                                    .map(scheduled -> ScheduledJob.builder().of(scheduled).status(JobStatus.CANCELED).build())
                                    .map(CompletableFuture::completedFuture)
                                    .flatMapCompletionStage(this::cancel)
                                    .map(deleted -> existing);
                        case RETRY:
                            return handleRetry(CompletableFuture.completedFuture(existing));
                        default:
                            // Empty stream stops any further processing for this job.
                            return ReactiveStreams.empty();
                    }
                })
                .onErrorResumeWith(t -> {
                    // Best effort: keep the original "skip on failure" behavior, but leave a trace.
                    LOGGER.warn("Failed to handle already existing job {}, scheduling is skipped", job.getId(), t);
                    return ReactiveStreams.empty();
                });
    }

    /** Returns the time from now until the given expiration time; negative when it is in the past. */
    private Duration calculateDelay(ZonedDateTime expirationTime) {
        return Duration.between(DateUtil.now(), expirationTime);
    }

    /** Returns true when the job has a repeat limit and its execution counter is still below it. */
    private boolean validLimit(ScheduledJob job) {
        return Optional.of(job)
                .map(Job::getRepeatLimit)
                .filter(limit -> job.getExecutionCounter() < limit)
                .isPresent();
    }

    /** Returns true when the job is non-null and its execution counter is greater than one. */
    private boolean wasPeriodicScheduled(ScheduledJob job) {
        return job != null && job.getExecutionCounter() > 1;
    }

    /**
     * Updates the state of a job after a successful execution.
     *
     * <p>The execution counter is incremented first. Then:
     * <ul>
     * <li>a job with an interval whose counter is not yet greater than one gets a periodic timer
     * and is saved as {@link JobStatus#SCHEDULED};</li>
     * <li>a job with an interval that was already periodically scheduled and is still below its
     * repeat limit is saved with a new expiration time;</li>
     * <li>every other job is marked {@link JobStatus#EXECUTED}.</li>
     * </ul>
     * Only jobs that ended up {@code EXECUTED} pass the final filter; those are cancelled (which
     * also deletes them from the repository) and emitted.
     *
     * @param futureJob the job that was executed
     * @return a stream emitting the job only when it reached the {@code EXECUTED} status
     */
    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(ScheduledJob futureJob) {
        return ReactiveStreams.of(futureJob)
                .map(job -> ScheduledJob.builder().of(job).incrementExecutionCounter().build())
                .flatMap(job -> job.hasInterval()
                        .filter(interval -> !wasPeriodicScheduled(job))
                        .map(Duration::ofMillis)
                        .map(interval -> periodicSchedule(interval, job)
                                .map(scheduledId -> ScheduledJob.builder()
                                        .of(job)
                                        .scheduledId(scheduledId)
                                        .expirationTime(DateUtil.now().plus(interval))
                                        .status(JobStatus.SCHEDULED)
                                        .build())
                                .flatMapCompletionStage(jobRepository::save))
                        .orElseGet(() -> ReactiveStreams.fromCompletionStage(nextStateAfterExecution(job))))
                .filter(job -> JobStatus.EXECUTED.equals(job.getStatus()))
                .flatMap(job -> ReactiveStreams.fromCompletionStage(cancel(CompletableFuture.completedFuture(job))));
    }

    /**
     * Computes the follow-up state of a job that does not need a new periodic timer: a periodic
     * job below its repeat limit is saved with the next expiration time, anything else is
     * returned as {@link JobStatus#EXECUTED} without being saved.
     */
    private CompletionStage<ScheduledJob> nextStateAfterExecution(ScheduledJob job) {
        return job.hasInterval()
                .flatMap(interval -> Optional.of(job)
                        .filter(this::wasPeriodicScheduled)
                        .filter(this::validLimit)
                        .map(s -> ScheduledJob.builder()
                                .of(job)
                                .expirationTime(DateUtil.now().plus(Duration.ofMillis(interval)))
                                .build())
                        .map(jobRepository::save))
                .orElseGet(() -> CompletableFuture.completedFuture(
                        ScheduledJob.builder().of(job).status(JobStatus.EXECUTED).build()));
    }

    /**
     * Loads the job referenced by the response and applies
     * {@link #handleJobExecutionSuccess(ScheduledJob)} to it.
     *
     * @param response the successful execution response
     * @return the stream produced by {@link #handleJobExecutionSuccess(ScheduledJob)}
     */
    @Override
    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(JobExecutionResponse response) {
        return ReactiveStreams.of(response)
                .map(JobExecutionResponse::getJobId)
                .flatMapCompletionStage(jobRepository::get)
                .flatMap(this::handleJobExecutionSuccess);
    }

    /**
     * Returns true when the expiration time plus {@code maxIntervalLimitToRetryMillis} is
     * already in the past.
     */
    private boolean isExpired(ZonedDateTime expirationTime) {
        Duration limit = Duration.ofMillis(maxIntervalLimitToRetryMillis);
        return calculateDelay(expirationTime).plus(limit).isNegative();
    }

    /**
     * Emits the job unchanged when it is not expired, otherwise emits the result of
     * {@code handleExpiredJob} (a copy with status {@link JobStatus#ERROR}).
     */
    private PublisherBuilder<ScheduledJob> handleExpirationTime(ScheduledJob scheduledJob) {
        return ReactiveStreams.of(scheduledJob)
                .map(Job::getExpirationTime)
                .flatMapCompletionStage(time -> isExpired(time)
                        ? handleExpiredJob(scheduledJob)
                        : CompletableFuture.completedFuture(scheduledJob));
    }

    /**
     * Loads the job referenced by the error response and applies the retry handling to it.
     *
     * @param errorResponse the failed execution response
     * @return a stream emitting the job saved with status {@link JobStatus#RETRY}, or nothing
     *         when the job is expired
     */
    @Override
    public PublisherBuilder<ScheduledJob> handleJobExecutionError(JobExecutionResponse errorResponse) {
        return handleRetry(jobRepository.get(errorResponse.getJobId()));
    }

    /**
     * Schedules the job again after {@code backoffRetryMillis} and saves it with status
     * {@link JobStatus#RETRY} and an incremented retry counter. Jobs that come out of the
     * expiration check with status {@link JobStatus#ERROR} are dropped.
     */
    private PublisherBuilder<ScheduledJob> handleRetry(CompletionStage<ScheduledJob> futureJob) {
        return ReactiveStreams.fromCompletionStage(futureJob)
                .flatMap(scheduledJob -> handleExpirationTime(scheduledJob)
                        .map(ScheduledJob::getStatus)
                        // ERROR is the status assigned to expired jobs; those are not retried.
                        .filter(status -> !JobStatus.ERROR.equals(status))
                        .flatMap(status -> schedule(Duration.ofMillis(backoffRetryMillis), scheduledJob))
                        .map(scheduleId -> ScheduledJob.builder()
                                .of(scheduledJob)
                                .scheduledId(scheduleId)
                                .status(JobStatus.RETRY)
                                .incrementRetries()
                                .build())
                        .flatMapCompletionStage(jobRepository::save))
                .peek(job -> LOGGER.debug("Retry executed {}", job));
    }

    /**
     * Deletes an expired job from the repository and returns a copy of it with status
     * {@link JobStatus#ERROR}.
     */
    private CompletionStage<ScheduledJob> handleExpiredJob(ScheduledJob scheduledJob) {
        ScheduledJob expired = ScheduledJob.builder().of(scheduledJob).status(JobStatus.ERROR).build();
        return jobRepository.delete(expired).thenApply(deleted -> {
            LOGGER.warn("Retry limit exceeded for job {}", expired);
            return expired;
        });
    }

    /** Delegates to {@link #doSchedule(Duration, Job)} and records the job as having a timer. */
    private PublisherBuilder<String> schedule(Duration delay, Job job) {
        return doSchedule(delay, job).peek(registerScheduledJob(job));
    }

    /** Delegates to {@link #doPeriodicSchedule(Duration, Job)} and records the job as having a timer. */
    private PublisherBuilder<String> periodicSchedule(Duration delay, Job job) {
        return doPeriodicSchedule(delay, job).peek(registerScheduledJob(job));
    }

    /** Returns a consumer that stores the current time under the job id in the control map. */
    private Consumer<String> registerScheduledJob(Job job) {
        return scheduledId -> schedulerControl.put(job.getId(), DateUtil.now());
    }

    /**
     * Registers a one-shot timer for the job.
     *
     * @param delay time to wait before the job fires
     * @param job the job to schedule
     * @return a stream emitting the identifier of the created timer
     */
    public abstract PublisherBuilder<String> doSchedule(Duration delay, Job job);

    /**
     * Registers a periodic timer for the job.
     *
     * @param interval the interval passed by the success handling for periodic jobs
     * @param job the job to schedule
     * @return a stream emitting the identifier of the created timer
     */
    public abstract PublisherBuilder<String> doPeriodicSchedule(Duration interval, Job job);

    /**
     * Executes the stored version of the job through the {@link JobExecutor}. When the execution
     * completes, successfully or not, the job id is removed from the control map.
     *
     * @param job the job to execute
     * @return the stage returned by the executor
     */
    protected CompletionStage<ScheduledJob> execute(Job job) {
        LOGGER.debug("Executing job ! {}", job);
        return jobExecutor
                .execute(jobRepository.get(job.getId()))
                .whenComplete((executed, error) -> unregisterScheduledJob(job));
    }

    /** Removes the job id from the control map and returns the registration time, if any. */
    private ZonedDateTime unregisterScheduledJob(Job job) {
        return schedulerControl.remove(job.getId());
    }

    /**
     * Cancels the timer of the given job, when it has a scheduled id, and deletes the job from
     * the repository.
     *
     * @param futureJob stage providing the job to cancel; may complete with {@code null}
     * @return a stage completing with the deleted job, or with {@code null} when nothing was
     *         emitted by the pipeline
     */
    public CompletionStage<ScheduledJob> cancel(CompletionStage<ScheduledJob> futureJob) {
        return ReactiveStreams
                .fromCompletionStageNullable(futureJob)
                .peek(job -> LOGGER.debug("Cancel Job Scheduling {}", job))
                .flatMap(scheduledJob -> Optional.ofNullable(scheduledJob.getScheduledId())
                        .map(id -> ReactiveStreams.fromPublisher(doCancel(scheduledJob)).map(cancelled -> scheduledJob))
                        // No scheduled id means there is no timer to cancel; go straight to the delete.
                        .orElseGet(() -> ReactiveStreams.of(scheduledJob)))
                .flatMapCompletionStage(jobRepository::delete)
                .findFirst()
                .run()
                .thenApply(job -> job.orElse(null));
    }

    /**
     * Loads the job with the given id, marks it {@link JobStatus#CANCELED} and cancels it through
     * {@link #cancel(CompletionStage)}.
     *
     * @param jobId id of the job to cancel
     * @return a stage completing with the cancelled job, or with {@code null} when the repository
     *         returned no job for the id
     */
    @Override
    public CompletionStage<ScheduledJob> cancel(String jobId) {
        return cancel(jobRepository.get(jobId)
                .thenApply(scheduledJob -> Optional.ofNullable(scheduledJob)
                        .map(found -> ScheduledJob.builder().of(found).status(JobStatus.CANCELED).build())
                        .orElse(null)));
    }

    /**
     * Cancels the timer registered for the given job.
     *
     * @param scheduledJob the job whose timer must be cancelled
     * @return a publisher of the cancellation result
     */
    public abstract Publisher<Boolean> doCancel(ScheduledJob scheduledJob);

    /**
     * Returns the instant at which a timer was registered for the given job id.
     *
     * @param jobId id of the job
     * @return the registration time, or empty when no timer is currently tracked for the id
     */
    @Override
    public Optional<ZonedDateTime> scheduled(String jobId) {
        return Optional.ofNullable(schedulerControl.get(jobId));
    }
}

