package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnSubscribe;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.rabbitmq.ClientHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.QueueOptions;
import jakarta.enterprise.inject.Instance;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.class */
/**
 * Inbound side of a RabbitMQ channel: builds the stream of {@link Message}s consumed from the
 * configured queue and exposes health information about the underlying client.
 *
 * <p>The stream is assembled in the constructor; see {@link #getStream()}. The subscription
 * handed out when the stream is subscribed to is remembered so that {@link #terminate()} can
 * cancel it.
 */
public class IncomingRabbitMQChannel {
    /** Tracing instrumenter; {@code null} when tracing is disabled in the channel configuration. */
    private final RabbitMQOpenTelemetryInstrumenter instrumenter;
    /** Subscription recorded when {@link #stream} is subscribed to; cleared by {@link #terminate()}. */
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference<>();
    /** Client taken from the {@link ClientHolder} once the consumer has been created; {@code null} before that. */
    private volatile RabbitMQClient client;
    private final RabbitMQConnectorIncomingConfiguration config;
    private final Multi<? extends Message<?>> stream;
    private final RabbitMQConnector connector;

    /**
     * Assembles the message stream for the given channel configuration.
     *
     * @param rabbitMQConnector the owning connector; supplies failure handler factories, config maps and Vert.x
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @param instance the OpenTelemetry instance, only used when tracing is enabled
     */
    public IncomingRabbitMQChannel(RabbitMQConnector rabbitMQConnector, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration, Instance<OpenTelemetry> instance) {
        this.instrumenter = rabbitMQConnectorIncomingConfiguration.getTracingEnabled()
                ? RabbitMQOpenTelemetryInstrumenter.createForConnector(instance)
                : null;
        this.config = rabbitMQConnectorIncomingConfiguration;
        // createFailureHandler reads this.connector, so the field must be assigned first.
        this.connector = rabbitMQConnector;

        final RabbitMQFailureHandler failureHandler = createFailureHandler(rabbitMQConnector.failureHandlerFactories(), rabbitMQConnectorIncomingConfiguration);
        final RabbitMQAckHandler ackHandler = createAckHandler(rabbitMQConnectorIncomingConfiguration);

        Multi<? extends Message<?>> messages = createConsumer(rabbitMQConnector, rabbitMQConnectorIncomingConfiguration)
                // Remember the client so that the health checks can inspect it.
                .invoke(holderAndConsumer -> this.client = holderAndConsumer.getItem1().client())
                .onItem().transformToMulti(holderAndConsumer -> getStreamOfMessages(
                        holderAndConsumer.getItem2(),
                        holderAndConsumer.getItem1(),
                        rabbitMQConnectorIncomingConfiguration,
                        failureHandler,
                        ackHandler));

        if (rabbitMQConnectorIncomingConfiguration.getBroadcast()) {
            messages = messages.broadcast().toAllSubscribers();
        }

        // Record the subscription: terminate() cancels it and the lazy-subscription health check reads it.
        this.stream = messages.onSubscription().invoke(this.subscription::set);
    }

    /**
     * @return the stream of incoming messages assembled by the constructor
     */
    public Multi<? extends Message<?>> getStream() {
        return this.stream;
    }

