package org.bonitasoft.engine.message;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.bonitasoft.engine.api.utils.VisibleForTesting;
import org.bonitasoft.engine.builder.BuilderFactory;
import org.bonitasoft.engine.commons.TenantLifecycleService;
import org.bonitasoft.engine.commons.exceptions.SBonitaException;
import org.bonitasoft.engine.core.process.instance.api.event.EventInstanceService;
import org.bonitasoft.engine.core.process.instance.api.exceptions.event.trigger.SMessageInstanceReadException;
import org.bonitasoft.engine.core.process.instance.api.exceptions.event.trigger.SMessageModificationException;
import org.bonitasoft.engine.core.process.instance.api.exceptions.event.trigger.SWaitingEventModificationException;
import org.bonitasoft.engine.core.process.instance.api.exceptions.event.trigger.SWaitingEventReadException;
import org.bonitasoft.engine.core.process.instance.model.builder.event.handling.SMessageInstanceBuilder;
import org.bonitasoft.engine.core.process.instance.model.builder.event.handling.SWaitingMessageEventBuilderFactory;
import org.bonitasoft.engine.core.process.instance.model.event.handling.SBPMEventType;
import org.bonitasoft.engine.core.process.instance.model.event.handling.SMessageEventCouple;
import org.bonitasoft.engine.core.process.instance.model.event.handling.SMessageInstance;
import org.bonitasoft.engine.core.process.instance.model.event.handling.SWaitingMessageEvent;
import org.bonitasoft.engine.execution.work.BPMWorkFactory;
import org.bonitasoft.engine.lock.BonitaLock;
import org.bonitasoft.engine.lock.LockService;
import org.bonitasoft.engine.log.technical.TechnicalLogger;
import org.bonitasoft.engine.log.technical.TechnicalLoggerService;
import org.bonitasoft.engine.recorder.model.EntityUpdateDescriptor;
import org.bonitasoft.engine.sessionaccessor.SessionAccessor;
import org.bonitasoft.engine.transaction.BonitaTransactionSynchronization;
import org.bonitasoft.engine.transaction.STransactionNotFoundException;
import org.bonitasoft.engine.transaction.UserTransactionService;
import org.bonitasoft.engine.work.SWorkRegisterException;
import org.bonitasoft.engine.work.WorkService;

/* loaded from: input_file:org/bonitasoft/engine/message/MessagesHandlingService.class */
public class MessagesHandlingService implements TenantLifecycleService {
    private static final int MAX_COUPLES = 100;
    private static final String LOCK_TYPE = "EVENTS";
    public static final String NUMBER_OF_MESSAGES_EXECUTED = "bonita.bpmengine.message.executed";
    public static final String NUMBER_OF_MESSAGES_POTENTIAL_MATCHED = "bonita.bpmengine.message.potential";
    public static final String NUMBER_OF_MESSAGES_MATCHING_RETRIGGERED_TASKS = "bonita.bpmengine.message.retriggeredtasks";
    private ThreadPoolExecutor threadPoolExecutor;
    private EventInstanceService eventInstanceService;
    private WorkService workService;
    private TechnicalLogger logger;
    private LockService lockService;
    private Long tenantId;
    private UserTransactionService userTransactionService;
    private SessionAccessor sessionAccessor;
    private BPMWorkFactory workFactory;
    private final Counter executedMessagesCounter;
    private final Counter matchedPotentialMessagesCounter;
    private final Counter retriggeredMatchingTasksCounter;

