package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.class */
/**
 * Handles notifications arriving on the batch2 work channel.
 *
 * <p>For each notification the handler runs a chain of setup steps inside a transaction
 * (validate the chunk id, load the job definition and instance, mark the chunk as dequeued,
 * check the job status, build the step cursor and executor). If any step yields an empty
 * result the transaction is flagged rollback-only and the notification is discarded;
 * otherwise the prepared step executor is invoked once the setup callback has returned.
 */
public class WorkChannelMessageHandler implements MessageHandler {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final IJobPersistence myJobPersistence;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private final JobStepExecutorFactory myJobStepExecutorFactory;
    private final IHapiTransactionService myHapiTransactionService;

    /**
     * Mutable state accumulated while preparing a single work notification for execution.
     *
     * <p>Every step returns {@code Optional.of(this)} to continue the chain, or
     * {@code Optional.empty()} to abort it. Steps read fields populated by earlier steps,
     * so they must be invoked in the order used by {@code handleWorkChannelMessage}.
     */
    public class MessageProcess {
        final JobWorkNotification myWorkNotification;
        String myChunkId;
        WorkChunk myWorkChunk;
        JobWorkCursor<?, ?, ?> myCursor;
        JobInstance myJobInstance;
        JobDefinition<?> myJobDefinition;
        JobStepExecutor<?, ?, ?> myStepExector;

        MessageProcess(JobWorkNotification jobWorkNotification) {
            this.myWorkNotification = jobWorkNotification;
        }

        /**
         * Captures the chunk id from the notification.
         *
         * @return this process, or empty (after logging an error) when the chunk id is null
         */
        Optional<MessageProcess> validateChunkId() {
            myChunkId = myWorkNotification.getChunkId();
            if (myChunkId == null) {
                ourLog.error("Received work notification with null chunkId: {}", myWorkNotification);
                return Optional.empty();
            }
            return Optional.of(this);
        }

        /**
         * Resolves the job definition named by the notification. The registry lookup throws
         * when no matching definition exists, so this step never returns empty.
         *
         * @return this process
         */
        Optional<MessageProcess> loadJobDefinitionOrThrow() {
            myJobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(
                    myWorkNotification.getJobDefinitionId(), myWorkNotification.getJobDefinitionVersion());
            return Optional.of(this);
        }

        /**
         * Fetches the job instance and attaches the previously loaded job definition to it.
         *
         * @return this process, or empty (after logging an error) when the instance does not exist
         */
        Optional<MessageProcess> loadJobInstance() {
            return myJobPersistence
                    .fetchInstance(myWorkNotification.getInstanceId())
                    .or(() -> {
                        ourLog.error(
                                "No instance {} exists for chunk notification {}",
                                myWorkNotification.getInstanceId(),
                                myWorkNotification);
                        return Optional.empty();
                    })
                    .map(jobInstance -> {
                        myJobInstance = jobInstance;
                        jobInstance.setJobDefinition(myJobDefinition);
                        return this;
                    });
        }

        /**
         * Tells the persistence layer that the chunk has been dequeued and keeps the chunk it returns.
         *
         * @return this process, or empty (after logging an error) when no chunk was returned
         */
        Optional<MessageProcess> updateChunkStatusAndValidate() {
            return myJobPersistence
                    .onWorkChunkDequeue(myChunkId)
                    .or(() -> {
                        ourLog.error("Unable to find chunk with ID {} - Aborting.  {}", myChunkId, myWorkNotification);
                        return Optional.empty();
                    })
                    .map(workChunk -> {
                        myWorkChunk = workChunk;
                        ourLog.debug(
                                "Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]",
                                myChunkId,
                                myWorkChunk.getTargetStepId(),
                                myWorkChunk.getStartTime());
                        return this;
                    });
        }

        /**
         * Decides from the job instance status whether the chunk should be processed.
         *
         * @return this process for QUEUED, IN_PROGRESS, ERRORED and FINALIZE jobs; empty for
         *     COMPLETED, CANCELLED, FAILED and any other status
         */
        Optional<MessageProcess> updateAndValidateJobStatus() {
            ourLog.trace(
                    "Check status {} of job {} for chunk {}",
                    myJobInstance.getStatus(),
                    myJobInstance.getInstanceId(),
                    myChunkId);

            // Switch on the enum itself rather than on ordinals or unrelated int constants.
            switch (myJobInstance.getStatus()) {
                case QUEUED:
                    // First chunk picked up for a queued job: notify persistence.
                    myJobPersistence.onChunkDequeued(myJobInstance.getInstanceId());
                    break;

                case IN_PROGRESS:
                case ERRORED:
                case FINALIZE:
                    // Nothing to update; continue with the chunk.
                    break;

                case COMPLETED:
                    // A chunk for an already completed job is unexpected, hence error level.
                    ourLog.error(
                            "Received chunk {}, but job instance is {}.  Skipping.",
                            myChunkId,
                            myJobInstance.getStatus());
                    return Optional.empty();

                case CANCELLED:
                case FAILED:
                default:
                    ourLog.info("Skipping chunk {} because job instance is {}", myChunkId, myJobInstance.getStatus());
                    return Optional.empty();
            }
            return Optional.of(this);
        }