    /**
     * Contributes this channel's liveness information to the given builder.
     *
     * @param healthReportBuilder the builder to contribute to
     * @return the builder, left untouched when health reporting is disabled for this channel
     */
    public HealthReport.HealthReportBuilder isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.config.getHealthEnabled()) {
            return healthReportBuilder;
        }
        return computeHealthReport(healthReportBuilder);
    }

    /**
     * Adds a {@link HealthReport.ChannelInfo} entry for this channel to the builder.
     *
     * <p>The channel is reported healthy when lazy subscription is enabled and nothing has subscribed
     * yet, or when the client exists, is connected and has an open channel.
     */
    private HealthReport.HealthReportBuilder computeHealthReport(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.config.getHealthLazySubscription() && this.subscription.get() == null) {
            return healthReportBuilder.add(new HealthReport.ChannelInfo(this.config.getChannel(), true));
        }
        // Read the volatile field once so that all checks look at the same client.
        final RabbitMQClient current = this.client;
        final boolean healthy = current != null && current.isConnected() && current.isOpenChannel();
        return healthReportBuilder.add(new HealthReport.ChannelInfo(this.config.getChannel(), healthy));
    }

    /**
     * Contributes this channel's readiness information to the given builder.
     *
     * @param healthReportBuilder the builder to contribute to
     * @return the builder, left untouched unless both health and readiness reporting are enabled
     */
    public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.config.getHealthEnabled() && this.config.getHealthReadinessEnabled()) {
            return computeHealthReport(healthReportBuilder);
        }
        return healthReportBuilder;
    }

    /**
     * Creates the client, registers the callback that (re)applies QoS and declares the queue and
     * DLQ/DLX whenever a connection is established, then connects and creates the consumer.
     *
     * @return a {@link Uni} emitting the client holder paired with the created consumer
     */
    private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConnector rabbitMQConnector, RabbitMQConnectorIncomingConfiguration incomingConfig) {
        final RabbitMQClient newClient = RabbitMQClientHelper.createClient(rabbitMQConnector, incomingConfig);
        newClient.getDelegate().addConnectionEstablishedCallback(promise -> {
            final Uni<Void> qos;
            if (incomingConfig.getMaxOutstandingMessages().isPresent()) {
                qos = newClient.basicQos(incomingConfig.getMaxOutstandingMessages().get(), false);
            } else {
                qos = Uni.createFrom().nullItem();
            }
            final Instance<Map<String, ?>> configMaps = rabbitMQConnector.configMaps();
            // Complete or fail the callback promise according to the outcome of the setup chain.
            qos.call(() -> declareQueue(newClient, incomingConfig, configMaps))
                    .call(() -> RabbitMQClientHelper.configureDLQorDLX(newClient, incomingConfig, configMaps))
                    .subscribe().with(ignored -> promise.complete(), promise::fail);
        });

        final ClientHolder holder = new ClientHolder(newClient, incomingConfig, rabbitMQConnector.vertx(), Context.newInstance(rabbitMQConnector.vertx().getDelegate().createEventLoopContext()));
        return holder.getOrEstablishConnection()
                .invoke(() -> RabbitMQLogging.log.connectionEstablished(incomingConfig.getChannel()))
                .flatMap(connection -> createConsumer(incomingConfig, connection)
                        .map(consumer -> Tuple2.of(holder, consumer)));
    }

    /**
     * Resolves the failure handler factory matching the configured failure strategy and creates the handler.
     *
     * @throws RuntimeException built by {@code illegalArgumentInvalidFailureStrategy} when no factory is resolvable
     */
    private RabbitMQFailureHandler createFailureHandler(Instance<RabbitMQFailureHandler.Factory> instance, RabbitMQConnectorIncomingConfiguration incomingConfig) {
        final String failureStrategy = incomingConfig.getFailureStrategy();
        final Instance<RabbitMQFailureHandler.Factory> factory = CDIUtils.getInstanceById(instance, failureStrategy);
        if (!factory.isResolvable()) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidFailureStrategy(failureStrategy);
        }
        return factory.get().create(incomingConfig, this.connector);
    }

    /**
     * Creates the acknowledgement handler for the channel.
     *
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return a {@link RabbitMQAutoAck} when auto-acknowledgement is enabled, a {@link RabbitMQAck} otherwise
     */
    public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        if (Boolean.TRUE.equals(rabbitMQConnectorIncomingConfiguration.getAutoAcknowledgement())) {
            return new RabbitMQAutoAck(rabbitMQConnectorIncomingConfiguration.getChannel());
        }
        return new RabbitMQAck(rabbitMQConnectorIncomingConfiguration.getChannel());
    }

    /**
     * Builds the queue arguments from the configuration, declares the exchange if needed, and then
     * either declares the queue and its bindings or, when queue declaration is disabled, queries the
     * queue's message count.
     *
     * @return a {@link Uni} emitting the configured queue name
     */
    private Uni<String> declareQueue(RabbitMQClient rabbitMQClient, RabbitMQConnectorIncomingConfiguration incomingConfig, Instance<Map<String, ?>> instance) {
        final String queueName = RabbitMQClientHelper.getQueueName(incomingConfig);
        final JsonObject queueArgs = new JsonObject();

        // Start from the user-provided argument map, if one is resolvable; explicit settings below override it.
        final Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(instance, incomingConfig.getQueueArguments());
        if (queueArguments.isResolvable()) {
            final Map<String, ?> userArgs = queueArguments.get();
            userArgs.forEach(queueArgs::put);
        }
        if (incomingConfig.getAutoBindDlq()) {
            queueArgs.put("x-dead-letter-exchange", incomingConfig.getDeadLetterExchange());
            queueArgs.put("x-dead-letter-routing-key", incomingConfig.getDeadLetterRoutingKey().orElse(queueName));
        }
        incomingConfig.getQueueSingleActiveConsumer().ifPresent(sac -> queueArgs.put("x-single-active-consumer", sac));
        incomingConfig.getQueueXQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
        incomingConfig.getQueueXQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
        incomingConfig.getQueueTtl().ifPresent(ttl -> {
            if (ttl < 0) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            queueArgs.put("x-message-ttl", ttl);
        });
        incomingConfig.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", maxPriority));
        incomingConfig.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit));

        return RabbitMQClientHelper.declareExchangeIfNeeded(rabbitMQClient, incomingConfig, instance).flatMap(ignored -> {
            if (incomingConfig.getQueueDeclare()) {
                final String serverQueueName = RabbitMQClientHelper.serverQueueName(queueName);
                // An empty server queue name is declared with fixed flags and without the arguments built above.
                return (serverQueueName.isEmpty()
                        ? rabbitMQClient.queueDeclare(serverQueueName, false, true, true)
                        : rabbitMQClient.queueDeclare(serverQueueName, incomingConfig.getQueueDurable(), incomingConfig.getQueueExclusive(), incomingConfig.getQueueAutoDelete(), queueArgs))
                        .invoke(() -> RabbitMQLogging.log.queueEstablished(queueName))
                        .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishQueue(queueName, failure))
                        .flatMap(declareOk -> RabbitMQClientHelper.establishBindings(rabbitMQClient, incomingConfig))
                        .replaceWith(queueName);
            }
            // Queue declaration is disabled: only query the message count and log a failure of that call.
            return rabbitMQClient.messageCount(queueName)
                    .onFailure().invoke(RabbitMQLogging.log::unableToConnectToBroker)
                    .replaceWith(queueName);
        });
    }

    /**
     * Creates a consumer on the configured queue using the consumer arguments, auto-ack flag,
     * internal queue size and keep-most-recent flag from the configuration.
     */
    private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration incomingConfig, RabbitMQClient rabbitMQClient) {
        final QueueOptions queueOptions = new QueueOptions();
        queueOptions.setConsumerArguments(RabbitMQClientHelper.parseArguments(incomingConfig.getConsumerArguments()));
        final String serverQueueName = RabbitMQClientHelper.serverQueueName(RabbitMQClientHelper.getQueueName(incomingConfig));
        return rabbitMQClient.basicConsumer(serverQueueName, queueOptions
                .setAutoAck(incomingConfig.getAutoAcknowledgement())
                .setMaxInternalQueueSize(incomingConfig.getMaxIncomingInternalQueueSize())
                .setKeepMostRecent(incomingConfig.getKeepMostRecent()));
    }

    /**
     * Turns the consumer into a stream of {@link IncomingRabbitMQMessage}s emitted on the holder's
     * context, additionally passing each message through the tracing instrumenter when tracing is enabled.
     */
    private Multi<? extends Message<?>> getStreamOfMessages(RabbitMQConsumer rabbitMQConsumer, ClientHolder clientHolder, RabbitMQConnectorIncomingConfiguration incomingConfig, RabbitMQFailureHandler rabbitMQFailureHandler, RabbitMQAckHandler rabbitMQAckHandler) {
        final String queueName = RabbitMQClientHelper.getQueueName(incomingConfig);
        final boolean tracingEnabled = incomingConfig.getTracingEnabled();
        final String contentTypeOverride = incomingConfig.getContentTypeOverride().orElse(null);
        RabbitMQLogging.log.receiverListeningAddress(queueName);

        if (tracingEnabled) {
            return rabbitMQConsumer.toMulti()
                    .emitOn(command -> VertxContext.runOnContext(clientHolder.getContext().getDelegate(), command))
                    .map(raw -> new IncomingRabbitMQMessage<>(raw, clientHolder, rabbitMQFailureHandler, rabbitMQAckHandler, contentTypeOverride))
                    .map(incoming -> this.instrumenter.traceIncoming(incoming,
                            RabbitMQTrace.traceQueue(queueName, incoming.message.envelope().getRoutingKey(), incoming.getHeaders())));
        }
        return rabbitMQConsumer.toMulti()
                .emitOn(command -> VertxContext.runOnContext(clientHolder.getContext().getDelegate(), command))
                .map(raw -> new IncomingRabbitMQMessage<>(raw, clientHolder, rabbitMQFailureHandler, rabbitMQAckHandler, contentTypeOverride));
    }

    /**
     * Cancels the recorded subscription, if any. Subsequent calls are no-ops until the stream is
     * subscribed to again.
     */
    public void terminate() {
        final Flow.Subscription current = this.subscription.getAndSet(null);
        if (current != null) {
            current.cancel();
        }
    }
}
