package org.axonframework.saga.annotation;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Subscribable;
import org.axonframework.common.annotation.ClasspathParameterResolverFactory;
import org.axonframework.common.annotation.ParameterResolverFactory;
import org.axonframework.correlation.CorrelationDataProvider;
import org.axonframework.correlation.MultiCorrelationDataProvider;
import org.axonframework.correlation.SimpleCorrelationDataProvider;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventProcessingMonitor;
import org.axonframework.eventhandling.EventProcessingMonitorCollection;
import org.axonframework.eventhandling.EventProcessingMonitorSupport;
import org.axonframework.saga.GenericSagaFactory;
import org.axonframework.saga.SagaCreationPolicy;
import org.axonframework.saga.SagaFactory;
import org.axonframework.saga.SagaManager;
import org.axonframework.saga.SagaRepository;
import org.axonframework.saga.annotation.AsyncSagaProcessingEvent;
import org.axonframework.saga.repository.inmemory.InMemorySagaRepository;
import org.axonframework.unitofwork.DefaultUnitOfWorkFactory;
import org.axonframework.unitofwork.TransactionManager;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager.class */
public class AsyncAnnotatedSagaManager implements SagaManager, Subscribable, EventProcessingMonitorSupport {
    private static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
    private WaitStrategy waitStrategy;
    private static final int DEFAULT_BUFFER_SIZE = 512;
    private int bufferSize;
    private static final int DEFAULT_PROCESSOR_COUNT = 1;
    private int processorCount;
    private final EventBus eventBus;
    private final Class<? extends AbstractAnnotatedSaga>[] sagaTypes;
    private final ParameterResolverFactory parameterResolverFactory;
    private final SagaManagerStatus sagaManagerStatus;
    private final EventProcessingMonitorCollection processingMonitors;
    private volatile Disruptor<AsyncSagaProcessingEvent> disruptor;
    private boolean shutdownExecutorOnStop;
    private Executor executor;
    private SagaRepository sagaRepository;
    private volatile SagaFactory sagaFactory;
    private UnitOfWorkFactory unitOfWorkFactory;
    private long startTimeout;
    private CorrelationDataProvider<? super EventMessage> correlationDataProvider;
    private ErrorHandler errorHandler;

    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$LoggingExceptionHandler.class */
    private static final class LoggingExceptionHandler implements ExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(LoggingExceptionHandler.class);

        private LoggingExceptionHandler() {
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            logger.warn("A fatal exception occurred while processing an Event for a Saga. Processing will continue with the next Event", th);
        }

        public void handleOnStartException(Throwable th) {
            logger.warn("An exception occurred while starting the AsyncAnnotatedSagaManager.", th);
        }

