package org.axonframework.queryhandling;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.messaging.DefaultInterceptorChain;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.ResultMessage;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerResolution;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerResolver;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

/* loaded from: input_file:org/axonframework/queryhandling/SimpleQueryBus.class */
public class SimpleQueryBus implements QueryBus {
    /** Class logger; also handed to the default error handler created by the {@link Builder}. */
    private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class);
    /** Notified for every query ingested by this bus. */
    private final MessageMonitor<? super QueryMessage<?, ?>> messageMonitor;
    /** Decides what happens when a handler is subscribed for a name/response type that is already taken. */
    private final DuplicateQueryHandlerResolver duplicateQueryHandlerResolver;
    /** Receives handler failures that occur during scatter-gather queries. */
    private final QueryInvocationErrorHandler errorHandler;
    /** Creates the spans wrapped around direct and scatter-gather queries. */
    private final SpanFactory spanFactory;
    /** Emitter exposed through {@link #queryUpdateEmitter()} and used for subscription queries. */
    private final QueryUpdateEmitter queryUpdateEmitter;
    /** Registered query handlers, keyed by query name. */
    private final ConcurrentMap<String, List<QuerySubscription<?>>> subscriptions = new ConcurrentHashMap<>();
    /** Interceptors applied around every handler invocation. */
    private final List<MessageHandlerInterceptor<? super QueryMessage<?, ?>>> handlerInterceptors = new CopyOnWriteArrayList<>();
    /** Interceptors applied to every query before handlers are looked up. */
    private final List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors = new CopyOnWriteArrayList<>();

    /**
     * Fluent builder for a {@link SimpleQueryBus}. Every component has a default, so
     * {@code SimpleQueryBus.builder().build()} yields a usable bus. Each setter rejects
     * {@code null} through {@link BuilderUtils#assertNonNull(Object, String)} and returns
     * this builder for chaining.
     */
    public static class Builder {
        private MessageMonitor<? super QueryMessage<?, ?>> messageMonitor = NoOpMessageMonitor.INSTANCE;
        private TransactionManager transactionManager = NoTransactionManager.instance();
        private QueryInvocationErrorHandler errorHandler =
                LoggingQueryInvocationErrorHandler.builder().logger(SimpleQueryBus.logger).build();
        private DuplicateQueryHandlerResolver duplicateQueryHandlerResolver =
                DuplicateQueryHandlerResolution.logAndAccept();
        private QueryUpdateEmitter queryUpdateEmitter =
                SimpleQueryUpdateEmitter.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();
        private SpanFactory spanFactory = NoOpSpanFactory.INSTANCE;

        /**
         * Sets the monitor that is notified of ingested queries.
         *
         * @param messageMonitor the monitor to use, not {@code null}
         * @return this builder
         */
        public Builder messageMonitor(@Nonnull MessageMonitor<? super QueryMessage<?, ?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets the resolver consulted when a duplicate query handler is subscribed.
         *
         * @param duplicateQueryHandlerResolver the resolver to use, not {@code null}
         * @return this builder
         */
        public Builder duplicateQueryHandlerResolver(DuplicateQueryHandlerResolver duplicateQueryHandlerResolver) {
            BuilderUtils.assertNonNull(duplicateQueryHandlerResolver, "DuplicateQueryHandlerResolver may not be null");
            this.duplicateQueryHandlerResolver = duplicateQueryHandlerResolver;
            return this;
        }

        /**
         * Sets the transaction manager; the bus registers a transaction-managing handler
         * interceptor for it unless it is the no-op instance.
         *
         * @param transactionManager the transaction manager to use, not {@code null}
         * @return this builder
         */
        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the handler that receives query handler failures.
         *
         * @param queryInvocationErrorHandler the error handler to use, not {@code null}
         * @return this builder
         */
        public Builder errorHandler(@Nonnull QueryInvocationErrorHandler queryInvocationErrorHandler) {
            BuilderUtils.assertNonNull(queryInvocationErrorHandler, "QueryInvocationErrorHandler may not be null");
            this.errorHandler = queryInvocationErrorHandler;
            return this;
        }

        /**
         * Sets the emitter used for subscription query updates.
         *
         * @param queryUpdateEmitter the emitter to use, not {@code null}
         * @return this builder
         */
        public Builder queryUpdateEmitter(@Nonnull QueryUpdateEmitter queryUpdateEmitter) {
            BuilderUtils.assertNonNull(queryUpdateEmitter, "QueryUpdateEmitter may not be null");
            this.queryUpdateEmitter = queryUpdateEmitter;
            return this;
        }

        /**
         * Sets the factory used to create tracing spans.
         *
         * @param spanFactory the span factory to use, not {@code null}
         * @return this builder
         */
        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        /**
         * Creates the bus from the current builder state.
         *
         * @return a new {@link SimpleQueryBus}
         */
        public SimpleQueryBus build() {
            return new SimpleQueryBus(this);
        }

        /**
         * Validation hook invoked by the {@link SimpleQueryBus} constructor. Nothing is
         * checked here; the method exists so subclasses can add their own checks.
         *
         * @throws AxonConfigurationException never thrown by this implementation
         */
        protected void validate() throws AxonConfigurationException {
            // Intentionally empty: all fields carry non-null defaults.
        }
    }

    /**
     * Reactor context writer that registers the streaming query with the message monitor
     * and stores the resulting callback in the context under the
     * {@code MessageMonitor.MonitorCallback} class key.
     */
    private static class MonitorCallbackContextWriter implements UnaryOperator<Context> {
        private final MessageMonitor<? super QueryMessage<?, ?>> messageMonitor;
        private final StreamingQueryMessage<?, ?> query;

        private MonitorCallbackContextWriter(MessageMonitor<? super QueryMessage<?, ?>> messageMonitor,
                                             StreamingQueryMessage<?, ?> streamingQueryMessage) {
            this.messageMonitor = messageMonitor;
            this.query = streamingQueryMessage;
        }

        /**
         * Notifies the monitor of the query and returns a context that carries the callback.
         *
         * @param context the context to extend
         * @return the context produced by {@code context.put(...)}
         */
        @Override
        public Context apply(Context context) {
            MessageMonitor.MonitorCallback callback = messageMonitor.onMessageIngested(query);
            return context.put(MessageMonitor.MonitorCallback.class, callback);
        }
    }

    /**
     * Signal consumer that forwards the outcome of a streaming query to the
     * {@code MessageMonitor.MonitorCallback} stored in the signal's context.
     * Signals other than {@code onNext} and {@code onError} are ignored.
     */
    private static class SuccessReporter implements Consumer<Signal<?>> {

        /**
         * Reports success on an {@code onNext} signal and failure on an {@code onError} signal.
         *
         * @param signal the signal to inspect; its context view must hold a monitor callback
         */
        @Override
        public void accept(Signal<?> signal) {
            // The callback is looked up for every signal, as before; get(Class) is already typed.
            MessageMonitor.MonitorCallback monitorCallback =
                    signal.getContextView().get(MessageMonitor.MonitorCallback.class);
            if (signal.isOnNext()) {
                monitorCallback.reportSuccess();
            } else if (signal.isOnError()) {
                monitorCallback.reportFailure(signal.getThrowable());
            }
        }
    }

    protected SimpleQueryBus(Builder builder) {
        builder.validate();
        this.messageMonitor = builder.messageMonitor;
        this.errorHandler = builder.errorHandler;
        if (builder.transactionManager != NoTransactionManager.INSTANCE) {
            registerHandlerInterceptor(new TransactionManagingInterceptor(builder.transactionManager));
        }
        this.queryUpdateEmitter = builder.queryUpdateEmitter;
        this.duplicateQueryHandlerResolver = builder.duplicateQueryHandlerResolver;
        this.spanFactory = builder.spanFactory;
    }

    /**
     * Entry point for configuring a {@link SimpleQueryBus}.
     *
     * @return a fresh {@link Builder} pre-populated with defaults
     */
    public static Builder builder() {
        Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Subscribes a handler for the given query name and response type.
     * <p>
     * If an equal subscription already exists nothing is added. If other subscriptions with
     * an equal response type exist, the {@link DuplicateQueryHandlerResolver} decides which
     * handlers remain and its result replaces the list stored for the query name.
     *
     * @param str            the query name
     * @param type           the response type the handler produces
     * @param messageHandler the handler to subscribe
     * @return a registration that removes this subscription when cancelled
     */
    @Override
    public <R> Registration subscribe(@Nonnull String str, @Nonnull Type type, @Nonnull MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        QuerySubscription<R> candidate = new QuerySubscription<>(type, messageHandler);
        Registration registration = () -> unsubscribe(str, candidate);

        List<QuerySubscription<?>> registered =
                subscriptions.computeIfAbsent(str, name -> new CopyOnWriteArrayList<>());
        if (registered.contains(candidate)) {
            return registration;
        }

        List<QuerySubscription<?>> sameResponseType = registered.stream()
                .filter(existing -> existing.getResponseType().equals(type))
                .collect(Collectors.toList());
        if (!sameResponseType.isEmpty()) {
            List<QuerySubscription<?>> resolved =
                    duplicateQueryHandlerResolver.resolve(str, type, sameResponseType, candidate);
            subscriptions.put(str, resolved);
        } else {
            registered.add(candidate);
        }
        return registration;
    }

    /**
     * Removes the given subscription from the list stored for the query name and drops
     * the map entry once that list is empty.
     *
     * @param str               the query name
     * @param querySubscription the subscription to remove
     * @return always {@code true}
     */
    private <R> boolean unsubscribe(String str, QuerySubscription<R> querySubscription) {
        subscriptions.computeIfPresent(str, (name, handlers) -> {
            handlers.remove(querySubscription);
            // Returning null makes computeIfPresent remove the mapping.
            return handlers.isEmpty() ? null : handlers;
        });
        return true;
    }

    /**
     * Dispatches a direct query inside an internal span named {@code SimpleQueryBus.query}.
     * The span ends when the returned future completes; a failure is recorded on the span
     * first. If dispatching throws before a future is available, the span is also ended so
     * it does not stay open.
     *
     * @param queryMessage the query to dispatch
     * @return a future holding the response message
     */
    @Override
    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(@Nonnull QueryMessage<Q, R> queryMessage) {
        Span span = this.spanFactory.createInternalSpan(() -> "SimpleQueryBus.query", queryMessage).start();
        CompletableFuture<QueryResponseMessage<R>> response;
        try {
            response = doQuery(queryMessage);
        } catch (RuntimeException | Error e) {
            // No future exists to hook into, so close the span here before propagating.
            span.recordException(e);
            span.end();
            throw e;
        }
        return response.whenComplete((queryResponseMessage, th) -> {
            if (th != null) {
                span.recordException(th);
            }
            span.end();
        });
    }

    /**
     * Dispatches a direct query to the first handler that can answer it.
     * <p>
     * Handlers are tried in the order returned by {@code getHandlersForMessage}. A handler
     * failing with {@link NoHandlerForQueryException} is skipped. Any other handler failure
     * completes the returned future normally with an exceptional response message. When no
     * handler is registered, or none is suitable, the returned future completes
     * exceptionally. The message monitor is told about success or failure in every case
     * handled here.
     *
     * @param queryMessage the query to dispatch; its response type may not be a {@link Publisher}
     * @return a future holding the response message
     */
    @Nonnull
    private <Q, R> CompletableFuture<QueryResponseMessage<R>> doQuery(@Nonnull QueryMessage<Q, R> queryMessage) {
        Assert.isFalse(Publisher.class.isAssignableFrom(queryMessage.getResponseType().getExpectedResponseType()), () -> {
            return "Direct query does not support Flux as a return type.";
        });
        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(queryMessage);
        QueryMessage<Q, R> interceptedQuery = intercept(queryMessage);
        List<MessageHandler<? super QueryMessage<?, ?>>> handlers = getHandlersForMessage(interceptedQuery);
        CompletableFuture<QueryResponseMessage<R>> result = new CompletableFuture<>();
        // The whole dispatch runs inside the try so that every failure, including the
        // no-handler cases, ends up in the returned future and is reported to the monitor.
        try {
            ResponseType<R> responseType = interceptedQuery.getResponseType();
            if (handlers.isEmpty()) {
                throw noHandlerException(interceptedQuery);
            }
            Iterator<MessageHandler<? super QueryMessage<?, ?>>> handlerIterator = handlers.iterator();
            boolean invocationSuccess = false;
            while (!invocationSuccess && handlerIterator.hasNext()) {
                ResultMessage<CompletableFuture<QueryResponseMessage<R>>> resultMessage =
                        interceptAndInvoke(DefaultUnitOfWork.startAndGet(interceptedQuery), handlerIterator.next());
                if (!resultMessage.isExceptional()) {
                    result = resultMessage.getPayload();
                    invocationSuccess = true;
                } else if (!(resultMessage.exceptionResult() instanceof NoHandlerForQueryException)) {
                    Throwable cause = resultMessage.exceptionResult();
                    GenericQueryResponseMessage<R> failureResponse =
                            responseType.convertExceptional(cause)
                                        .map(GenericQueryResponseMessage::new)
                                        .orElse(new GenericQueryResponseMessage<>(
                                                responseType.responseMessagePayloadType(), cause));
                    result.complete(failureResponse);
                    monitorCallback.reportFailure(cause);
                    return result;
                }
                // A NoHandlerForQueryException from a handler means: try the next handler.
            }
            if (!invocationSuccess) {
                throw noSuitableHandlerException(interceptedQuery);
            }
            monitorCallback.reportSuccess();
        } catch (Exception e) {
            result.completeExceptionally(e);
            monitorCallback.reportFailure(e);
        }
        return result;
    }

    /**
     * Dispatches a streaming query and returns the publisher of the first handler whose
     * invocation does not fail.
     * <p>
     * The query first passes the dispatch interceptors. The stream errors with a
     * {@link NoHandlerForQueryException} when no handler is found, or when every handler
     * invocation is exceptional. The outcome is reported through the monitor callback that
     * {@code MonitorCallbackContextWriter} places in the Reactor context.
     *
     * @param streamingQueryMessage the streaming query to dispatch
     * @return a publisher of response messages
     */
    @Override
    public <Q, R> Publisher<QueryResponseMessage<R>> streamingQuery(StreamingQueryMessage<Q, R> streamingQueryMessage) {
        return Mono.just(intercept(streamingQueryMessage))
                   .flatMapMany(intercepted -> Mono
                           .just(intercepted)
                           .flatMapMany(this::getStreamingHandlersForMessage)
                           .switchIfEmpty(Flux.error(noHandlerException(intercepted)))
                           .map(handler -> interceptAndInvokeStreaming(intercepted, handler))
                           // Drop failed invocations until one handler succeeds.
                           .skipWhile(ResultMessage::isExceptional)
                           .switchIfEmpty(Flux.error(noSuitableHandlerException(intercepted)))
                           .next()
                           .doOnEach(new SuccessReporter())
                           .flatMapMany(ResultMessage::getPayload))
                   .contextWrite(new MonitorCallbackContextWriter(this.messageMonitor, streamingQueryMessage));
    }

    /**
     * Builds the exception used when no handler is registered for a query.
     *
     * @param queryMessage the query that could not be routed
     * @return an exception naming the query and its response type
     */
    private NoHandlerForQueryException noHandlerException(QueryMessage<?, ?> queryMessage) {
        String description = String.format(
                "No handler found for [%s] with response type [%s]",
                queryMessage.getQueryName(),
                queryMessage.getResponseType());
        return new NoHandlerForQueryException(description);
    }

    /**
     * Builds the exception used when handlers exist but none could answer the query.
     *
     * @param queryMessage the query that could not be answered
     * @return an exception naming the query and its response type
     */
    private NoHandlerForQueryException noSuitableHandlerException(QueryMessage<?, ?> queryMessage) {
        String description = String.format(
                "No suitable handler was found for [%s] with response type [%s]",
                queryMessage.getQueryName(),
                queryMessage.getResponseType());
        return new NoHandlerForQueryException(description);
    }

    /**
     * Sends the query to every matching handler and returns a lazy stream of their responses.
     * <p>
     * Each handler is invoked in its own unit of work and span when the stream is consumed.
     * All handlers share one deadline, computed when this method is called. A handler that
     * fails or does not answer before the deadline is reported to the message monitor and
     * the {@link QueryInvocationErrorHandler}, and contributes no element to the stream.
     * If the waiting thread is interrupted, its interrupt flag is restored after reporting.
     *
     * @param queryMessage the query to dispatch; its response type may not be a {@link Publisher}
     * @param j            the timeout, in {@code timeUnit}
     * @param timeUnit     the unit of {@code j}
     * @return a stream of the non-null responses; empty when no handler matches
     */
    @Override
    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(@Nonnull QueryMessage<Q, R> queryMessage, long j, @Nonnull TimeUnit timeUnit) {
        Assert.isFalse(Publisher.class.isAssignableFrom(queryMessage.getResponseType().getExpectedResponseType()), () -> {
            return "Scatter-Gather query does not support Flux as a return type.";
        });
        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(queryMessage);
        QueryMessage<Q, R> interceptedQuery = intercept(queryMessage);
        List<MessageHandler<? super QueryMessage<?, ?>>> handlers = getHandlersForMessage(interceptedQuery);
        if (handlers.isEmpty()) {
            monitorCallback.reportIgnored();
            return Stream.empty();
        }
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(j);
        return handlers.stream().map(handler -> {
            Span span = this.spanFactory.createInternalSpan(() -> {
                return "SimpleQueryBus.scatterGather(" + handlers.indexOf(handler) + ")";
            }, queryMessage).start();
            long remainingTimeout = ObjectUtils.getRemainingOfDeadline(deadline);
            ResultMessage<CompletableFuture<QueryResponseMessage<R>>> resultMessage =
                    interceptAndInvoke(DefaultUnitOfWork.startAndGet(interceptedQuery), handler);
            QueryResponseMessage<R> response = null;
            if (resultMessage.isExceptional()) {
                monitorCallback.reportFailure(resultMessage.exceptionResult());
                span.recordException(resultMessage.exceptionResult());
                this.errorHandler.onError(resultMessage.exceptionResult(), interceptedQuery, handler);
            } else {
                try {
                    response = resultMessage.getPayload().get(remainingTimeout, TimeUnit.MILLISECONDS);
                    monitorCallback.reportSuccess();
                } catch (Exception e) {
                    span.recordException(e);
                    monitorCallback.reportFailure(e);
                    this.errorHandler.onError(e, interceptedQuery, handler);
                    if (e instanceof InterruptedException) {
                        // Do not swallow the interruption; let the caller observe it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            span.end();
            return response;
        }).filter(Objects::nonNull);
    }

    /**
     * Dispatches a subscription query with an explicit backpressure strategy.
     * <p>
     * The initial result is obtained lazily through {@link #query(QueryMessage)}; errors
     * on it are logged. Updates come from a handler registered on the
     * {@link QueryUpdateEmitter}.
     *
     * @param subscriptionQueryMessage      the subscription query to dispatch
     * @param subscriptionQueryBackpressure the backpressure strategy passed to the emitter
     * @param i                             the update buffer size passed to the emitter
     * @return the initial result combined with the stream of updates
     * @throws IllegalArgumentException when an update handler is already registered for the message
     */
    @Override
    @Deprecated
    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i) {
        assertSubQueryResponseTypes(subscriptionQueryMessage);
        boolean alreadyRegistered = queryUpdateEmitter.queryUpdateHandlerRegistered(subscriptionQueryMessage);
        if (alreadyRegistered) {
            throw new IllegalArgumentException("There is already a subscription with the given message identifier");
        }
        Mono<QueryResponseMessage<I>> initialResult = Mono
                .fromFuture(() -> query(subscriptionQueryMessage))
                .doOnError(error -> logger.error(
                        String.format("An error happened while trying to report an initial result. Query: %s", subscriptionQueryMessage),
                        error));
        UpdateHandlerRegistration<U> updateRegistration =
                queryUpdateEmitter.registerUpdateHandler(subscriptionQueryMessage, subscriptionQueryBackpressure, i);
        return getSubscriptionQueryResult(initialResult, updateRegistration);
    }

    /**
     * Dispatches a subscription query.
     * <p>
     * The initial result is obtained lazily through {@link #query(QueryMessage)}; errors
     * on it are logged. Updates come from a handler registered on the
     * {@link QueryUpdateEmitter}.
     *
     * @param subscriptionQueryMessage the subscription query to dispatch
     * @param i                        the update buffer size passed to the emitter
     * @return the initial result combined with the stream of updates
     * @throws IllegalArgumentException when an update handler is already registered for the message
     */
    @Override
    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, int i) {
        assertSubQueryResponseTypes(subscriptionQueryMessage);
        boolean alreadyRegistered = queryUpdateEmitter.queryUpdateHandlerRegistered(subscriptionQueryMessage);
        if (alreadyRegistered) {
            throw new IllegalArgumentException("There is already a subscription with the given message identifier");
        }
        Mono<QueryResponseMessage<I>> initialResult = Mono
                .fromFuture(() -> query(subscriptionQueryMessage))
                .doOnError(error -> logger.error(
                        String.format("An error happened while trying to report an initial result. Query: %s", subscriptionQueryMessage),
                        error));
        UpdateHandlerRegistration<U> updateRegistration =
                queryUpdateEmitter.registerUpdateHandler(subscriptionQueryMessage, i);
        return getSubscriptionQueryResult(initialResult, updateRegistration);
    }

    /**
     * Rejects subscription queries whose initial or update response type is a
     * {@link Publisher}. The initial response type is checked first.
     *
     * @param subscriptionQueryMessage the subscription query to validate
     */
    private <Q, I, U> void assertSubQueryResponseTypes(SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage) {
        Class<?> initialType = subscriptionQueryMessage.getResponseType().getExpectedResponseType();
        Assert.isFalse(Publisher.class.isAssignableFrom(initialType),
                       () -> "Subscription Query query does not support Flux as a return type.");

        Class<?> updateType = subscriptionQueryMessage.getUpdateResponseType().getExpectedResponseType();
        Assert.isFalse(Publisher.class.isAssignableFrom(updateType),
                       () -> "Subscription Query query does not support Flux as an update type.");
    }

    /**
     * Combines the initial result and the update stream into a subscription query result.
     * Cancelling the result completes the update handler registration.
     *
     * @param publisher                 the publisher of the initial result
     * @param updateHandlerRegistration the registration supplying the updates
     * @return the combined subscription query result
     */
    private <I, U> DefaultSubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> getSubscriptionQueryResult(Publisher<QueryResponseMessage<I>> publisher, UpdateHandlerRegistration<U> updateHandlerRegistration) {
        Mono<QueryResponseMessage<I>> initialResult = Mono.from(publisher);
        Flux<SubscriptionQueryUpdateMessage<U>> updates = updateHandlerRegistration.getUpdates();
        Registration cancellation = () -> {
            updateHandlerRegistration.complete();
            return true;
        };
        return new DefaultSubscriptionQueryResult<>(initialResult, updates, cancellation);
    }

    /**
     * Exposes the emitter this bus was configured with.
     *
     * @return the {@link QueryUpdateEmitter} supplied through the builder
     */
    @Override
    public QueryUpdateEmitter queryUpdateEmitter() {
        return queryUpdateEmitter;
    }

    /**
     * Invokes the handler through the handler interceptor chain inside the given unit of
     * work and normalises whatever it returns into a future of a response message.
     * <ul>
     * <li>A {@link CompletableFuture} is composed with the response conversion.</li>
     * <li>Any other {@link Future} is awaited via {@link CompletableFuture#supplyAsync}.</li>
     * <li>Any other value is converted immediately into a completed future.</li>
     * </ul>
     *
     * @param unitOfWork     the unit of work carrying the query
     * @param messageHandler the handler to invoke
     * @return the unit of work's result message wrapping the response future
     */
    private <Q, R> ResultMessage<CompletableFuture<QueryResponseMessage<R>>> interceptAndInvoke(UnitOfWork<QueryMessage<Q, R>> unitOfWork, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        return unitOfWork.executeWithResult(() -> {
            ResponseType<R> responseType = unitOfWork.getMessage().getResponseType();
            Object queryResponse = new DefaultInterceptorChain<>(unitOfWork, this.handlerInterceptors, messageHandler).proceed();
            if (queryResponse instanceof CompletableFuture) {
                return ((CompletableFuture<?>) queryResponse).thenCompose(
                        result -> buildCompletableFuture(responseType, result));
            }
            if (queryResponse instanceof Future) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return GenericQueryResponseMessage.asNullableResponseMessage(
                                responseType.responseMessagePayloadType(),
                                responseType.convert(((Future<?>) queryResponse).get()));
                    } catch (InterruptedException e) {
                        // Restore the flag so the executing thread's owner can observe the interruption.
                        Thread.currentThread().interrupt();
                        throw new QueryExecutionException("Error happened while trying to execute query handler", e);
                    } catch (ExecutionException e) {
                        throw new QueryExecutionException("Error happened while trying to execute query handler", e);
                    }
                });
            }
            return buildCompletableFuture(responseType, queryResponse);
        });
    }

    /**
     * Invokes the handler for a streaming query through the handler interceptor chain in a
     * freshly started unit of work. The handler's result is converted by the query's
     * response type and each emitted item is wrapped in a response message.
     *
     * @param streamingQueryMessage the streaming query to handle
     * @param messageHandler        the handler to invoke
     * @return the unit of work's result message wrapping the publisher of response messages
     */
    private <Q, R> ResultMessage<Publisher<QueryResponseMessage<R>>> interceptAndInvokeStreaming(StreamingQueryMessage<Q, R> streamingQueryMessage, MessageHandler<? super StreamingQueryMessage<?, R>> messageHandler) {
        UnitOfWork<StreamingQueryMessage<Q, R>> unitOfWork = DefaultUnitOfWork.startAndGet(streamingQueryMessage);
        return unitOfWork.executeWithResult(() -> {
            Object queryResponse =
                    new DefaultInterceptorChain<>(unitOfWork, this.handlerInterceptors, messageHandler).proceed();
            return Flux.from(streamingQueryMessage.getResponseType().convert(queryResponse))
                       .map(GenericQueryResponseMessage::asResponseMessage);
        });
    }

    /**
     * Converts a raw handler result with the given response type and wraps it in an
     * already completed future of a response message.
     *
     * @param responseType the response type used to convert {@code obj}
     * @param obj          the raw handler result
     * @return a completed future holding the response message
     */
    private <R> CompletableFuture<QueryResponseMessage<R>> buildCompletableFuture(ResponseType<R> responseType, Object obj) {
        QueryResponseMessage<R> response = GenericQueryResponseMessage.asNullableResponseMessage(
                responseType.responseMessagePayloadType(),
                responseType.convert(obj));
        return CompletableFuture.completedFuture(response);
    }

    /**
     * Passes the query through every registered dispatch interceptor, in registration
     * order, feeding each interceptor the output of the previous one.
     *
     * @param t the query to intercept
     * @return the query as returned by the last interceptor, or {@code t} itself when no
     *         interceptor is registered
     */
    @SuppressWarnings("unchecked") // interceptors are expected to return a message of the type they were given
    private <Q, R, T extends QueryMessage<Q, R>> T intercept(T t) {
        T intercepted = t;
        for (MessageDispatchInterceptor<? super QueryMessage<?, ?>> interceptor : this.dispatchInterceptors) {
            intercepted = (T) interceptor.handle(intercepted);
        }
        return intercepted;
    }

    /**
     * Gives subclasses a read-only view of the registered subscriptions.
     *
     * @return an unmodifiable view of the subscriptions, keyed by query name
     */
    protected Map<String, Collection<QuerySubscription<?>>> getSubscriptions() {
        Map<String, Collection<QuerySubscription<?>>> readOnlyView = Collections.unmodifiableMap(subscriptions);
        return readOnlyView;
    }

    /**
     * Adds an interceptor that is applied around every handler invocation.
     *
     * @param messageHandlerInterceptor the interceptor to add
     * @return a registration that removes the interceptor when cancelled
     */
    @Override
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super QueryMessage<?, ?>> messageHandlerInterceptor) {
        handlerInterceptors.add(messageHandlerInterceptor);
        return () -> handlerInterceptors.remove(messageHandlerInterceptor);
    }

    /**
     * Adds an interceptor that is applied to every query before it is dispatched.
     *
     * @param messageDispatchInterceptor the interceptor to add
     * @return a registration that removes the interceptor when cancelled
     */
    @Override
    @Nonnull
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super QueryMessage<?, ?>> messageDispatchInterceptor) {
        dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> dispatchInterceptors.remove(messageDispatchInterceptor);
    }

    /**
     * Finds the handlers subscribed under the query's name whose response type matches the
     * requested one, best match first.
     * <p>
     * Subscriptions are grouped by the rank returned by {@code ResponseType.matchRank}.
     * Rank {@code 0} is discarded and the remaining groups are ordered by descending rank.
     * The lookup is read-only: an unknown query name yields an empty list and adds nothing
     * to the subscription map.
     *
     * @param queryMessage the query to find handlers for
     * @return the matching handlers, possibly empty
     */
    @SuppressWarnings("unchecked") // widening the handler's response type parameter to a wildcard
    private <Q, R> List<MessageHandler<? super QueryMessage<?, ?>>> getHandlersForMessage(QueryMessage<Q, R> queryMessage) {
        ResponseType<R> responseType = queryMessage.getResponseType();
        List<QuerySubscription<?>> candidates =
                this.subscriptions.getOrDefault(queryMessage.getQueryName(), Collections.emptyList());
        Map<Integer, List<QuerySubscription<?>>> candidatesByRank = candidates.stream()
                .collect(Collectors.groupingBy(candidate -> responseType.matchRank(candidate.getResponseType())));
        return candidatesByRank.entrySet().stream()
                .filter(ranked -> ranked.getKey() != 0)
                // Integer.compare avoids the overflow that subtracting the ranks can cause.
                .sorted((left, right) -> Integer.compare(right.getKey(), left.getKey()))
                .flatMap(ranked -> ranked.getValue().stream())
                .map(subscription -> (MessageHandler<? super QueryMessage<?, ?>>) subscription.getQueryHandler())
                .collect(Collectors.toList());
    }

    /**
     * Reactive variant of {@code getHandlersForMessage}: resolves the matching handlers and
     * exposes them as a publisher.
     *
     * @param streamingQueryMessage the streaming query to find handlers for
     * @return a publisher emitting the matching handlers
     */
    private <Q, R> Publisher<MessageHandler<? super QueryMessage<?, ?>>> getStreamingHandlersForMessage(StreamingQueryMessage<Q, R> streamingQueryMessage) {
        List<MessageHandler<? super QueryMessage<?, ?>>> matchingHandlers = getHandlersForMessage(streamingQueryMessage);
        return Flux.fromIterable(matchingHandlers);
    }
}
