package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.cli.HelpFormatter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/azure/core/amqp/implementation/RequestResponseChannel.class */
/**
 * A request/response channel built from one AMQP send link and one AMQP receive link created on the
 * same {@link Session}.
 *
 * <p>Each request sent through {@link #sendWithAck(Message, DeliveryState)} is stamped with a numeric
 * message id and this channel's reply-to address. Deliveries arriving on the receive link are decoded
 * and matched back to the pending request through their correlation id.</p>
 */
public class RequestResponseChannel implements Disposable {
    /**
     * Extra bytes allocated on top of the size reported by the {@link MessageSerializer} when encoding a
     * request. NOTE(review): presumably headroom for AMQP headers/framing - confirm.
     */
    private static final int ENCODING_HEADROOM_BYTES = 512;

    private final Sender sendLink;
    private final Receiver receiveLink;
    private final String replyTo;
    private final MessageSerializer messageSerializer;
    private final ReactorProvider provider;
    private final Duration operationTimeout;
    private final SendLinkHandler sendLinkHandler;
    private final ReceiveLinkHandler receiveLinkHandler;
    private final Disposable.Composite subscriptions;
    private final AmqpRetryPolicy retryPolicy;
    private final SenderSettleMode senderSettleMode;
    private final String linkName;

    /** Requests that have been handed to the dispatcher and are still waiting for a response, by message id. */
    private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
        new ConcurrentSkipListMap<>();
    private final AtomicBoolean hasError = new AtomicBoolean();
    private final ReplayProcessor<AmqpEndpointState> endpointStates =
        ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
    private final FluxSink<AmqpEndpointState> endpointStatesSink =
        this.endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
    private final ClientLogger logger = new ClientLogger(RequestResponseChannel.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicLong requestId = new AtomicLong(0);

    /**
     * Creates the send and receive links on {@code session}, wires their handlers, subscribes to the
     * handlers' deliveries and endpoint states, and schedules both links to be opened on the reactor
     * dispatcher.
     *
     * <p>Decompiler note: the access modifier of this constructor was changed from {@code protected} to
     * {@code public} by the decompiler; it is left {@code public} so existing callers keep working.</p>
     *
     * @param connectionId Forwarded as the first argument to the handler factory methods.
     *     NOTE(review): parameter name inferred - confirm against {@code ReactorHandlerProvider}.
     * @param fullyQualifiedNamespace Forwarded as the second argument to the handler factory methods.
     *     NOTE(review): parameter name inferred - confirm against {@code ReactorHandlerProvider}.
     * @param linkName Base name of the links; the sender is named {@code linkName + ":sender"} and the
     *     receiver {@code linkName + ":receiver"}. Also used as a prefix in log messages.
     * @param entityPath Address set on the sender's target and on the receiver's source. The reply-to
     *     address is derived from it by removing every {@code $} and appending {@code -client-reply-to}.
     * @param session Session on which the two links are created.
     * @param amqpRetryOptions Supplies the per-try timeout and the retry policy applied while waiting for
     *     the links to become active.
     * @param reactorHandlerProvider Factory for the send and receive link handlers.
     * @param reactorProvider Supplies the reactor dispatcher on which link operations are scheduled.
     * @param messageSerializer Used to compute the size of outgoing messages.
     * @param senderSettleMode Sender settle mode applied to both links.
     * @param receiverSettleMode Receiver settle mode applied to the receive link.
     * @throws RuntimeException if scheduling the open of the links on the dispatcher fails with an
     *     {@link IOException}.
     */
    public RequestResponseChannel(String connectionId, String fullyQualifiedNamespace, String linkName,
        String entityPath, Session session, AmqpRetryOptions amqpRetryOptions,
        ReactorHandlerProvider reactorHandlerProvider, ReactorProvider reactorProvider,
        MessageSerializer messageSerializer, SenderSettleMode senderSettleMode,
        ReceiverSettleMode receiverSettleMode) {
        this.linkName = linkName;
        this.provider = reactorProvider;
        this.operationTimeout = amqpRetryOptions.getTryTimeout();
        this.retryPolicy = RetryUtil.getRetryPolicy(amqpRetryOptions);
        this.senderSettleMode = senderSettleMode;
        this.replyTo = entityPath.replace("$", "") + "-client-reply-to";
        this.messageSerializer = messageSerializer;

        // Sender: targets the entity; its source is left empty.
        this.sendLink = session.sender(linkName + ":sender");
        final Target senderTarget = new Target();
        senderTarget.setAddress(entityPath);
        this.sendLink.setTarget(senderTarget);
        this.sendLink.setSource(new Source());
        this.sendLink.setSenderSettleMode(senderSettleMode);
        this.sendLinkHandler =
            reactorHandlerProvider.createSendLinkHandler(connectionId, fullyQualifiedNamespace, linkName, entityPath);
        BaseHandler.setHandler(this.sendLink, this.sendLinkHandler);

        // Receiver: reads from the entity and is addressed by the reply-to address.
        this.receiveLink = session.receiver(linkName + ":receiver");
        final Source receiverSource = new Source();
        receiverSource.setAddress(entityPath);
        this.receiveLink.setSource(receiverSource);
        final Target receiverTarget = new Target();
        receiverTarget.setAddress(this.replyTo);
        this.receiveLink.setTarget(receiverTarget);
        this.receiveLink.setSenderSettleMode(senderSettleMode);
        this.receiveLink.setReceiverSettleMode(receiverSettleMode);
        this.receiveLinkHandler =
            reactorHandlerProvider.createReceiveLinkHandler(connectionId, fullyQualifiedNamespace, linkName, entityPath);
        BaseHandler.setHandler(this.receiveLink, this.receiveLinkHandler);

        // The composite is created BEFORE subscribing: the subscriptions below route terminal signals to
        // dispose(), which dereferences this field. If a handler were to signal synchronously during
        // subscribe(), a not-yet-assigned field would cause a NullPointerException.
        this.subscriptions = Disposables.composite();
        this.subscriptions.add(this.receiveLinkHandler.getDeliveredMessages()
            .map(this::decodeDelivery)
            .subscribe(response -> {
                this.logger.verbose("{} - Settling message: {}", this.linkName, response.getCorrelationId());
                settleMessage(response);
            }));
        this.subscriptions.add(this.receiveLinkHandler.getEndpointStates().subscribe(
            state -> this.endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
            this::handleError,
            this::dispose));
        this.subscriptions.add(this.sendLinkHandler.getEndpointStates().subscribe(
            state -> this.endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
            this::handleError,
            this::dispose));

        try {
            reactorProvider.getReactorDispatcher().invoke(() -> {
                this.sendLink.open();
                this.receiveLink.open();
            });
        } catch (IOException e) {
            throw this.logger.logExceptionAsError(
                new RuntimeException("Unable to open send and receive link.", e));
        }
    }

    /**
     * Gets the endpoint states of this channel. The stream relays the states reported by both the send
     * and the receive link handlers and replays the most recent one (initially
     * {@link AmqpEndpointState#UNINITIALIZED}) to new subscribers.
     *
     * @return A stream of endpoint states for this channel.
     */
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    /**
     * Disposes the handler subscriptions and closes both links. Only the first invocation has an effect.
     *
     * <p>NOTE(review): the links are closed on the calling thread rather than through the reactor
     * dispatcher, and sinks still waiting in the pending-request map are not signalled here - confirm
     * both are intended.</p>
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        this.subscriptions.dispose();
        this.sendLink.close();
        this.receiveLink.close();
    }

    /**
     * Indicates whether {@link #dispose()} has been invoked.
     *
     * @return {@code true} once this channel has been disposed.
     */
    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /**
     * Sends a request without an explicit delivery state and waits for the matching response.
     *
     * @param message Request to send; see {@link #sendWithAck(Message, DeliveryState)} for constraints.
     * @return A Mono that completes with the response message.
     */
    public Mono<Message> sendWithAck(Message message) {
        return sendWithAck(message, null);
    }

    /**
     * Sends a request and waits for the response whose correlation id matches the assigned message id.
     *
     * <p>The message id and reply-to of {@code message} are set by this method (at call time, not at
     * subscription time). On subscription, the returned Mono first waits - subject to the configured
     * timeout and retry policy - until both links report {@link EndpointState#ACTIVE}, then schedules the
     * send on the reactor dispatcher.</p>
     *
     * @param message Request to send. Its message id and reply-to must both be {@code null}.
     * @param deliveryState Optional delivery state applied to the outgoing delivery; may be {@code null}.
     * @return A Mono that completes with the response, or errors with: {@link IllegalStateException} if
     *     the channel is disposed; {@link NullPointerException} if {@code message} is null;
     *     {@link IllegalArgumentException} if message id or reply-to is already set; {@link IOException}
     *     if the send could not be scheduled; or the link error if the channel fails while waiting.
     */
    public Mono<Message> sendWithAck(Message message, DeliveryState deliveryState) {
        if (isDisposed()) {
            return FluxUtil.monoError(this.logger,
                new IllegalStateException("Cannot send a message when request response channel is disposed."));
        }
        if (message == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("message cannot be null"));
        }
        if (message.getMessageId() != null) {
            return FluxUtil.monoError(this.logger,
                new IllegalArgumentException("message.getMessageId() should be null"));
        }
        if (message.getReplyTo() != null) {
            return FluxUtil.monoError(this.logger,
                new IllegalArgumentException("message.getReplyTo() should be null"));
        }

        final UnsignedLong messageId = UnsignedLong.valueOf(this.requestId.incrementAndGet());
        message.setMessageId(messageId);
        message.setReplyTo(this.replyTo);

        // Completes once each link has reported ACTIVE (takeUntil completes on the first match).
        final Mono<Void> linksActive = Mono.when(
            this.sendLinkHandler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE),
            this.receiveLinkHandler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE));

        return RetryUtil.withRetry(linksActive, this.operationTimeout, this.retryPolicy)
            .then(Mono.<Message>create(sink -> {
                this.logger.verbose("{} - Scheduling on dispatcher. Message Id {}", this.linkName, messageId);
                this.unconfirmedSends.putIfAbsent(messageId, sink);

                // Drop the pending entry when the subscriber cancels or the sink terminates so the map
                // does not accumulate sinks nobody is listening to. The value-conditional remove leaves
                // an entry registered by a different subscription of this Mono untouched.
                sink.onDispose(() -> this.unconfirmedSends.remove(messageId, sink));

                try {
                    this.provider.getReactorDispatcher().invoke(() -> sendOnDispatcher(message, deliveryState));
                } catch (IOException e) {
                    // Nothing was scheduled, so no response can arrive for this id.
                    this.unconfirmedSends.remove(messageId, sink);
                    sink.error(e);
                }
            }));
    }

    /**
     * Gets the error context reported by the receive link handler for the receive link.
     *
     * @return The error context for this channel's receive link.
     */
    public AmqpErrorContext getErrorContext() {
        return this.receiveLinkHandler.getErrorContext(this.receiveLink);
    }

    /**
     * Reads the pending bytes of {@code delivery} from the receive link and decodes them into a message.
     * When the channel's sender settle mode is {@link SenderSettleMode#SETTLED}, the delivery is also
     * marked {@link Accepted} and settled.
     *
     * @param delivery Delivery received on the receive link.
     * @return The decoded message.
     */
    protected Message decodeDelivery(Delivery delivery) {
        final Message response = Proton.message();
        final int pendingBytes = delivery.pending();
        final byte[] buffer = new byte[pendingBytes];
        final int bytesRead = this.receiveLink.recv(buffer, 0, pendingBytes);
        response.decode(buffer, 0, bytesRead);

        if (this.senderSettleMode == SenderSettleMode.SETTLED) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }
        return response;
    }

    /**
     * Encodes {@code message} and writes it to the send link. Invoked through the reactor dispatcher.
     * One link credit is granted to the receive link before sending so the response can be delivered.
     */
    private void sendOnDispatcher(Message message, DeliveryState deliveryState) {
        // Delivery tag: a random UUID with the dashes stripped.
        final byte[] deliveryTag =
            UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8);
        final Delivery delivery = this.sendLink.delivery(deliveryTag);

        if (deliveryState != null) {
            this.logger.verbose("{} - Setting delivery state as [{}].", this.linkName, deliveryState);
            delivery.setMessageFormat(0);
            delivery.disposition(deliveryState);
        }

        final int capacity = this.messageSerializer.getSize(message) + ENCODING_HEADROOM_BYTES;
        final byte[] buffer = new byte[capacity];
        final int encodedSize = message.encode(buffer, 0, capacity);

        this.receiveLink.flow(1);
        this.sendLink.send(buffer, 0, encodedSize);
        delivery.settle();
        this.sendLink.advance();
    }

    /**
     * Completes the pending request whose message id equals the correlation id of {@code message}.
     * Responses with a missing or non-numeric correlation id, or with no pending request, are logged and
     * ignored.
     */
    private void settleMessage(Message message) {
        final String id = String.valueOf(message.getCorrelationId());

        final UnsignedLong correlationId;
        try {
            correlationId = UnsignedLong.valueOf(id);
        } catch (NumberFormatException e) {
            // A null correlation id becomes the text "null" above. Throwing here would propagate into the
            // delivered-messages subscriber, which has no error callback, and stop response processing.
            this.logger.warning("{} - Received delivery with unparseable correlationId[{}]. Ignoring.",
                this.linkName, id);
            return;
        }

        final MonoSink<Message> pending = this.unconfirmedSends.remove(correlationId);
        if (pending == null) {
            this.logger.warning("{} - Received delivery without pending messageId[{}]. Size[{}]",
                this.linkName, id, this.unconfirmedSends.size());
            return;
        }
        pending.success(message);
    }

    /**
     * Handles the first error reported by either link handler: publishes it on the endpoint-state stream,
     * disposes the channel, and fails every pending request with the same error. Subsequent errors are
     * ignored.
     */
    private void handleError(Throwable th) {
        if (this.hasError.getAndSet(true)) {
            return;
        }

        this.endpointStatesSink.error(th);
        this.logger.error("{} - Exception in RequestResponse links. Disposing and clearing unconfirmed sends.",
            this.linkName, th);
        dispose();

        this.unconfirmedSends.forEach((id, sink) -> sink.error(th));
        this.unconfirmedSends.clear();
    }
}
