package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryComplete;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.axoniq.axonserver.grpc.query.QuerySubscription;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.DeserializedResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionQueryRequestTarget;
import org.axonframework.axonserver.connector.util.BufferingSpliterator;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.ResubscribableStreamObserver;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus.class */
public class AxonServerQueryBus implements QueryBus {
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    private static final int SCATTER_GATHER_NUMBER_OF_RESULTS = -1;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final AxonServerConfiguration configuration;
    private final QueryUpdateEmitter updateEmitter;
    private final QueryBus localSegment;
    private final QuerySerializer serializer;
    private final SubscriptionMessageSerializer subscriptionSerializer;
    private final QueryPriorityCalculator priorityCalculator;
    private final QueryProcessor queryProcessor;
    private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors;
    private final Map<String, Set<String>> subscriptions;
    private final Map<QueryProviderInbound.RequestCase, Collection<Consumer<QueryProviderInbound>>> queryHandlers;
    private final TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver;
    private static final Logger logger = LoggerFactory.getLogger(AxonServerQueryBus.class);
    private static final long DIRECT_QUERY_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1);

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$Builder.class */
    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private QueryBus localSegment;
        private QueryUpdateEmitter updateEmitter;
        private Serializer messageSerializer;
        private Serializer genericSerializer;
        private QueryPriorityCalculator priorityCalculator = QueryPriorityCalculator.defaultQueryPriorityCalculator();
        private TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver = queryMessage -> {
            return this.configuration.getContext();
        };
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder();

        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        public Builder configuration(AxonServerConfiguration axonServerConfiguration) {
            BuilderUtils.assertNonNull(axonServerConfiguration, "AxonServerConfiguration may not be null");
            this.configuration = axonServerConfiguration;
            return this;
        }

        public Builder localSegment(QueryBus queryBus) {
            BuilderUtils.assertNonNull(queryBus, "Local QueryBus may not be null");
            this.localSegment = queryBus;
            return this;
        }

        public Builder updateEmitter(QueryUpdateEmitter queryUpdateEmitter) {
            BuilderUtils.assertNonNull(queryUpdateEmitter, "QueryUpdateEmitter may not be null");
            this.updateEmitter = queryUpdateEmitter;
            return this;
        }

        public Builder messageSerializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Message Serializer may not be null");
            this.messageSerializer = serializer;
            return this;
        }

        public Builder genericSerializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Generic Serializer may not be null");
            this.genericSerializer = serializer;
            return this;
        }

        public Builder priorityCalculator(QueryPriorityCalculator queryPriorityCalculator) {
            BuilderUtils.assertNonNull(this.targetContextResolver, "QueryPriorityCalculator may not be null");
            this.priorityCalculator = queryPriorityCalculator;
            return this;
        }

        public Builder targetContextResolver(TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        public AxonServerQueryBus build() {
            return new AxonServerQueryBus(this);
        }

        protected QuerySerializer buildQuerySerializer() {
            return new QuerySerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected SubscriptionMessageSerializer buildSubscriptionMessageSerializer() {
            return new SubscriptionMessageSerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.localSegment, "The Local QueryBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.updateEmitter, "The QueryUpdateEmitter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.messageSerializer, "The Message Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.genericSerializer, "The Generic Serializer is a hard requirement and should be provided");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$QueryProcessor.class */
    public class QueryProcessor {
        private static final int QUERY_QUEUE_CAPACITY = 1000;
        private static final int DEFAULT_PRIORITY = 0;
        private final String context;
        private final ExecutorService queryExecutor;
        private volatile boolean subscribing;
        private volatile StreamObserver<QueryProviderOutbound> outboundStreamObserver;
        private volatile boolean running = true;
        private final ConcurrentMap<QueryDefinition, Set<MessageHandler<? super QueryMessage<?, ?>>>> subscribedQueries = new ConcurrentHashMap();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$QueryProcessor$QueryDefinition.class */
        public class QueryDefinition {
            private final String queryName;
            private final String responseName;
            private final String componentName;

            QueryDefinition(String str, String str2, String str3) {
                this.queryName = str;
                this.responseName = str2;
                this.componentName = str3;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                QueryDefinition queryDefinition = (QueryDefinition) obj;
                return Objects.equals(this.queryName, queryDefinition.queryName) && Objects.equals(this.responseName, queryDefinition.responseName) && Objects.equals(this.componentName, queryDefinition.componentName);
            }

            public int hashCode() {
                return Objects.hash(this.queryName, this.responseName, this.componentName);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$QueryProcessor$QueryProcessingTask.class */
        public class QueryProcessingTask implements Runnable {
            private final long priority;
            private final QueryRequest queryRequest;

            private QueryProcessingTask(QueryRequest queryRequest) {
                this.priority = -ProcessingInstructionHelper.priority(queryRequest.getProcessingInstructionsList());
                this.queryRequest = queryRequest;
            }

            public long getPriority() {
                return this.priority;
            }

            @Override // java.lang.Runnable
            public void run() {
                if (!QueryProcessor.this.running) {
                    AxonServerQueryBus.logger.debug("Query Processor has stopped running, hence query [{}] will no longer be processed", this.queryRequest.getQuery());
                    return;
                }
                try {
                    AxonServerQueryBus.logger.debug("Will process query [{}]", this.queryRequest.getQuery());
                    QueryProcessor.this.processQuery(this.queryRequest);
                } catch (RuntimeException | OutOfDirectMemoryError e) {
                    AxonServerQueryBus.logger.warn("Query Processor had an exception when processing query [{}]", this.queryRequest.getQuery(), e);
                }
            }
        }

        QueryProcessor(String str, AxonServerConfiguration axonServerConfiguration, ExecutorServiceBuilder executorServiceBuilder) {
            this.context = str;
            this.queryExecutor = executorServiceBuilder.apply(axonServerConfiguration, new PriorityBlockingQueue(QUERY_QUEUE_CAPACITY, Comparator.comparingLong(runnable -> {
                if (runnable instanceof QueryProcessingTask) {
                    return ((QueryProcessingTask) runnable).getPriority();
                }
                return 0L;
            })));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void resubscribe() {
            if (this.subscribedQueries.isEmpty() || this.subscribing) {
                return;
            }
            try {
                StreamObserver<QueryProviderOutbound> subscriberObserver = getSubscriberObserver(this.context);
                this.subscribedQueries.forEach((queryDefinition, set) -> {
                    subscriberObserver.onNext(QueryProviderOutbound.newBuilder().setSubscribe(buildQuerySubscription(queryDefinition, set.size())).m2007build());
                });
            } catch (Exception e) {
                AxonServerQueryBus.logger.warn("Error while resubscribing query handlers", e);
            }
        }

        public <R> Registration subscribe(String str, Type type, String str2, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
            this.subscribing = true;
            Set<MessageHandler<? super QueryMessage<?, ?>>> computeIfAbsent = this.subscribedQueries.computeIfAbsent(new QueryDefinition(str, type.getTypeName(), str2), queryDefinition -> {
                return new CopyOnWriteArraySet();
            });
            computeIfAbsent.add(messageHandler);
            try {
                try {
                    getSubscriberObserver(this.context).onNext(QueryProviderOutbound.newBuilder().setSubscribe(QuerySubscription.newBuilder().setMessageId(UUID.randomUUID().toString()).setClientId(AxonServerQueryBus.this.configuration.getClientId()).setComponentName(str2).setQuery(str).setResultName(type.getTypeName()).setNrOfHandlers(computeIfAbsent.size()).m2154build()).m2007build());
                    this.subscribing = false;
                } catch (Exception e) {
                    AxonServerQueryBus.logger.warn("Error subscribing query handler", e);
                    this.subscribing = false;
                }
                return AxonServerQueryBus.this.localSegment.subscribe(str, type, messageHandler);
            } catch (Throwable th) {
                this.subscribing = false;
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void processQuery(QueryRequest queryRequest) {
            String messageIdentifier = queryRequest.getMessageIdentifier();
            QueryMessage deserializeRequest = AxonServerQueryBus.this.serializer.deserializeRequest(queryRequest);
            try {
                if (ProcessingInstructionHelper.numberOfResults(queryRequest.getProcessingInstructionsList()) == 1) {
                    this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(AxonServerQueryBus.this.serializer.serializeResponse((QueryResponseMessage) AxonServerQueryBus.this.localSegment.query(deserializeRequest).get(), messageIdentifier)).m2007build());
                } else {
                    AxonServerQueryBus.this.localSegment.scatterGather(deserializeRequest, 0L, TimeUnit.SECONDS).forEach(queryResponseMessage -> {
                        this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(AxonServerQueryBus.this.serializer.serializeResponse(queryResponseMessage, messageIdentifier)).m2007build());
                    });
                }
                this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryComplete(QueryComplete.newBuilder().setMessageId(UUID.randomUUID().toString()).setRequestId(messageIdentifier)).m2007build());
            } catch (Exception e) {
                AxonServerQueryBus.logger.warn("Failed to dispatch query [{}] locally", deserializeRequest.getQueryName(), e);
                if (this.outboundStreamObserver == null) {
                    return;
                }
                this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(QueryResponse.newBuilder().setMessageIdentifier(UUID.randomUUID().toString()).setRequestIdentifier(messageIdentifier).setErrorMessage(ExceptionSerializer.serialize(AxonServerQueryBus.this.configuration.getClientId(), e)).setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode()).m2103build()).m2007build());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized StreamObserver<QueryProviderOutbound> getSubscriberObserver(String str) {
            if (this.outboundStreamObserver != null) {
                return this.outboundStreamObserver;
            }
            StreamObserver<QueryProviderOutbound> queryStream = AxonServerQueryBus.this.axonServerConnectionManager.getQueryStream(str, new ResubscribableStreamObserver(new StreamObserver<QueryProviderInbound>() { // from class: org.axonframework.axonserver.connector.query.AxonServerQueryBus.QueryProcessor.1
                public void onNext(QueryProviderInbound queryProviderInbound) {
                    QueryProviderInbound.RequestCase requestCase = queryProviderInbound.getRequestCase();
                    ((Collection) AxonServerQueryBus.this.queryHandlers.getOrDefault(requestCase, Collections.emptySet())).forEach(consumer -> {
                        consumer.accept(queryProviderInbound);
                    });
                    switch (requestCase) {
                        case CONFIRMATION:
                        default:
                            return;
                        case QUERY:
                            QueryProcessor.this.queryExecutor.execute(new QueryProcessingTask(queryProviderInbound.getQuery()));
                            return;
                    }
                }

                public void onError(Throwable th) {
                    AxonServerQueryBus.logger.warn("Query Inbound Stream closed with error", th);
                    QueryProcessor.this.outboundStreamObserver = null;
                }

                public void onCompleted() {
                    AxonServerQueryBus.logger.info("Received completed from server.");
                    QueryProcessor.this.outboundStreamObserver = null;
                }
            }, th -> {
                resubscribe();
            }));
            AxonServerQueryBus.logger.info("Creating new query stream subscriber");
            this.outboundStreamObserver = new FlowControllingStreamObserver(queryStream, AxonServerQueryBus.this.configuration, flowControl -> {
                return QueryProviderOutbound.newBuilder().setFlowControl(flowControl).m2007build();
            }, queryProviderOutbound -> {
                return queryProviderOutbound.getRequestCase().equals(QueryProviderOutbound.RequestCase.QUERY_RESPONSE);
            }).sendInitialPermits();
            return this.outboundStreamObserver;
        }

        public void unsubscribe(String str, Type type, String str2) {
            QueryDefinition queryDefinition = new QueryDefinition(str, type.getTypeName(), str2);
            this.subscribedQueries.remove(queryDefinition);
            try {
                getSubscriberObserver(this.context).onNext(QueryProviderOutbound.newBuilder().setUnsubscribe(buildQuerySubscription(queryDefinition, 1)).m2007build());
            } catch (Exception e) {
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void unsubscribeAll() {
            this.subscribedQueries.forEach((queryDefinition, set) -> {
                try {
                    getSubscriberObserver(this.context).onNext(QueryProviderOutbound.newBuilder().setUnsubscribe(buildQuerySubscription(queryDefinition, 1)).m2007build());
                } catch (Exception e) {
                }
            });
            this.outboundStreamObserver = null;
        }

        void disconnect() {
            if (this.outboundStreamObserver != null) {
                this.outboundStreamObserver.onCompleted();
            }
            this.running = false;
            this.queryExecutor.shutdown();
        }

        private QuerySubscription buildQuerySubscription(QueryDefinition queryDefinition, int i) {
            return QuerySubscription.newBuilder().setClientId(AxonServerQueryBus.this.configuration.getClientId()).setMessageId(UUID.randomUUID().toString()).setComponentName(queryDefinition.componentName).setQuery(queryDefinition.queryName).setNrOfHandlers(i).setResultName(queryDefinition.responseName).m2154build();
        }
    }

    @Deprecated
    public AxonServerQueryBus(AxonServerConnectionManager axonServerConnectionManager, AxonServerConfiguration axonServerConfiguration, QueryUpdateEmitter queryUpdateEmitter, QueryBus queryBus, Serializer serializer, Serializer serializer2, QueryPriorityCalculator queryPriorityCalculator) {
        this(axonServerConnectionManager, axonServerConfiguration, queryUpdateEmitter, queryBus, serializer, serializer2, queryPriorityCalculator, queryMessage -> {
            return axonServerConfiguration.getContext();
        });
    }

    @Deprecated
    public AxonServerQueryBus(AxonServerConnectionManager axonServerConnectionManager, AxonServerConfiguration axonServerConfiguration, QueryUpdateEmitter queryUpdateEmitter, QueryBus queryBus, Serializer serializer, Serializer serializer2, QueryPriorityCalculator queryPriorityCalculator, TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
        this.subscriptions = new ConcurrentHashMap();
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.configuration = axonServerConfiguration;
        this.updateEmitter = queryUpdateEmitter;
        this.localSegment = queryBus;
        this.serializer = new QuerySerializer(serializer, serializer2, axonServerConfiguration);
        this.subscriptionSerializer = new SubscriptionMessageSerializer(serializer, serializer2, axonServerConfiguration);
        this.priorityCalculator = queryPriorityCalculator;
        String context = axonServerConfiguration.getContext();
        this.targetContextResolver = targetContextResolver.orElse(message -> {
            return context;
        });
        this.queryProcessor = new QueryProcessor(context, axonServerConfiguration, ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder());
        this.dispatchInterceptors = new DispatchInterceptors<>();
        this.queryHandlers = new EnumMap(QueryProviderInbound.RequestCase.class);
        AxonServerConnectionManager axonServerConnectionManager2 = this.axonServerConnectionManager;
        QueryProcessor queryProcessor = this.queryProcessor;
        queryProcessor.getClass();
        axonServerConnectionManager2.addReconnectListener(context, () -> {
            queryProcessor.resubscribe();
        });
        this.axonServerConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
        AxonServerConnectionManager axonServerConnectionManager3 = this.axonServerConnectionManager;
        QueryProcessor queryProcessor2 = this.queryProcessor;
        queryProcessor2.getClass();
        axonServerConnectionManager3.addDisconnectListener(context, () -> {
            queryProcessor2.unsubscribeAll();
        });
        this.axonServerConnectionManager.addDisconnectListener(this::onApplicationDisconnected);
        SubscriptionQueryRequestTarget subscriptionQueryRequestTarget = new SubscriptionQueryRequestTarget(queryBus, queryProviderOutbound -> {
            publish(context, queryProviderOutbound);
        }, this.subscriptionSerializer);
        QueryProviderInbound.RequestCase requestCase = QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST;
        subscriptionQueryRequestTarget.getClass();
        on(requestCase, subscriptionQueryRequestTarget::onSubscriptionQueryRequest);
        AxonServerConnectionManager axonServerConnectionManager4 = this.axonServerConnectionManager;
        subscriptionQueryRequestTarget.getClass();
        axonServerConnectionManager4.addDisconnectListener(subscriptionQueryRequestTarget::onApplicationDisconnected);
    }

    public AxonServerQueryBus(Builder builder) {
        this.subscriptions = new ConcurrentHashMap();
        builder.validate();
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.configuration = builder.configuration;
        this.updateEmitter = builder.updateEmitter;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildQuerySerializer();
        this.subscriptionSerializer = builder.buildSubscriptionMessageSerializer();
        this.priorityCalculator = builder.priorityCalculator;
        String context = this.configuration.getContext();
        this.targetContextResolver = builder.targetContextResolver.orElse(message -> {
            return context;
        });
        this.queryProcessor = new QueryProcessor(context, this.configuration, builder.executorServiceBuilder);
        this.dispatchInterceptors = new DispatchInterceptors<>();
        this.queryHandlers = new EnumMap(QueryProviderInbound.RequestCase.class);
        AxonServerConnectionManager axonServerConnectionManager = this.axonServerConnectionManager;
        QueryProcessor queryProcessor = this.queryProcessor;
        queryProcessor.getClass();
        axonServerConnectionManager.addReconnectListener(context, () -> {
            queryProcessor.resubscribe();
        });
        this.axonServerConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
        AxonServerConnectionManager axonServerConnectionManager2 = this.axonServerConnectionManager;
        QueryProcessor queryProcessor2 = this.queryProcessor;
        queryProcessor2.getClass();
        axonServerConnectionManager2.addDisconnectListener(context, () -> {
            queryProcessor2.unsubscribeAll();
        });
        this.axonServerConnectionManager.addDisconnectListener(this::onApplicationDisconnected);
        SubscriptionQueryRequestTarget subscriptionQueryRequestTarget = new SubscriptionQueryRequestTarget(this.localSegment, queryProviderOutbound -> {
            publish(context, queryProviderOutbound);
        }, this.subscriptionSerializer);
        QueryProviderInbound.RequestCase requestCase = QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST;
        subscriptionQueryRequestTarget.getClass();
        on(requestCase, subscriptionQueryRequestTarget::onSubscriptionQueryRequest);
        AxonServerConnectionManager axonServerConnectionManager3 = this.axonServerConnectionManager;
        subscriptionQueryRequestTarget.getClass();
        axonServerConnectionManager3.addDisconnectListener(subscriptionQueryRequestTarget::onApplicationDisconnected);
    }

    public static Builder builder() {
        return new Builder();
    }

    private Consumer<String> interceptReconnectRequest(Consumer<String> consumer) {
        return this.subscriptions.isEmpty() ? consumer : str -> {
            logger.info("Reconnect for context [{}] refused because there are active subscription queries.", str);
        };
    }

    private void onApplicationDisconnected(String str) {
        this.subscriptions.remove(str);
    }

    public <R> Registration subscribe(String str, Type type, MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        return new AxonServerRegistration(this.queryProcessor.subscribe(str, type, this.configuration.getComponentName(), messageHandler), () -> {
            this.queryProcessor.unsubscribe(str, type, this.configuration.getComponentName());
        });
    }

    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
        QueryMessage<Q, R> intercept = this.dispatchInterceptors.intercept(queryMessage);
        final CompletableFuture<QueryResponseMessage<R>> completableFuture = new CompletableFuture<>();
        try {
            String resolveContext = this.targetContextResolver.resolveContext(intercept);
            queryService(resolveContext).query(this.serializer.serializeRequest(intercept, 1, DIRECT_QUERY_TIMEOUT_MS, this.priorityCalculator.determinePriority(intercept)), new StreamObserver<QueryResponse>() { // from class: org.axonframework.axonserver.connector.query.AxonServerQueryBus.1
                public void onNext(QueryResponse queryResponse) {
                    AxonServerQueryBus.logger.debug("Received query response [{}]", queryResponse);
                    completableFuture.complete(AxonServerQueryBus.this.serializer.deserializeResponse(queryResponse));
                }

                public void onError(Throwable th) {
                    if (AxonServerQueryBus.logger.isDebugEnabled()) {
                        AxonServerQueryBus.logger.debug("Received error while waiting for first response", th);
                    }
                    completableFuture.completeExceptionally(ErrorCode.QUERY_DISPATCH_ERROR.convert(AxonServerQueryBus.this.configuration.getClientId(), th));
                }

                public void onCompleted() {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    completableFuture.completeExceptionally(ErrorCode.QUERY_DISPATCH_ERROR.convert(ErrorMessage.newBuilder().setMessage("No result from query executor").m89build()));
                }
            });
        } catch (Exception e) {
            logger.debug("There was a problem issuing a query {}.", intercept, e);
            completableFuture.completeExceptionally(ErrorCode.QUERY_DISPATCH_ERROR.convert(this.configuration.getClientId(), e));
        }
        return completableFuture;
    }

    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long j, TimeUnit timeUnit) {
        QueryMessage<Q, R> intercept = this.dispatchInterceptors.intercept(queryMessage);
        String resolveContext = this.targetContextResolver.resolveContext(intercept);
        QueryRequest serializeRequest = this.serializer.serializeRequest(intercept, SCATTER_GATHER_NUMBER_OF_RESULTS, timeUnit.toMillis(j), this.priorityCalculator.determinePriority(intercept));
        final BufferingSpliterator bufferingSpliterator = new BufferingSpliterator(Instant.now().plusMillis(timeUnit.toMillis(j)));
        ((QueryServiceGrpc.QueryServiceStub) queryService(resolveContext).withDeadlineAfter(j, timeUnit)).query(serializeRequest, new UpstreamAwareStreamObserver<QueryResponse>() { // from class: org.axonframework.axonserver.connector.query.AxonServerQueryBus.2
            public void onNext(QueryResponse queryResponse) {
                AxonServerQueryBus.logger.debug("Received query response [{}]", queryResponse);
                if (queryResponse.hasErrorMessage()) {
                    AxonServerQueryBus.logger.debug("The received query response has error message [{}]", queryResponse.getErrorMessage());
                } else {
                    if (bufferingSpliterator.put(AxonServerQueryBus.this.serializer.deserializeResponse(queryResponse))) {
                        return;
                    }
                    getRequestStream().cancel("Cancellation requested by client", (Throwable) null);
                }
            }

            public void onError(Throwable th) {
                if (!AxonServerQueryBus.this.isDeadlineExceeded(th)) {
                    AxonServerQueryBus.logger.info("Received error while waiting for responses", th);
                }
                bufferingSpliterator.cancel(th);
            }

            public void onCompleted() {
                bufferingSpliterator.cancel(null);
            }
        });
        return (Stream) StreamSupport.stream(bufferingSpliterator, false).onClose(() -> {
            bufferingSpliterator.cancel(null);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isDeadlineExceeded(Throwable th) {
        return (th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.DEADLINE_EXCEEDED);
    }

    public void disconnect() {
        this.queryProcessor.disconnect();
    }

    public QueryBus localSegment() {
        return this.localSegment;
    }

    private void publish(String str, QueryProviderOutbound queryProviderOutbound) {
        this.queryProcessor.getSubscriberObserver(str).onNext(queryProviderOutbound);
    }

    private void on(QueryProviderInbound.RequestCase requestCase, BiConsumer<String, QueryProviderInbound> biConsumer) {
        this.queryHandlers.computeIfAbsent(requestCase, requestCase2 -> {
            return new CopyOnWriteArraySet();
        }).add(queryProviderInbound -> {
            biConsumer.accept(this.configuration.getContext(), queryProviderInbound);
        });
    }

    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i) {
        SubscriptionQueryMessage intercept = this.dispatchInterceptors.intercept(subscriptionQueryMessage);
        String identifier = intercept.getIdentifier();
        String resolveContext = this.targetContextResolver.resolveContext(intercept);
        Set<String> computeIfAbsent = this.subscriptions.computeIfAbsent(resolveContext, str -> {
            return new ConcurrentSkipListSet();
        });
        if (!computeIfAbsent.add(identifier)) {
            String str2 = "There already is a subscription query with subscription Id [" + identifier + "] for context [" + resolveContext + "]";
            logger.warn(str2);
            throw new IllegalArgumentException(str2);
        }
        logger.debug("Subscription Query requested with subscription Id [{}]", identifier);
        SubscriptionQuery serialize = this.subscriptionSerializer.serialize(intercept);
        QueryServiceGrpc.QueryServiceStub queryService = queryService(resolveContext);
        queryService.getClass();
        return new DeserializedResult(new AxonServerSubscriptionQueryResult(serialize, queryService::subscription, this.configuration, subscriptionQueryBackpressure, i, () -> {
            computeIfAbsent.remove(identifier);
        }).get(), this.subscriptionSerializer);
    }

    QueryServiceGrpc.QueryServiceStub queryService(String str) {
        return QueryServiceGrpc.newStub(this.axonServerConnectionManager.getChannel(str));
    }

    public QueryUpdateEmitter queryUpdateEmitter() {
        return this.updateEmitter;
    }

    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?, ?>> messageHandlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> messageDispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(messageDispatchInterceptor);
    }
}
