package io.smallrye.reactive.messaging.jms;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import jakarta.jms.Topic;
import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/smallrye/reactive/messaging/jms/JmsSource.class */
class JmsSource {
    private final Multi<IncomingJmsMessage<?>> source;
    private final JmsResourceHolder<JMSConsumer> resourceHolder;
    private final JmsPublisher publisher;
    private final boolean isTracingEnabled;
    private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
    private final Context context;

    /* loaded from: input_file:io/smallrye/reactive/messaging/jms/JmsSource$JmsPublisher.class */
    /**
     * Exposes a JMS consumer as a {@link Flow.Publisher} of raw JMS {@link Message}s and acts as
     * its own {@link Flow.Subscription}.
     *
     * <p>At most one subscriber is accepted at a time. Bounded demand is served by scheduling one
     * blocking {@code receive()} per requested item on a dedicated single-thread executor. Once the
     * accumulated demand reaches {@link Long#MAX_VALUE}, a message listener is registered on the
     * consumer instead and every message is pushed to the subscriber.
     */
    private static class JmsPublisher implements Flow.Publisher<Message>, Flow.Subscription {
        private final JmsResourceHolder<JMSConsumer> consumerHolder;
        /** Set once demand has become unbounded; later {@link #request(long)} calls are ignored. */
        private boolean unbounded;
        /** Outstanding demand, saturating at {@link Long#MAX_VALUE}. */
        private final AtomicLong requests = new AtomicLong();
        /** Current subscriber, or {@code null} when nobody is subscribed (never, closed, or failed). */
        private final AtomicReference<Flow.Subscriber<? super Message>> downstream = new AtomicReference<>();
        /** Runs the blocking {@code receive()} calls so they never execute on the requesting thread. */
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        private JmsPublisher(JmsResourceHolder<JMSConsumer> jmsResourceHolder) {
            this.consumerHolder = jmsResourceHolder;
        }

        /**
         * Completes the current subscriber (if any) and shuts the receive executor down.
         * Tasks that are already queued still run, but stop as soon as they see that the
         * subscriber has been cleared.
         */
        void close() {
            Flow.Subscriber<? super Message> subscriber = this.downstream.getAndSet(null);
            try {
                if (subscriber != null) {
                    subscriber.onComplete();
                }
            } finally {
                // Shut down even if onComplete() throws, so the worker thread is not leaked.
                this.executor.shutdown();
            }
        }

        /**
         * Registers {@code subscriber} as the only subscriber. If another subscriber is already
         * registered, the new one is failed with an "already subscribed" error instead.
         *
         * @param subscriber the subscriber to attach
         */
        @Override
        public void subscribe(Flow.Subscriber<? super Message> subscriber) {
            if (this.downstream.compareAndSet(null, subscriber)) {
                subscriber.onSubscribe(this);
            } else {
                Subscriptions.fail(subscriber, JmsExceptions.ex.illegalStateAlreadySubscriber());
            }
        }

        /**
         * Adds {@code j} to the outstanding demand. Non-positive values, and any call made after
         * the publisher switched to unbounded mode, are ignored.
         *
         * @param j number of additional items requested
         */
        @Override
        public void request(long j) {
            if (j <= 0 || this.unbounded) {
                return;
            }
            if (add(j) != Long.MAX_VALUE) {
                enqueue(j);
            } else {
                this.unbounded = true;
                startUnboundedReception();
            }
        }

        /**
         * Schedules {@code j} receive tasks on the executor, one per requested item.
         *
         * @param j number of receive tasks to schedule
         */
        private void enqueue(long j) {
            // The counter must be a long: an int counter wraps around before reaching a demand
            // larger than Integer.MAX_VALUE and the loop would never terminate.
            for (long i = 0; i < j; i++) {
                this.executor.execute(this::receiveOne);
            }
        }

        /**
         * Blocks until one message has been received and hands it to the subscriber. Gives up as
         * soon as no subscriber is registered. A {@link JMSRuntimeException} is logged and
         * forwarded to the subscriber as a failure, after which the subscriber is detached.
         */
        private void receiveOne() {
            try {
                Message message = null;
                while (message == null && this.downstream.get() != null) {
                    // A null result is simply retried while a subscriber is still attached.
                    message = this.consumerHolder.getClient().receive();
                    if (message != null) {
                        this.requests.decrementAndGet();
                        // receive() may have blocked for a long time: re-read the subscriber,
                        // because close() can have detached it in the meantime.
                        Flow.Subscriber<? super Message> subscriber = this.downstream.get();
                        if (subscriber != null) {
                            subscriber.onNext(message);
                        }
                    }
                }
            } catch (JMSRuntimeException e) {
                JmsLogging.log.clientClosed();
                Flow.Subscriber<? super Message> subscriber = this.downstream.getAndSet(null);
                if (subscriber != null) {
                    subscriber.onError(e);
                }
            }
        }