        /**
         * Builds the work cursor for the step requested by the notification and checks that
         * the loaded chunk targets that same step.
         *
         * @return this process, or empty (after logging an error) when the step ids differ
         */
        Optional<MessageProcess> buildCursor() {
            myCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
                    myJobDefinition, myWorkNotification.getTargetStepId());
            if (!myWorkChunk.getTargetStepId().equals(myCursor.getCurrentStepId())) {
                ourLog.error(
                        "Chunk {} has target step {} but expected {}",
                        myChunkId,
                        myWorkChunk.getTargetStepId(),
                        myCursor.getCurrentStepId());
                return Optional.empty();
            }
            return Optional.of(this);
        }

        /**
         * Creates the step executor from the loaded instance, chunk and cursor.
         *
         * @return this process
         */
        public Optional<MessageProcess> buildStepExecutor() {
            myStepExector = myJobStepExecutorFactory.newJobStepExecutor(myJobInstance, myWorkChunk, myCursor);
            return Optional.of(this);
        }
    }

    /**
     * Creates the handler and the {@link JobStepExecutorFactory} it uses to run steps.
     *
     * @param iJobPersistence persistence used to load instances and chunks and to record dequeue events
     * @param jobDefinitionRegistry registry used to resolve job definitions
     * @param batchJobSender passed through to the step executor factory
     * @param workChunkProcessor passed through to the step executor factory
     * @param iJobMaintenanceService passed through to the step executor factory
     * @param iHapiTransactionService transaction service wrapping the setup steps
     */
    public WorkChannelMessageHandler(@Nonnull IJobPersistence iJobPersistence, @Nonnull JobDefinitionRegistry jobDefinitionRegistry, @Nonnull BatchJobSender batchJobSender, @Nonnull WorkChunkProcessor workChunkProcessor, @Nonnull IJobMaintenanceService iJobMaintenanceService, IHapiTransactionService iHapiTransactionService) {
        this.myJobPersistence = iJobPersistence;
        this.myJobDefinitionRegistry = jobDefinitionRegistry;
        this.myHapiTransactionService = iHapiTransactionService;
        this.myJobStepExecutorFactory = new JobStepExecutorFactory(iJobPersistence, batchJobSender, workChunkProcessor, iJobMaintenanceService, jobDefinitionRegistry);
    }

    /**
     * Entry point invoked by the messaging channel.
     *
     * @param message the incoming message; it is cast to {@link JobWorkNotificationJsonMessage},
     *     so any other message type results in a {@link ClassCastException}
     */
    @Override
    public void handleMessage(@Nonnull Message<?> message) throws MessagingException {
        handleWorkChannelMessage((JobWorkNotificationJsonMessage) message);
    }

    /**
     * Runs the setup chain for one notification inside a transaction and, when every step
     * succeeded, executes the prepared step; otherwise the notification is discarded.
     */
    private void handleWorkChannelMessage(JobWorkNotificationJsonMessage jobWorkNotificationJsonMessage) {
        JobWorkNotification workNotification = jobWorkNotificationJsonMessage.getPayload();
        ourLog.info("Received work notification for {}", workNotification);

        Optional<MessageProcess> preparedProcess =
                executeInTxRollbackWhenEmpty(() -> Optional.of(new MessageProcess(workNotification))
                        .flatMap(MessageProcess::validateChunkId)
                        .flatMap(MessageProcess::loadJobDefinitionOrThrow)
                        .flatMap(MessageProcess::loadJobInstance)
                        .flatMap(MessageProcess::updateChunkStatusAndValidate)
                        .flatMap(MessageProcess::updateAndValidateJobStatus)
                        .flatMap(MessageProcess::buildCursor)
                        .flatMap(MessageProcess::buildStepExecutor));

        // The step itself runs after the setup callback has returned, not inside it.
        preparedProcess.ifPresentOrElse(
                process -> process.myStepExector.executeStep(),
                () -> ourLog.debug("Discarding chunk notification {}", workNotification));
    }

    /**
     * Runs the supplier inside a transaction obtained from the transaction service and marks
     * that transaction rollback-only when the supplier yields an empty result.
     *
     * @param supplier the work to run inside the transaction
     * @param <T> the type of the value carried by the result
     * @return whatever the supplier returned
     */
    <T> Optional<T> executeInTxRollbackWhenEmpty(Supplier<Optional<T>> supplier) {
        return myHapiTransactionService.withSystemRequestOnDefaultPartition().execute(transactionStatus -> {
            Optional<T> setupResult = supplier.get();
            if (setupResult.isEmpty()) {
                // Undo any status changes made by the setup steps that ran before the failure.
                ourLog.debug("WorkChunk setup failed - rollback tx");
                transactionStatus.setRollbackOnly();
            }
            return setupResult;
        });
    }
}
