package com.azure.messaging.servicebus;

import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.WindowedSubscriber;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/azure/messaging/servicebus/SynchronousReceiver.class */
/**
 * Serves synchronous receive calls by routing them through a single, lazily created
 * {@link WindowedSubscriber} that is subscribed to one of the async client's V2 sync-receive streams.
 *
 * <p>The active subscriber lives in an {@link AtomicReference} that moves through three states:
 * {@code null} until the first {@link #receive(int, Duration)} call, then the subscriber that was installed by
 * that call, and finally the shared {@code DISPOSED} sentinel once {@link #dispose()} has run.</p>
 */
final class SynchronousReceiver {
    private static final String ENTITY_PATH_KEY = "entityPath";
    private static final String SYNC_RECEIVE_SPAN_NAME = "ServiceBus.receiveMessages";
    private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000);
    // Sentinel stored in 'subscriber' by dispose(); it has already been subscribed to an error-only Flux.
    private static final WindowedSubscriber<ServiceBusReceivedMessage> DISPOSED = createDisposedSentinel();

    private final ClientLogger logger;
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final ServiceBusTracer tracer;
    private final AtomicReference<WindowedSubscriber<ServiceBusReceivedMessage>> subscriber = new AtomicReference<>(null);

    /**
     * Creates the receiver.
     *
     * @param logger the logger used for reporting errors and release outcomes.
     * @param asyncClient the async client whose receive stream and release operation are used.
     * @throws NullPointerException if {@code logger} or {@code asyncClient} is null.
     */
    public SynchronousReceiver(ClientLogger logger, ServiceBusReceiverAsyncClient asyncClient) {
        this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");
        this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        this.tracer = asyncClient.getInstrumentation().getTracer();
    }

    /**
     * Enqueues a receive request on the current subscriber, creating and subscribing it first if no subscriber
     * has been installed yet.
     *
     * @param maxMessages forwarded as the first argument of {@code WindowedSubscriber.enqueueRequest}.
     * @param maxWaitTime forwarded as the second argument of {@code WindowedSubscriber.enqueueRequest}.
     * @return the stream returned by {@code WindowedSubscriber.enqueueRequest}.
     * @throws UnsupportedOperationException if a subscriber has to be created and the async client is not in
     * v2 mode.
     */
    public IterableStream<ServiceBusReceivedMessage> receive(int maxMessages, Duration maxWaitTime) {
        final WindowedSubscriber<ServiceBusReceivedMessage> current = subscriber.get();
        if (current != null) {
            return current.enqueueRequest(maxMessages, maxWaitTime);
        }
        return subscribeOnce().enqueueRequest(maxMessages, maxWaitTime);
    }

    /**
     * Replaces the current subscriber with the {@code DISPOSED} sentinel and disposes the subscriber that was
     * previously installed, if there was one.
     */
    public void dispose() {
        final WindowedSubscriber<ServiceBusReceivedMessage> previous = subscriber.getAndSet(DISPOSED);
        if (previous != null) {
            previous.dispose();
        }
    }

    /**
     * Builds the sentinel subscriber: a {@link WindowedSubscriber} with default options that is subscribed to a
     * Flux which only signals a {@link RuntimeException} with the message "Disposed.".
     */
    private static WindowedSubscriber<ServiceBusReceivedMessage> createDisposedSentinel() {
        final WindowedSubscriber.WindowedSubscriberOptions<ServiceBusReceivedMessage> options
            = new WindowedSubscriber.WindowedSubscriberOptions<>();
        final WindowedSubscriber<ServiceBusReceivedMessage> sentinel
            = new WindowedSubscriber<>(new HashMap<>(0), TERMINAL_MESSAGE, options);
        return Flux.<ServiceBusReceivedMessage>error(new RuntimeException("Disposed.")).subscribeWith(sentinel);
    }

    /**
     * Installs a new subscriber if none is installed and subscribes it to the async client's receive stream.
     *
     * @return whatever subscriber is installed after the attempt; this is the newly created one only when this
     * call won the compare-and-set.
     * @throws UnsupportedOperationException if the async client is not in v2 mode.
     */
    private WindowedSubscriber<ServiceBusReceivedMessage> subscribeOnce() {
        if (!asyncClient.isV2()) {
            throw logger.logExceptionAsError(new UnsupportedOperationException("SynchronousReceiver requires v2 mode."));
        }
        final WindowedSubscriber<ServiceBusReceivedMessage> candidate = createSubscriber();
        // Only the caller that swaps out 'null' subscribes its candidate; a losing candidate is never subscribed.
        if (subscriber.compareAndSet(null, candidate)) {
            if (asyncClient.isSessionEnabled()) {
                asyncClient.sessionSyncReceiveV2().subscribeWith(candidate);
            } else {
                asyncClient.nonSessionSyncReceiveV2().subscribeWith(candidate);
            }
        }
        return subscriber.get();
    }

    /**
     * Creates an unsubscribed {@link WindowedSubscriber} tagged with the client's entity path.
     *
     * <p>The subscriber always gets the tracing window decorator and the next-item timeout. A message releaser
     * is added only when the receive mode is {@code PEEK_LOCK} and the prefetch count is zero.</p>
     */
    private WindowedSubscriber<ServiceBusReceivedMessage> createSubscriber() {
        final String entityPath = asyncClient.getEntityPath();
        final ReceiverOptions receiverOptions = asyncClient.getReceiverOptions();
        final boolean isPeekLock = receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK;
        final boolean isPrefetchDisabled = receiverOptions.getPrefetchCount() == 0;

        final WindowedSubscriber.WindowedSubscriberOptions<ServiceBusReceivedMessage> options
            = new WindowedSubscriber.WindowedSubscriberOptions<>();
        if (isPeekLock && isPrefetchDisabled) {
            options.setReleaser(this::messageReleaser);
        }
        options.setWindowDecorator(this::traceDecorator);
        options.setNextItemTimeout(TIMEOUT_BETWEEN_MESSAGES);
        return new WindowedSubscriber<>(Collections.singletonMap(ENTITY_PATH_KEY, entityPath), TERMINAL_MESSAGE, options);
    }

    /**
     * Releases the given message through the async client without waiting for the result; the outcome is only
     * logged (warning on failure, verbose on completion) together with the message's lock token.
     *
     * @param message the message to release.
     */
    private void messageReleaser(ServiceBusReceivedMessage message) {
        asyncClient.release(message).subscribe(ignored -> {
            // Emitted values carry nothing of interest; only the terminal signals are logged.
        }, error -> {
            logger.atWarning()
                .addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, message.getLockToken())
                .log("couldn't release the message.", error);
        }, () -> {
            logger.atVerbose()
                .addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, message.getLockToken())
                .log("message successfully released.");
        });
    }

    /**
     * Wraps the given flux with the tracer's sync-receive instrumentation.
     *
     * @param toDecorate the flux to instrument.
     * @return the instrumented flux.
     */
    private Flux<ServiceBusReceivedMessage> traceDecorator(Flux<ServiceBusReceivedMessage> toDecorate) {
        final Flux<ServiceBusReceivedMessage> decorated = tracer.traceSyncReceive(SYNC_RECEIVE_SPAN_NAME, toDecorate);
        // The decorated flux is subscribed here in addition to being handed back to the caller.
        // NOTE(review): presumably this makes the tracing run regardless of how the caller consumes the flux - confirm.
        decorated.subscribe();
        return decorated;
    }
}