    /* loaded from: input_file:org/bonitasoft/engine/message/MessagesHandlingService$MessagesMatchingTask.class */
    private class MessagesMatchingTask implements Callable<Void> {
        private MessagesMatchingTask() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            try {
                MessagesHandlingService.this.logger.debug("Starting messages matching");
                BonitaLock tryLock = MessagesHandlingService.this.lockService.tryLock(1L, MessagesHandlingService.LOCK_TYPE, 1L, TimeUnit.MILLISECONDS, MessagesHandlingService.this.tenantId.longValue());
                if (tryLock == null) {
                    MessagesHandlingService.this.logger.debug("The task that matches BPMN messages is already running, this execution will be ignored");
                    return null;
                }
                try {
                    MessagesHandlingService.this.sessionAccessor.setTenantId(MessagesHandlingService.this.tenantId.longValue());
                    MessagesHandlingService.this.matchEventCoupleAndTriggerExecution();
                    MessagesHandlingService.this.lockService.unlock(tryLock, MessagesHandlingService.this.tenantId.longValue());
                    MessagesHandlingService.this.logger.debug("Messages matching completed");
                    return null;
                } catch (Throwable th) {
                    MessagesHandlingService.this.lockService.unlock(tryLock, MessagesHandlingService.this.tenantId.longValue());
                    throw th;
                }
            } catch (Exception e) {
                MessagesHandlingService.this.logger.error("Error while matching messages", e);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/bonitasoft/engine/message/MessagesHandlingService$RegisterMessagesMatchingSynchronization.class */
    public class RegisterMessagesMatchingSynchronization implements BonitaTransactionSynchronization {
        private RegisterMessagesMatchingSynchronization() {
        }

        @Override // org.bonitasoft.engine.transaction.BonitaTransactionSynchronization
        public void afterCompletion(int i) {
            MessagesHandlingService.this.threadPoolExecutor.submit(new MessagesMatchingTask());
            MessagesHandlingService.this.logger.debug("Messages matching task registered");
        }
    }

    public MessagesHandlingService(EventInstanceService eventInstanceService, WorkService workService, TechnicalLoggerService technicalLoggerService, LockService lockService, Long l, UserTransactionService userTransactionService, SessionAccessor sessionAccessor, BPMWorkFactory bPMWorkFactory, MeterRegistry meterRegistry) {
        this.eventInstanceService = eventInstanceService;
        this.workService = workService;
        this.logger = technicalLoggerService.asLogger(MessagesHandlingService.class);
        this.lockService = lockService;
        this.tenantId = l;
        this.userTransactionService = userTransactionService;
        this.sessionAccessor = sessionAccessor;
        this.workFactory = bPMWorkFactory;
        this.executedMessagesCounter = Counter.builder(NUMBER_OF_MESSAGES_EXECUTED).tags(Tags.of("tenant", String.valueOf(l))).baseUnit("messages").description("BPMN message couples executed").register(meterRegistry);
        this.matchedPotentialMessagesCounter = Counter.builder(NUMBER_OF_MESSAGES_POTENTIAL_MATCHED).tags(Tags.of("tenant", String.valueOf(l))).baseUnit("messages").description("BPMN message couples potentially matched").register(meterRegistry);
        this.retriggeredMatchingTasksCounter = Counter.builder(NUMBER_OF_MESSAGES_MATCHING_RETRIGGERED_TASKS).tags(Tags.of("tenant", String.valueOf(l))).baseUnit("messages matching tasks").description("BPMN message matching tasks retriggered").register(meterRegistry);
    }

    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void start() {
        this.logger.info("Starting BPMN messages matcher thread");
        this.threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.HOURS, new ArrayBlockingQueue(5), runnable -> {
            return new Thread(runnable, "Bonita-Message-Matching");
        }, (runnable2, threadPoolExecutor) -> {
            this.logger.debug("Message matching queue capacity reached");
        });
        this.logger.info("Thread that handle messages matching successfully started");
    }

    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void stop() {
        this.logger.info("Stopping BPMN messages matcher thread");
        if (this.threadPoolExecutor == null) {
            this.logger.info("BPMN messages matcher thread is already stopped");
            return;
        }
        this.threadPoolExecutor.shutdown();
        try {
            if (!this.threadPoolExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                this.logger.warn("Failed to terminate the BPMN messages matcher thread. This will not have functional impacts but it might produce warnings on server shutdown");
            }
        } catch (InterruptedException e) {
        }
        this.threadPoolExecutor = null;
        this.logger.info("BPMN messages matcher thread successfully stopped");
    }

    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void pause() {
        stop();
    }

    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void resume() {
        start();
    }

    public void triggerMatchingOfMessages() throws STransactionNotFoundException {
        if (this.threadPoolExecutor == null) {
            this.logger.warn("Cannot match messages when service is stopped. Maybe the engine is not yet started");
        } else {
            this.userTransactionService.registerBonitaSynchronization(new RegisterMessagesMatchingSynchronization());
        }
    }

    @VisibleForTesting
    void matchEventCoupleAndTriggerExecution() throws Exception {
        this.userTransactionService.executeInTransaction(() -> {
            List<SMessageEventCouple> messageEventCouples = this.eventInstanceService.getMessageEventCouples(0, 100);
            int size = messageEventCouples.size();
            this.logger.info("Found {} potential message/event couples", Integer.valueOf(size));
            this.matchedPotentialMessagesCounter.increment(size);
            List<SMessageEventCouple> messageUniqueCouples = getMessageUniqueCouples(messageEventCouples);
            if (messageUniqueCouples.isEmpty()) {
                this.logger.debug("No message/event couples to be executed");
            } else {
                this.logger.info("Triggering execution of unique {} message/event couples", Integer.valueOf(messageUniqueCouples.size()));
                executeUniqueMessageCouplesWork(messageUniqueCouples);
                this.logger.info("Execution of message/event couples triggered");
            }
            if (size != 100) {
                return null;
            }
            this.logger.debug("There are more than {} message/event couples to match. Will trigger the execution again now, to match more couples", 100);
            triggerMatchingOfMessages();
            this.retriggeredMatchingTasksCounter.increment();
            return null;
        });
    }

    private void executeUniqueMessageCouplesWork(List<SMessageEventCouple> list) throws SBonitaException {
        for (SMessageEventCouple sMessageEventCouple : list) {
            executeMessageCouple(sMessageEventCouple.getMessageInstanceId(), sMessageEventCouple.getWaitingMessageId());
        }
    }