        /** Registers a message listener that forwards every incoming message to the subscriber. */
        private void startUnboundedReception() {
            this.consumerHolder.getClient().setMessageListener(message -> {
                // NOTE(review): downstream is null once close() has run, so this dereference can
                // throw NullPointerException into the JMS provider. Left unchanged on purpose:
                // returning normally instead may change how the provider acknowledges the message.
                this.downstream.get().onNext(message);
            });
        }

        /** Cancels the subscription; this is the same operation as {@link #close()}. */
        @Override
        public void cancel() {
            close();
        }

        /**
         * Atomically adds {@code j} to the outstanding demand, saturating at
         * {@link Long#MAX_VALUE} when the sum overflows.
         *
         * @param j demand to add
         * @return the new demand, or {@link Long#MAX_VALUE} if it was already or became unbounded
         */
        long add(long j) {
            long current;
            long updated;
            do {
                current = this.requests.get();
                if (current == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                long sum = current + j;
                updated = sum < 0 ? Long.MAX_VALUE : sum;
            } while (!this.requests.compareAndSet(current, updated));
            return updated;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the incoming stream for one JMS channel.
     *
     * <p>The constructor configures the resource holder with a destination factory and a consumer
     * factory, obtains the consumer once, and assembles the {@link Multi} returned by
     * {@link #getSource()}: messages are emitted on a dedicated Vert.x context, wrapped into
     * {@link IncomingJmsMessage}, traced when tracing is enabled, retried with back-off on failure
     * when the configuration allows it, and optionally broadcast to all subscribers.
     *
     * @param vertx the Vert.x instance used to create the emission context
     * @param jmsResourceHolder holder of the JMS context, destination and consumer
     * @param jmsConnectorIncomingConfiguration the channel configuration
     * @param instance OpenTelemetry lookup, used only when tracing is enabled
     * @param jsonMapping JSON mapping handed to every {@link IncomingJmsMessage}
     * @param executor executor handed to every {@link IncomingJmsMessage}
     */
    public JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> jmsResourceHolder, JmsConnectorIncomingConfiguration jmsConnectorIncomingConfiguration, Instance<OpenTelemetry> instance, JsonMapping jsonMapping, Executor executor) {
        this.isTracingEnabled = jmsConnectorIncomingConfiguration.getTracingEnabled();
        String channel = jmsConnectorIncomingConfiguration.getChannel();
        // The destination name defaults to the channel name.
        String destinationName = jmsConnectorIncomingConfiguration.getDestination()
                .orElseGet(jmsConnectorIncomingConfiguration::getChannel);
        String selector = jmsConnectorIncomingConfiguration.getSelector().orElse(null);
        boolean noLocal = jmsConnectorIncomingConfiguration.getNoLocal();
        boolean durable = jmsConnectorIncomingConfiguration.getDurable();
        String destinationType = jmsConnectorIncomingConfiguration.getDestinationType();
        boolean retry = jmsConnectorIncomingConfiguration.getRetry();

        this.resourceHolder = jmsResourceHolder.configure(
                holder -> getDestination(holder.getContext(), destinationName, destinationType),
                holder -> {
                    if (!durable) {
                        return holder.getContext().createConsumer(holder.getDestination(), selector, noLocal);
                    }
                    // Durable consumers exist for topics only; the JMS API takes a Topic here,
                    // so the destination has to be checked and cast explicitly.
                    if (!(holder.getDestination() instanceof Topic)) {
                        throw JmsExceptions.ex.illegalArgumentInvalidDestination();
                    }
                    return holder.getContext().createDurableConsumer((Topic) holder.getDestination(), destinationName, selector, noLocal);
                });
        // The result is discarded: the call is only made for its effect on the holder
        // (presumably creating the consumer eagerly so misconfiguration fails here - confirm).
        jmsResourceHolder.getClient();

        this.jmsInstrumenter = this.isTracingEnabled
                ? JmsOpenTelemetryInstrumenter.createForSource(instance)
                : null;
        this.publisher = new JmsPublisher(jmsResourceHolder);
        // NOTE(review): createEventLoopContext() is not part of the public io.vertx.core.Vertx
        // API; the delegate presumably needs to be treated as the internal Vert.x type - confirm.
        this.context = Context.newInstance(vertx.getDelegate().createEventLoopContext());

        this.source = Multi.createFrom().publisher(this.publisher)
                .emitOn(this.context::runOnContext)
                .<IncomingJmsMessage<?>>map(message -> new IncomingJmsMessage<>(message, executor, jsonMapping))
                .onItem().invoke(this::incomingTrace)
                .onFailure(failure -> {
                    // Runs for every failure: log it, release the JMS resources, and let the
                    // configured retry flag decide whether the retry below applies.
                    JmsLogging.log.terminalErrorOnChannel(channel);
                    this.resourceHolder.close();
                    return retry;
                })
                .retry()
                .withBackOff(Duration.parse(jmsConnectorIncomingConfiguration.getRetryInitialDelay()),
                        Duration.parse(jmsConnectorIncomingConfiguration.getRetryMaxDelay()))
                .withJitter(jmsConnectorIncomingConfiguration.getRetryJitter())
                .atMost(jmsConnectorIncomingConfiguration.getRetryMaxRetries())
                .onFailure().invoke(failure ->
                        JmsLogging.log.terminalErrorRetriesExhausted(jmsConnectorIncomingConfiguration.getChannel(), failure))
                .plug(stream -> jmsConnectorIncomingConfiguration.getBroadcast()
                        ? stream.broadcast().toAllSubscribers()
                        : stream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops the source: closes the publisher first, then the JMS resource holder.
     * The resource holder is closed even when closing the publisher throws, so the
     * JMS resources are not leaked; the exception still propagates to the caller.
     */
    public void close() {
        try {
            this.publisher.close();
        } finally {
            this.resourceHolder.close();
        }
    }

    /**
     * Creates the JMS destination of the requested kind.
     *
     * @param jMSContext the JMS context used to create the destination
     * @param str the destination name
     * @param str2 the destination type, {@code "queue"} or {@code "topic"} in any letter case
     * @return the created queue or topic
     * @throws RuntimeException the exception produced by
     *         {@code JmsExceptions.ex.illegalArgumentUnknownDestinationType} when {@code str2}
     *         is neither a queue nor a topic
     */
    private Destination getDestination(JMSContext jMSContext, String str, String str2) {
        // Locale.ROOT keeps the comparison independent of the default locale; with a Turkish
        // default locale "TOPIC".toLowerCase() yields "topıc", which would be rejected.
        switch (str2.toLowerCase(Locale.ROOT)) {
            case "queue":
                JmsLogging.log.creatingQueue(str);
                return jMSContext.createQueue(str);
            case "topic":
                JmsLogging.log.creatingTopic(str);
                return jMSContext.createTopic(str);
            default:
                throw JmsExceptions.ex.illegalArgumentUnknownDestinationType(str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gives access to the stream assembled by the constructor.
     *
     * @return the stream of incoming JMS messages for this channel
     */
    public Multi<IncomingJmsMessage<?>> getSource() {
        return source;
    }

    /**
     * Records an incoming-message trace for {@code incomingJmsMessage}. Does nothing when
     * tracing is disabled.
     *
     * @param incomingJmsMessage the message that was just received
     * @throws RuntimeException wrapping the {@link JMSException} raised while the message is
     *         being inspected or traced
     */
    public void incomingTrace(IncomingJmsMessage<?> incomingJmsMessage) {
        if (!this.isTracingEnabled) {
            return;
        }
        Optional<String> queueName = incomingJmsMessage.getMetadata(IncomingJmsMessageMetadata.class)
                .map(metadata -> queueNameOf(metadata.getDestination()));
        Message message = incomingJmsMessage.unwrap(Message.class);
        // NOTE(review): the properties are collected but never read afterwards. The loop is kept
        // because it is where an unreadable property surfaces as a JMSException - confirm whether
        // the map was meant to be passed to the trace.
        HashMap<String, Object> properties = new HashMap<>();
        try {
            Enumeration<?> propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                String propertyName = (String) propertyNames.nextElement();
                properties.put(propertyName, message.getObjectProperty(propertyName));
            }
            JmsTrace trace = new JmsTrace.Builder()
                    .withQueue(queueName.orElse(null))
                    .withMessage(message)
                    .build();
            this.jmsInstrumenter.traceIncoming(incomingJmsMessage, trace);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the queue name of {@code destination}, or {@code null} if it is not a queue or the name cannot be read. */
    private static String queueNameOf(Destination destination) {
        if (!(destination instanceof Queue)) {
            return null;
        }
        try {
            return ((Queue) destination).getQueueName();
        } catch (JMSException ignored) {
            // Best effort: the queue name only decorates the trace, so a lookup failure is not fatal.
            return null;
        }
    }
}
