package org.axonframework.eventhandling;

import java.util.List;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.io.IOUtils;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;

/* loaded from: input_file:org/axonframework/eventhandling/SubscribingEventProcessor.class */
/**
 * Event processor that, once started, subscribes to a {@link SubscribableMessageSource} and hands
 * every batch of events it receives to an {@link EventProcessingStrategy}. The strategy is given
 * {@link #process(List)} as the callback that performs the actual handling.
 *
 * <p>Instances are created through {@link #builder()}.
 *
 * <p>NOTE(review): the null check and the assignment of the registration in {@link #start()} are
 * two separate steps on a {@code volatile} field, and {@link #shutDown()} is not synchronized
 * either. Confirm that callers never invoke the lifecycle methods concurrently.
 */
public class SubscribingEventProcessor extends AbstractEventProcessor {
    private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    private final EventProcessingStrategy processingStrategy;
    /** Handle of the active subscription; {@code null} while this processor is not subscribed. */
    private volatile Registration eventBusRegistration;

    /**
     * Builder for a {@link SubscribingEventProcessor}.
     *
     * <p>The processing strategy defaults to {@link DirectEventProcessingStrategy#INSTANCE} and the
     * rollback configuration is initialised to {@link RollbackConfigurationType#ANY_THROWABLE}.
     * The message source has no default and must be supplied before {@link #build()} is called.
     */
    public static class Builder extends AbstractEventProcessor.Builder {
        private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
        private EventProcessingStrategy processingStrategy = DirectEventProcessingStrategy.INSTANCE;

        /** Creates a builder with the rollback configuration preset to {@code ANY_THROWABLE}. */
        public Builder() {
            // Goes through super on purpose so the overridable method of this class is not
            // invoked from the constructor.
            super.rollbackConfiguration((RollbackConfiguration) RollbackConfigurationType.ANY_THROWABLE);
        }

        /**
         * Passes the processor name to the parent builder.
         *
         * @param name the name to configure
         * @return this builder, for method chaining
         */
        @Override
        public Builder name(String name) {
            super.name(name);
            return this;
        }

        /**
         * Passes the event handler invoker to the parent builder.
         *
         * @param eventHandlerInvoker the invoker to configure
         * @return this builder, for method chaining
         */
        @Override
        public Builder eventHandlerInvoker(EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        /**
         * Passes the rollback configuration to the parent builder, replacing the preset value.
         *
         * @param rollbackConfiguration the rollback configuration to configure
         * @return this builder, for method chaining
         */
        @Override
        public Builder rollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        /**
         * Passes the error handler to the parent builder.
         *
         * @param errorHandler the error handler to configure
         * @return this builder, for method chaining
         */
        @Override
        public Builder errorHandler(ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        /**
         * Passes the message monitor to the parent builder.
         *
         * <p>No explicit raw-typed bridge overload is declared for this method: the compiler
         * generates the bridge for the covariant return type, and a hand-written copy would
         * clash with this method because both have the same erasure.
         *
         * @param messageMonitor the monitor to configure
         * @return this builder, for method chaining
         */
        @Override
        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Sets the source this processor subscribes to when started.
         *
         * @param subscribableMessageSource the source of event messages; must not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder messageSource(SubscribableMessageSource<? extends EventMessage<?>> subscribableMessageSource) {
            BuilderUtils.assertNonNull(subscribableMessageSource, "SubscribableMessageSource may not be null");
            this.messageSource = subscribableMessageSource;
            return this;
        }

        /**
         * Sets the strategy that decides how received event batches are passed on for processing.
         *
         * @param eventProcessingStrategy the strategy to use; must not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder processingStrategy(EventProcessingStrategy eventProcessingStrategy) {
            BuilderUtils.assertNonNull(eventProcessingStrategy, "EventProcessingStrategy may not be null");
            this.processingStrategy = eventProcessingStrategy;
            return this;
        }

        /**
         * Creates the processor from the values collected by this builder.
         *
         * @return a new {@link SubscribingEventProcessor}
         */
        public SubscribingEventProcessor build() {
            return new SubscribingEventProcessor(this);
        }

        /**
         * Runs the parent builder's validation and additionally requires a message source.
         *
         * @throws AxonConfigurationException declared by the parent contract; presumably raised
         *                                    by the assertion when no message source was set
         */
        @Override
        public void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.messageSource, "The SubscribableMessageSource is a hard requirement and should be provided");
        }
    }

    /**
     * Creates the processor from the given builder, copying its message source and processing
     * strategy.
     *
     * @param builder the builder holding the configuration
     */
    protected SubscribingEventProcessor(Builder builder) {
        super(builder);
        this.messageSource = builder.messageSource;
        this.processingStrategy = builder.processingStrategy;
    }

    /**
     * Creates a fresh {@link Builder}.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Subscribes to the message source unless a registration is already held, in which case the
     * call does nothing. Each batch delivered by the source is forwarded to the processing
     * strategy together with {@link #process(List)} as the processing callback.
     */
    @Override
    public void start() {
        if (this.eventBusRegistration != null) {
            // A registration is already held, so this processor is considered started.
            return;
        }
        this.eventBusRegistration = this.messageSource.subscribe(
                events -> this.processingStrategy.handle(events, this::process));
    }

    /**
     * Processes the given batch in a single {@link BatchingUnitOfWork} on the root segment.
     *
     * @param list the batch of event messages to process
     * @throws EventProcessingException wrapping any checked exception raised during processing;
     *                                  runtime exceptions are rethrown unchanged
     */
    protected void process(List<? extends EventMessage<?>> list) {
        try {
            processInUnitOfWork(list, new BatchingUnitOfWork(list), Segment.ROOT_SEGMENT);
        } catch (RuntimeException unchecked) {
            // Runtime exceptions reach the caller as-is, without an extra wrapper.
            throw unchecked;
        } catch (Exception checked) {
            throw new EventProcessingException("Exception occurred while processing events", checked);
        }
    }

    /**
     * Closes the held registration through {@link IOUtils#closeQuietly} and clears the reference,
     * so that a later {@link #start()} subscribes again.
     */
    @Override
    public void shutDown() {
        IOUtils.closeQuietly(this.eventBusRegistration);
        this.eventBusRegistration = null;
    }
}