    @VisibleForTesting
    void executeMessageCouple(long j, long j2) throws SWaitingEventReadException, SMessageInstanceReadException, SMessageModificationException, SWaitingEventModificationException, SWorkRegisterException {
        this.logger.debug("Registering message/event couple execution: message {} / event {}", Long.valueOf(j), Long.valueOf(j2));
        SWaitingMessageEvent waitingMessage = this.eventInstanceService.getWaitingMessage(j2);
        SMessageInstance messageInstance = this.eventInstanceService.getMessageInstance(j);
        markMessageAsInProgress(messageInstance);
        if (!SBPMEventType.START_EVENT.equals(waitingMessage.getEventType())) {
            markWaitingMessageAsInProgress(waitingMessage);
        }
        this.executedMessagesCounter.increment();
        this.workService.registerWork(this.workFactory.createExecuteMessageCoupleWorkDescriptor(messageInstance, waitingMessage));
    }

    List<SMessageEventCouple> getMessageUniqueCouples(List<SMessageEventCouple> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (SMessageEventCouple sMessageEventCouple : list) {
            long messageInstanceId = sMessageEventCouple.getMessageInstanceId();
            long waitingMessageId = sMessageEventCouple.getWaitingMessageId();
            SBPMEventType waitingMessageEventType = sMessageEventCouple.getWaitingMessageEventType();
            boolean contains = arrayList.contains(Long.valueOf(messageInstanceId));
            if (!contains && !arrayList2.contains(Long.valueOf(waitingMessageId))) {
                arrayList.add(Long.valueOf(messageInstanceId));
                if (!SBPMEventType.START_EVENT.equals(waitingMessageEventType)) {
                    arrayList2.add(Long.valueOf(waitingMessageId));
                }
                arrayList3.add(sMessageEventCouple);
            } else if (this.logger.isTraceEnabled()) {
                this.logger.trace("Ignoring couple: message {} / event {}. Duplication cause: message? {} / event? {}", Long.valueOf(sMessageEventCouple.getMessageInstanceId()), Long.valueOf(sMessageEventCouple.getWaitingMessageId()), Boolean.valueOf(contains), Boolean.valueOf(arrayList2.contains(Long.valueOf(waitingMessageId))));
            }
        }
        return arrayList3;
    }

    private void markMessageAsInProgress(SMessageInstance sMessageInstance) throws SMessageModificationException {
        EntityUpdateDescriptor entityUpdateDescriptor = new EntityUpdateDescriptor();
        entityUpdateDescriptor.addField(SMessageInstanceBuilder.HANDLED, true);
        this.eventInstanceService.updateMessageInstance(sMessageInstance, entityUpdateDescriptor);
    }

    private void markWaitingMessageAsInProgress(SWaitingMessageEvent sWaitingMessageEvent) throws SWaitingEventModificationException {
        EntityUpdateDescriptor entityUpdateDescriptor = new EntityUpdateDescriptor();
        entityUpdateDescriptor.addField(((SWaitingMessageEventBuilderFactory) BuilderFactory.get(SWaitingMessageEventBuilderFactory.class)).getProgressKey(), 1);
        this.eventInstanceService.updateWaitingMessage(sWaitingMessageEvent, entityUpdateDescriptor);
    }

    public void resetMessageCouple(long j, long j2) throws SWaitingEventReadException, SWaitingEventModificationException, SMessageModificationException, SMessageInstanceReadException {
        resetWaitingMessage(j2);
        resetMessageInstance(j);
    }

    private void resetMessageInstance(long j) throws SMessageModificationException, SMessageInstanceReadException {
        SMessageInstance messageInstance = this.eventInstanceService.getMessageInstance(j);
        if (messageInstance == null) {
            this.logger.warn("Unable to reset message instance {} because it is not found", Long.valueOf(j));
            return;
        }
        EntityUpdateDescriptor entityUpdateDescriptor = new EntityUpdateDescriptor();
        entityUpdateDescriptor.addField(SMessageInstanceBuilder.HANDLED, false);
        this.eventInstanceService.updateMessageInstance(messageInstance, entityUpdateDescriptor);
    }

    private void resetWaitingMessage(long j) throws SWaitingEventModificationException, SWaitingEventReadException {
        SWaitingMessageEvent waitingMessage = this.eventInstanceService.getWaitingMessage(j);
        if (waitingMessage == null) {
            this.logger.warn("Unable to reset waiting event because it is not found", Long.valueOf(j));
            return;
        }
        EntityUpdateDescriptor entityUpdateDescriptor = new EntityUpdateDescriptor();
        entityUpdateDescriptor.addField(((SWaitingMessageEventBuilderFactory) BuilderFactory.get(SWaitingMessageEventBuilderFactory.class)).getProgressKey(), 0);
        this.eventInstanceService.updateWaitingMessage(waitingMessage, entityUpdateDescriptor);
    }
}