        public void handleOnShutdownException(Throwable th) {
            logger.warn("An exception occurred while shutting down the AsyncAnnotatedSagaManager.", th);
        }
    }

    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$MonitorNotifier.class */
    private static class MonitorNotifier implements EventHandler<AsyncSagaProcessingEvent> {
        private final EventProcessingMonitor monitor;
        private final List<EventMessage> processedMessages = new ArrayList();

        public MonitorNotifier(EventProcessingMonitor eventProcessingMonitor) {
            this.monitor = eventProcessingMonitor;
        }

        public void onEvent(AsyncSagaProcessingEvent asyncSagaProcessingEvent, long j, boolean z) throws Exception {
            this.processedMessages.add(asyncSagaProcessingEvent.getPublishedEvent());
            if (z) {
                this.monitor.onEventProcessingCompleted(this.processedMessages);
                this.processedMessages.clear();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$SagaManagerStatus.class */
    /**
     * Holder for the running flag of the saga manager. The instance is handed to the event
     * processors created in {@code start()}; the flag is volatile so a change made by one thread
     * is visible to the others.
     */
    public static class SagaManagerStatus {
        private volatile boolean running;

        SagaManagerStatus() {
        }

        /**
         * Updates the running flag.
         *
         * @param newStatus {@code true} when the manager is running, {@code false} otherwise
         */
        public void setStatus(boolean newStatus) {
            running = newStatus;
        }

        /**
         * @return the most recently set running flag
         */
        public boolean isRunning() {
            return running;
        }
    }

    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$SagaProcessingEventTranslator.class */
    private static final class SagaProcessingEventTranslator implements EventTranslator<AsyncSagaProcessingEvent> {
        private final EventMessage event;
        private final SagaMethodMessageHandlerInspector annotationInspector;
        private final List<SagaMethodMessageHandler> handlers;
        private final AbstractAnnotatedSaga newSagaInstance;

        private SagaProcessingEventTranslator(EventMessage eventMessage, SagaMethodMessageHandlerInspector sagaMethodMessageHandlerInspector, List<SagaMethodMessageHandler> list, AbstractAnnotatedSaga abstractAnnotatedSaga) {
            this.event = eventMessage;
            this.annotationInspector = sagaMethodMessageHandlerInspector;
            this.handlers = list;
            this.newSagaInstance = abstractAnnotatedSaga;
        }

        public void translateTo(AsyncSagaProcessingEvent asyncSagaProcessingEvent, long j) {
            asyncSagaProcessingEvent.reset(this.event, this.annotationInspector.getSagaType(), this.handlers, this.newSagaInstance);
        }
    }

    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$StartDetectingRunnable.class */
    private static class StartDetectingRunnable implements Runnable {
        private final Runnable delegate;
        private final CountDownLatch cdl = new CountDownLatch(AsyncAnnotatedSagaManager.DEFAULT_PROCESSOR_COUNT);

        public StartDetectingRunnable(Runnable runnable) {
            this.delegate = runnable;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.cdl.countDown();
            this.delegate.run();
        }

        public boolean awaitStarted(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.cdl.await(j, timeUnit);
        }
    }

    /* loaded from: input_file:org/axonframework/saga/annotation/AsyncAnnotatedSagaManager$ValidatingExecutor.class */
    /**
     * {@link Executor} decorator that verifies every submitted task is actually picked up by a
     * thread of the delegate within a given timeout.
     */
    private static class ValidatingExecutor implements Executor {
        private final Executor delegate;
        private final long timeoutMillis;

        /**
         * @param executor      the executor that really runs the tasks
         * @param timeoutMillis how long, in milliseconds, to wait for each task to start
         */
        public ValidatingExecutor(Executor executor, long timeoutMillis) {
            this.delegate = executor;
            this.timeoutMillis = timeoutMillis;
        }

        /**
         * Submits the task to the delegate and blocks the caller until the task has started.
         * If the caller is interrupted while waiting, the interrupt flag is restored and the
         * method returns without reporting a failure.
         *
         * @param runnable the task to execute
         * @throws AxonConfigurationException if the task did not start within the timeout
         */
        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            final StartDetectingRunnable task = new StartDetectingRunnable(runnable);
            delegate.execute(task);

            final boolean startedInTime;
            try {
                startedInTime = task.awaitStarted(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!startedInTime) {
                throw new AxonConfigurationException("It seems that the given Executor is not providing a thread for the AsyncSagaManager. Ensure that the corePoolSize is larger than the processor count.");
            }
        }
    }

    @Deprecated
    public AsyncAnnotatedSagaManager(EventBus eventBus, Class<? extends AbstractAnnotatedSaga>... clsArr) {
        this.waitStrategy = DEFAULT_WAIT_STRATEGY;
        this.bufferSize = DEFAULT_BUFFER_SIZE;
        this.processorCount = DEFAULT_PROCESSOR_COUNT;
        this.sagaManagerStatus = new SagaManagerStatus();
        this.processingMonitors = new EventProcessingMonitorCollection();
        this.shutdownExecutorOnStop = true;
        this.executor = Executors.newCachedThreadPool();
        this.sagaRepository = new InMemorySagaRepository();
        this.sagaFactory = new GenericSagaFactory();
        this.unitOfWorkFactory = new DefaultUnitOfWorkFactory();
        this.startTimeout = 5000L;
        this.correlationDataProvider = new SimpleCorrelationDataProvider(new String[0]);
        this.errorHandler = new ProceedingErrorHandler();
        Assert.notNull(eventBus, "eventBus may not be null");
        this.eventBus = eventBus;
        this.sagaTypes = (Class[]) Arrays.copyOf(clsArr, clsArr.length);
        this.parameterResolverFactory = ClasspathParameterResolverFactory.forClass(clsArr.length == 0 ? AsyncAnnotatedSagaManager.class : clsArr[0]);
    }

    public AsyncAnnotatedSagaManager(Class<? extends AbstractAnnotatedSaga>... clsArr) {
        this(ClasspathParameterResolverFactory.forClass(clsArr.length == 0 ? AsyncAnnotatedSagaManager.class : clsArr[0]), clsArr);
    }

    public AsyncAnnotatedSagaManager(ParameterResolverFactory parameterResolverFactory, Class<? extends AbstractAnnotatedSaga>... clsArr) {
        this.waitStrategy = DEFAULT_WAIT_STRATEGY;
        this.bufferSize = DEFAULT_BUFFER_SIZE;
        this.processorCount = DEFAULT_PROCESSOR_COUNT;
        this.sagaManagerStatus = new SagaManagerStatus();
        this.processingMonitors = new EventProcessingMonitorCollection();
        this.shutdownExecutorOnStop = true;
        this.executor = Executors.newCachedThreadPool();
        this.sagaRepository = new InMemorySagaRepository();
        this.sagaFactory = new GenericSagaFactory();
        this.unitOfWorkFactory = new DefaultUnitOfWorkFactory();
        this.startTimeout = 5000L;
        this.correlationDataProvider = new SimpleCorrelationDataProvider(new String[0]);
        this.errorHandler = new ProceedingErrorHandler();
        this.parameterResolverFactory = parameterResolverFactory;
        this.eventBus = null;
        this.sagaTypes = (Class[]) Arrays.copyOf(clsArr, clsArr.length);
    }

    /**
     * Starts this manager. When no disruptor is active, a new one is built from the currently
     * configured buffer size, executor, wait strategy, repository, unit of work factory,
     * processor count, correlation data provider and error handler, and is then started.
     * Afterwards the manager subscribes to its event bus, if it has one. Calling this method on
     * an already started manager only repeats the subscription.
     * <p>
     * If building or starting the disruptor fails, for example because the executor did not
     * provide a thread within the start timeout, the manager is returned to its stopped state
     * before the exception is rethrown, so that the configuration can be corrected and
     * {@code start()} retried.
     */
    public synchronized void start() {
        if (this.disruptor == null) {
            this.sagaManagerStatus.setStatus(true);
            final Disruptor<AsyncSagaProcessingEvent> newDisruptor = new Disruptor<>(
                    new AsyncSagaProcessingEvent.Factory(), this.bufferSize,
                    new ValidatingExecutor(this.executor, this.startTimeout), ProducerType.MULTI, this.waitStrategy);
            this.disruptor = newDisruptor;
            try {
                newDisruptor.handleExceptionsWith(new LoggingExceptionHandler());
                newDisruptor.handleEventsWith(AsyncSagaEventProcessor.createInstances(
                        this.sagaRepository, this.parameterResolverFactory, this.unitOfWorkFactory,
                        this.processorCount, newDisruptor.getRingBuffer(), this.sagaManagerStatus,
                        this.correlationDataProvider, this.errorHandler))
                            .then(new EventHandler[]{new MonitorNotifier(this.processingMonitors)});
                newDisruptor.start();
            } catch (RuntimeException e) {
                // Roll back: without this the field stays non-null, handle() keeps publishing to
                // a ring buffer nobody consumes and a later start() would silently do nothing.
                this.disruptor = null;
                this.sagaManagerStatus.setStatus(false);
                // Stop any processor that did get a thread before the failure occurred.
                newDisruptor.halt();
                throw e;
            }
        }
        subscribe();
    }

    /**
     * Stops this manager: marks it as not running, unsubscribes from the event bus (if any) and
     * shuts down the active disruptor. The executor is shut down as well, but only when it is
     * still the one created by this manager and it is an {@link ExecutorService}. Calling this
     * method on a manager that is not started only resets the status and the subscription.
     */
    public synchronized void stop() {
        this.sagaManagerStatus.setStatus(false);
        unsubscribe();

        final Disruptor<AsyncSagaProcessingEvent> active = this.disruptor;
        if (active != null) {
            active.shutdown();
            final boolean ownsExecutorService = this.shutdownExecutorOnStop && this.executor instanceof ExecutorService;
            if (ownsExecutorService) {
                ((ExecutorService) this.executor).shutdown();
            }
        }
        this.disruptor = null;
    }

    /**
     * Unsubscribes this manager from the event bus it was constructed with. Does nothing when
     * the manager was created without an event bus.
     */
    @Override // org.axonframework.common.Subscribable
    public void unsubscribe() {
        final EventBus bus = this.eventBus;
        if (bus == null) {
            return;
        }
        bus.unsubscribe(this);
    }

    /**
     * Subscribes this manager to the event bus it was constructed with. Does nothing when the
     * manager was created without an event bus.
     */
    @Override // org.axonframework.common.Subscribable
    public void subscribe() {
        final EventBus bus = this.eventBus;
        if (bus == null) {
            return;
        }
        bus.subscribe(this);
    }

    /**
     * Publishes the given event to the disruptor, once for every managed saga type that has at
     * least one handler for it. When one of the matching handlers has a creation policy other
     * than {@link SagaCreationPolicy#NONE}, a new saga instance is created through the saga
     * factory and published along with the event. Events are silently ignored while the manager
     * is not started.
     *
     * @param eventMessage the event to dispatch to the sagas
     */
    @Override // org.axonframework.saga.SagaManager, org.axonframework.eventhandling.EventListener
    public void handle(EventMessage eventMessage) {
        // Read the volatile field exactly once: this method is not synchronized, so stop() may
        // null the field between a null check and a later use.
        final Disruptor<AsyncSagaProcessingEvent> activeDisruptor = this.disruptor;
        if (activeDisruptor == null) {
            return;
        }
        for (Class<? extends AbstractAnnotatedSaga> sagaType : this.sagaTypes) {
            SagaMethodMessageHandlerInspector inspector = SagaMethodMessageHandlerInspector.getInstance(sagaType, this.parameterResolverFactory);
            List<SagaMethodMessageHandler> matchingHandlers = inspector.getMessageHandlers(eventMessage);
            if (matchingHandlers.isEmpty()) {
                continue;
            }
            // At most one new instance per saga type, created for the first handler that asks for it.
            AbstractAnnotatedSaga newSaga = null;
            for (SagaMethodMessageHandler handler : matchingHandlers) {
                if (newSaga == null && handler.getCreationPolicy() != SagaCreationPolicy.NONE) {
                    newSaga = (AbstractAnnotatedSaga) this.sagaFactory.createSaga(inspector.getSagaType());
                }
            }
            activeDisruptor.publishEvent(new SagaProcessingEventTranslator(eventMessage, inspector, matchingHandlers, newSaga));
        }
    }

    /**
     * @return the first saga type this manager was configured with, or {@code void.class} when
     *         it was configured without any saga type
     */
    @Override // org.axonframework.eventhandling.EventListenerProxy
    public Class<?> getTargetType() {
        if (this.sagaTypes.length == 0) {
            return Void.TYPE;
        }
        return this.sagaTypes[0];
    }

    /**
     * Registers the given monitor with the monitor collection that is notified by the final
     * handler stage of the disruptor.
     *
     * @param eventProcessingMonitor the monitor to register
     */
    @Override // org.axonframework.eventhandling.EventProcessingMonitorSupport
    public void subscribeEventProcessingMonitor(EventProcessingMonitor eventProcessingMonitor) {
        processingMonitors.subscribeEventProcessingMonitor(eventProcessingMonitor);
    }

    /**
     * Removes the given monitor from the monitor collection of this manager.
     *
     * @param eventProcessingMonitor the monitor to remove
     */
    @Override // org.axonframework.eventhandling.EventProcessingMonitorSupport
    public void unsubscribeEventProcessingMonitor(EventProcessingMonitor eventProcessingMonitor) {
        processingMonitors.unsubscribeEventProcessingMonitor(eventProcessingMonitor);
    }

    /**
     * Replaces the executor that provides the threads for the disruptor. An executor set through
     * this method is treated as externally owned: {@code stop()} will not shut it down.
     * May only be called while the manager is not started.
     *
     * @param executor the executor to use
     */
    public synchronized void setExecutor(Executor executor) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set executor after SagaManager has started");
        this.executor = executor;
        this.shutdownExecutorOnStop = false;
    }

    /**
     * Sets the repository handed to the saga event processors when the manager is started.
     * May only be called while the manager is not started.
     *
     * @param sagaRepository the repository to use
     */
    public synchronized void setSagaRepository(SagaRepository sagaRepository) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set sagaRepository when SagaManager has started");
        this.sagaRepository = sagaRepository;
    }

    /**
     * Sets the factory used by {@code handle} to create new saga instances.
     * May only be called while the manager is not started.
     *
     * @param sagaFactory the factory to use
     */
    public synchronized void setSagaFactory(SagaFactory sagaFactory) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set sagaFactory when SagaManager has started");
        this.sagaFactory = sagaFactory;
    }

    /**
     * Sets the error handler handed to the saga event processors when the manager is started.
     * <p>
     * Unlike most other setters, no started-state check is performed. The value is only read
     * when {@code start()} builds a new disruptor, so a change made while the manager is running
     * is not picked up until it has been stopped and started again.
     *
     * @param errorHandler the error handler to use
     */
    public synchronized void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Replaces the unit of work factory with a {@link DefaultUnitOfWorkFactory} built around the
     * given transaction manager. May only be called while the manager is not started.
     *
     * @param transactionManager the transaction manager to create units of work with
     */
    public synchronized void setTransactionManager(TransactionManager transactionManager) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set transactionManager when SagaManager has started");
        final UnitOfWorkFactory transactionalFactory = new DefaultUnitOfWorkFactory(transactionManager);
        this.unitOfWorkFactory = transactionalFactory;
    }

    /**
     * Sets the number of saga event processor instances requested when the manager is started.
     * May only be called while the manager is not started.
     *
     * @param i the number of processors to create
     */
    public synchronized void setProcessorCount(int i) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set processorCount when SagaManager has started");
        this.processorCount = i;
    }

    /**
     * Sets how long, in milliseconds, {@code start()} waits for the executor to assign a thread
     * to each task it submits.
     * <p>
     * No started-state check is performed. The value is only read when {@code start()} builds a
     * new disruptor, so a change made while the manager is running has no effect until it has
     * been stopped and started again.
     *
     * @param j the timeout in milliseconds
     */
    public synchronized void setStartTimeout(long j) {
        this.startTimeout = j;
    }

    /**
     * Sets the size of the ring buffer created when the manager is started. The size must be a
     * positive power of two. May only be called while the manager is not started.
     * <p>
     * Both conditions are enforced through {@code Assert}; the argument is checked first.
     *
     * @param i the ring buffer size; a positive power of two
     */
    public synchronized void setBufferSize(int i) {
        // bitCount alone is not enough: Integer.MIN_VALUE also has exactly one bit set.
        Assert.isTrue(i > 0 && Integer.bitCount(i) == 1, "The buffer size must be a power of 2");
        Assert.state(this.disruptor == null, "Cannot set bufferSize when SagaManager has started");
        this.bufferSize = i;
    }

    /**
     * Sets the wait strategy the disruptor is created with when the manager is started.
     * May only be called while the manager is not started.
     *
     * @param waitStrategy the wait strategy to use
     */
    public synchronized void setWaitStrategy(WaitStrategy waitStrategy) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set waitStrategy when SagaManager has started");
        this.waitStrategy = waitStrategy;
    }

    /**
     * Sets the single correlation data provider handed to the saga event processors when the
     * manager is started, replacing any provider configured before.
     * <p>
     * No started-state check is performed. The value is only read when {@code start()} builds a
     * new disruptor, so a change made while the manager is running is not picked up until it has
     * been stopped and started again.
     *
     * @param correlationDataProvider the provider to use
     */
    public synchronized void setCorrelationDataProvider(CorrelationDataProvider<? super EventMessage> correlationDataProvider) {
        this.correlationDataProvider = correlationDataProvider;
    }

    /**
     * Sets several correlation data providers at once by wrapping them in a single
     * {@link MultiCorrelationDataProvider}, replacing any provider configured before.
     * <p>
     * No started-state check is performed. The value is only read when {@code start()} builds a
     * new disruptor, so a change made while the manager is running is not picked up until it has
     * been stopped and started again.
     *
     * @param list the providers to combine
     */
    public synchronized void setCorrelationDataProviders(List<? extends CorrelationDataProvider<? super EventMessage>> list) {
        // NOTE(review): raw type kept as found; if MultiCorrelationDataProvider is generic this
        // should probably carry an explicit type argument - confirm against its declaration.
        final CorrelationDataProvider<? super EventMessage> combined = new MultiCorrelationDataProvider(list);
        this.correlationDataProvider = combined;
    }
}
