package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobCompletionDetails;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.annotation.Nonnull;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.annotation.Propagation;

/* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.class */
public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorService, IHasScheduledJobs {
    public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName();
    private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepExecutorServiceImpl.class);
    private final IJobPersistence myJobPersistence;
    private final IHapiTransactionService myTransactionService;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private Timer myHeartbeatTimer;
    private final Map<String, JobWorkCursor> myInstanceIdToJobWorkCursor = Collections.synchronizedMap(new LinkedHashMap());
    private final Semaphore myCurrentlyExecuting = new Semaphore(1);
    private final AtomicReference<String> myCurrentlyFinalizingInstanceId = new AtomicReference<>();
    private final ExecutorService myReducerExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("batch2-reducer"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum;

        static {
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[ChunkOutcome.Status.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[ChunkOutcome.Status.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum = new int[StatusEnum.values().length];
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.IN_PROGRESS.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.ERRORED.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.FINALIZE.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.COMPLETED.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.QUEUED.ordinal()] = 6;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[StatusEnum.CANCELLED.ordinal()] = 7;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl$HeartbeatTimerTask.class */
    private class HeartbeatTimerTask extends TimerTask {
        private HeartbeatTimerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            ReductionStepExecutorServiceImpl.this.runHeartbeat();
        }
    }

    /* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl$ReductionStepExecutorScheduledJob.class */
    public static class ReductionStepExecutorScheduledJob implements HapiJob {

        @Autowired
        private IReductionStepExecutorService myTarget;

        public void execute(JobExecutionContext jobExecutionContext) {
            this.myTarget.reducerPass();
        }
    }

    public ReductionStepExecutorServiceImpl(IJobPersistence iJobPersistence, IHapiTransactionService iHapiTransactionService, JobDefinitionRegistry jobDefinitionRegistry) {
        this.myJobPersistence = iJobPersistence;
        this.myTransactionService = iHapiTransactionService;
        this.myJobDefinitionRegistry = jobDefinitionRegistry;
    }

    @EventListener({ContextRefreshedEvent.class})
    public void start() {
        if (this.myHeartbeatTimer == null) {
            this.myHeartbeatTimer = new Timer("batch2-reducer-heartbeat");
            this.myHeartbeatTimer.schedule(new HeartbeatTimerTask(), 60000L, 60000L);
        }
    }

    private void runHeartbeat() {
        String str = this.myCurrentlyFinalizingInstanceId.get();
        if (str != null) {
            ourLog.info("Running heartbeat for instance: {}", str);
            executeInTransactionWithSynchronization(() -> {
                this.myJobPersistence.updateInstanceUpdateTime(str);
                return null;
            });
        }
    }

    @EventListener({ContextClosedEvent.class})
    public void shutdown() {
        if (this.myHeartbeatTimer != null) {
            this.myHeartbeatTimer.cancel();
            this.myHeartbeatTimer = null;
        }
    }

    @Override // ca.uhn.fhir.batch2.api.IReductionStepExecutorService
    public void triggerReductionStep(String str, JobWorkCursor<?, ?, ?> jobWorkCursor) {
        this.myInstanceIdToJobWorkCursor.putIfAbsent(str, jobWorkCursor);
        if (this.myCurrentlyExecuting.availablePermits() > 0) {
            this.myReducerExecutor.submit(this::reducerPass);
        }
    }

    @Override // ca.uhn.fhir.batch2.api.IReductionStepExecutorService
    public void reducerPass() {
        try {
            if (this.myCurrentlyExecuting.tryAcquire()) {
                try {
                    String[] strArr = (String[]) this.myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
                    if (strArr.length > 0) {
                        String str = strArr[0];
                        this.myCurrentlyFinalizingInstanceId.set(str);
                        executeReductionStep(str, this.myInstanceIdToJobWorkCursor.get(str));
                        this.myInstanceIdToJobWorkCursor.remove(str);
                    }
                    this.myCurrentlyFinalizingInstanceId.set(null);
                    this.myCurrentlyExecuting.release();
                } catch (Exception e) {
                    ourLog.error("Failed to execute reducer pass", e);
                    this.myCurrentlyFinalizingInstanceId.set(null);
                    this.myCurrentlyExecuting.release();
                }
            }
        } catch (Throwable th) {
            this.myCurrentlyFinalizingInstanceId.set(null);
            this.myCurrentlyExecuting.release();
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @VisibleForTesting
    @WithSpan(BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME)
    <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> ReductionStepChunkProcessingResponse executeReductionStep(String str, JobWorkCursor<PT, IT, OT> jobWorkCursor) {
        BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan(jobWorkCursor.getJobDefinition().getJobDefinitionId(), jobWorkCursor.getJobDefinition().getJobDefinitionVersion(), str, jobWorkCursor.getCurrentStepId(), null);
        JobDefinitionStep<PT, IT, OT> currentStep = jobWorkCursor.getCurrentStep();
        JobInstance jobInstance = (JobInstance) executeInTransactionWithSynchronization(() -> {
            return this.myJobPersistence.fetchInstance(str).orElseThrow(() -> {
                return new InternalErrorException("Unknown instance: " + str);
            });
        });
        boolean z = false;
        switch (AnonymousClass1.$SwitchMap$ca$uhn$fhir$batch2$model$StatusEnum[jobInstance.getStatus().ordinal()]) {
            case 1:
            case 2:
                if (((Boolean) executeInTransactionWithSynchronization(() -> {
                    return Boolean.valueOf(this.myJobPersistence.markInstanceAsStatusWhenStatusIn(jobInstance.getInstanceId(), StatusEnum.FINALIZE, EnumSet.of(StatusEnum.IN_PROGRESS, StatusEnum.ERRORED)));
                })).booleanValue()) {
                    ourLog.info("Job instance {} has been set to FINALIZE state - Beginning reducer step", jobInstance.getInstanceId());
                    z = true;
                    break;
                }
                break;
        }
        if (!z) {
            ourLog.warn("JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun! This could be a long running reduction job resulting in the processed msg not being acknowledged, or the result of a failed process or server restarting.", jobInstance.getInstanceId(), jobInstance.getStatus());
            return new ReductionStepChunkProcessingResponse(false);
        }
        IModelJson parameters = jobInstance.getParameters(jobWorkCursor.getJobDefinition().getParametersType());
        IReductionStepWorker iReductionStepWorker = (IReductionStepWorker) currentStep.getJobStepWorker();
        jobInstance.setStatus(StatusEnum.FINALIZE);
        ReductionStepChunkProcessingResponse reductionStepChunkProcessingResponse = new ReductionStepChunkProcessingResponse(true);
        try {
            processChunksAndCompleteJob(jobWorkCursor, currentStep, jobInstance, parameters, iReductionStepWorker, reductionStepChunkProcessingResponse);
        } catch (Exception e) {
            ourLog.error("Job completion failed for Job {}", jobInstance.getInstanceId(), e);
            executeInTransactionWithSynchronization(() -> {
                this.myJobPersistence.updateInstance(jobInstance.getInstanceId(), jobInstance2 -> {
                    jobInstance2.setStatus(StatusEnum.FAILED);
                    return true;
                });
                return null;
            });
            reductionStepChunkProcessingResponse.setSuccessful(false);
        }
        if (!reductionStepChunkProcessingResponse.hasSuccessfulChunksIds()) {
            reductionStepChunkProcessingResponse.setSuccessful(false);
        }
        return reductionStepChunkProcessingResponse;
    }

    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob(JobWorkCursor<PT, IT, OT> jobWorkCursor, JobDefinitionStep<PT, IT, OT> jobDefinitionStep, JobInstance jobInstance, PT pt, IReductionStepWorker<PT, IT, OT> iReductionStepWorker, ReductionStepChunkProcessingResponse reductionStepChunkProcessingResponse) {
        try {
            executeInTransactionWithSynchronization(() -> {
                Stream<WorkChunk> fetchAllWorkChunksForStepStream = this.myJobPersistence.fetchAllWorkChunksForStepStream(jobInstance.getInstanceId(), jobDefinitionStep.getStepId());
                try {
                    fetchAllWorkChunksForStepStream.forEach(workChunk -> {
                        processChunk(workChunk, jobInstance, pt, iReductionStepWorker, reductionStepChunkProcessingResponse, jobWorkCursor);
                    });
                    if (fetchAllWorkChunksForStepStream == null) {
                        return null;
                    }
                    fetchAllWorkChunksForStepStream.close();
                    return null;
                } catch (Throwable th) {
                    if (fetchAllWorkChunksForStepStream != null) {
                        try {
                            fetchAllWorkChunksForStepStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            executeInTransactionWithSynchronization(() -> {
                IJobCompletionHandler completionHandler;
                ourLog.info("Reduction step for instance[{}] produced {} successful and {} failed chunks", new Object[]{jobInstance.getInstanceId(), Integer.valueOf(reductionStepChunkProcessingResponse.getSuccessfulChunkIds().size()), Integer.valueOf(reductionStepChunkProcessingResponse.getFailedChunksIds().size())});
                ReductionStepDataSink reductionStepDataSink = new ReductionStepDataSink(jobInstance.getInstanceId(), jobWorkCursor, this.myJobPersistence, this.myJobDefinitionRegistry);
                StepExecutionDetails createReductionStepDetails = StepExecutionDetails.createReductionStepDetails(pt, null, jobInstance);
                if (reductionStepChunkProcessingResponse.isSuccessful()) {
                    iReductionStepWorker.run(createReductionStepDetails, reductionStepDataSink);
                    jobInstance.setStatus(StatusEnum.COMPLETED);
                }
                if (reductionStepChunkProcessingResponse.hasSuccessfulChunksIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), reductionStepChunkProcessingResponse.getSuccessfulChunkIds(), WorkChunkStatusEnum.COMPLETED, null);
                }
                if (reductionStepChunkProcessingResponse.hasFailedChunkIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), reductionStepChunkProcessingResponse.getFailedChunksIds(), WorkChunkStatusEnum.FAILED, "JOB ABORTED");
                }
                if (!reductionStepChunkProcessingResponse.isSuccessful() || (completionHandler = jobWorkCursor.getJobDefinition().getCompletionHandler()) == null) {
                    return null;
                }
                completionHandler.jobComplete(new JobCompletionDetails(pt, jobInstance));
                return null;
            });
        } catch (Throwable th) {
            executeInTransactionWithSynchronization(() -> {
                IJobCompletionHandler completionHandler;
                ourLog.info("Reduction step for instance[{}] produced {} successful and {} failed chunks", new Object[]{jobInstance.getInstanceId(), Integer.valueOf(reductionStepChunkProcessingResponse.getSuccessfulChunkIds().size()), Integer.valueOf(reductionStepChunkProcessingResponse.getFailedChunksIds().size())});
                ReductionStepDataSink reductionStepDataSink = new ReductionStepDataSink(jobInstance.getInstanceId(), jobWorkCursor, this.myJobPersistence, this.myJobDefinitionRegistry);
                StepExecutionDetails createReductionStepDetails = StepExecutionDetails.createReductionStepDetails(pt, null, jobInstance);
                if (reductionStepChunkProcessingResponse.isSuccessful()) {
                    iReductionStepWorker.run(createReductionStepDetails, reductionStepDataSink);
                    jobInstance.setStatus(StatusEnum.COMPLETED);
                }
                if (reductionStepChunkProcessingResponse.hasSuccessfulChunksIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), reductionStepChunkProcessingResponse.getSuccessfulChunkIds(), WorkChunkStatusEnum.COMPLETED, null);
                }
                if (reductionStepChunkProcessingResponse.hasFailedChunkIds()) {
                    this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), reductionStepChunkProcessingResponse.getFailedChunksIds(), WorkChunkStatusEnum.FAILED, "JOB ABORTED");
                }
                if (!reductionStepChunkProcessingResponse.isSuccessful() || (completionHandler = jobWorkCursor.getJobDefinition().getCompletionHandler()) == null) {
                    return null;
                }
                completionHandler.jobComplete(new JobCompletionDetails(pt, jobInstance));
                return null;
            });
            throw th;
        }
    }

    private <T> T executeInTransactionWithSynchronization(Callable<T> callable) {
        return (T) this.myTransactionService.withRequest((RequestDetails) null).withPropagation(Propagation.REQUIRES_NEW).execute(callable);
    }

    public void scheduleJobs(ISchedulerService iSchedulerService) {
        iSchedulerService.scheduleClusteredJob(10000L, buildJobDefinition());
    }

    @Nonnull
    private ScheduledJobDefinition buildJobDefinition() {
        ScheduledJobDefinition scheduledJobDefinition = new ScheduledJobDefinition();
        scheduledJobDefinition.setId(SCHEDULED_JOB_ID);
        scheduledJobDefinition.setJobClass(ReductionStepExecutorScheduledJob.class);
        return scheduledJobDefinition;
    }

    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunk(WorkChunk workChunk, JobInstance jobInstance, PT pt, IReductionStepWorker<PT, IT, OT> iReductionStepWorker, ReductionStepChunkProcessingResponse reductionStepChunkProcessingResponse, JobWorkCursor<PT, IT, OT> jobWorkCursor) {
        if (workChunk.getStatus() == WorkChunkStatusEnum.COMPLETED) {
            ourLog.error("Unexpected chunk {} with status {} found while reducing {}.  No chunks feeding into a reduction step should be in a state other than READY.", new Object[]{workChunk.getId(), workChunk.getStatus(), jobInstance});
            return;
        }
        if (reductionStepChunkProcessingResponse.hasFailedChunkIds()) {
            reductionStepChunkProcessingResponse.addFailedChunkId(workChunk);
            return;
        }
        try {
            switch (iReductionStepWorker.consume(new ChunkExecutionDetails<>(workChunk.getData(jobWorkCursor.getCurrentStep().getInputType()), pt, jobInstance.getInstanceId(), workChunk.getId())).getStatus()) {
                case SUCCESS:
                    reductionStepChunkProcessingResponse.addSuccessfulChunkId(workChunk);
                    break;
                case FAILED:
                    ourLog.error("Processing of work chunk {} resulted in aborting job.", workChunk.getId());
                    reductionStepChunkProcessingResponse.addFailedChunkId(workChunk);
                    reductionStepChunkProcessingResponse.setSuccessful(false);
                    break;
            }
        } catch (Exception e) {
            String format = String.format("Reduction step failed to execute chunk reduction for chunk %s with exception: %s.", workChunk.getId(), e.getMessage());
            ourLog.error(format, e);
            reductionStepChunkProcessingResponse.setSuccessful(false);
            this.myJobPersistence.onWorkChunkFailed(workChunk.getId(), format);
        }
    }
}
