package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.handler.DeliverySettleMode;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:com/azure/core/amqp/implementation/RequestResponseChannel.class */
public class RequestResponseChannel implements AsyncCloseable {
    private static final String MANAGEMENT_OPERATION_KEY = "operation";
    private final ClientLogger logger;
    private final Sender sendLink;
    private final Receiver receiveLink;
    private final SendLinkHandler sendLinkHandler;
    private final RequestChannelWrapper receiveLinkHandler;
    private final SenderSettleMode senderSettleMode;
    private volatile AmqpEndpointState sendLinkState;
    private volatile AmqpEndpointState receiveLinkState;
    private final String connectionId;
    private final AmqpRetryOptions retryOptions;
    private final String replyTo;
    private final String activeEndpointTimeoutMessage;
    private final MessageSerializer messageSerializer;
    private final ReactorProvider provider;
    private final AmqpMetricsProvider metricsProvider;
    private static final String START_SEND_TIME_CONTEXT_KEY = "send-start-time";
    private static final String OPERATION_CONTEXT_KEY = "amqpOperation";
    private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
    private final AtomicLong requestId = new AtomicLong(0);
    private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends = new ConcurrentSkipListMap<>();
    private final AtomicInteger pendingLinkTerminations = new AtomicInteger(2);
    private final Sinks.One<Void> closeMono = Sinks.one();
    private final AtomicBoolean hasError = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Disposable.Composite subscriptions = Disposables.composite();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/RequestResponseChannel$RequestChannelWrapper.class */
    public static final class RequestChannelWrapper {
        private final boolean isV2;
        private final ReceiveLinkHandler receiveLinkHandler;
        private final ReceiveLinkHandler2 receiveLinkHandler2;

        RequestChannelWrapper(String str, String str2, String str3, String str4, Receiver receiver, ReactorHandlerProvider reactorHandlerProvider, ReactorProvider reactorProvider, AmqpRetryOptions amqpRetryOptions, boolean z) {
            this.isV2 = z;
            if (this.isV2) {
                this.receiveLinkHandler2 = reactorHandlerProvider.createReceiveLinkHandler(str, str2, str3, str4, DeliverySettleMode.ACCEPT_AND_SETTLE_ON_DELIVERY, false, reactorProvider.getReactorDispatcher(), amqpRetryOptions);
                BaseHandler.setHandler(receiver, this.receiveLinkHandler2);
                this.receiveLinkHandler = null;
            } else {
                this.receiveLinkHandler = reactorHandlerProvider.createReceiveLinkHandler(str, str2, str3, str4);
                BaseHandler.setHandler(receiver, this.receiveLinkHandler);
                this.receiveLinkHandler2 = null;
            }
        }

        Flux<EndpointState> getEndpointStates() {
            return this.isV2 ? this.receiveLinkHandler2.getEndpointStates() : this.receiveLinkHandler.getEndpointStates();
        }

        Flux<Message> getDeliveredMessages(Function<Delivery, Message> function) {
            return this.isV2 ? this.receiveLinkHandler2.getMessages() : this.receiveLinkHandler.getDeliveredMessages().map(function);
        }

        public AmqpErrorContext getErrorContext(Receiver receiver) {
            return this.isV2 ? this.receiveLinkHandler2.getErrorContext(receiver) : this.receiveLinkHandler.getErrorContext(receiver);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RequestResponseChannel(AmqpConnection amqpConnection, String str, String str2, String str3, String str4, Session session, AmqpRetryOptions amqpRetryOptions, ReactorHandlerProvider reactorHandlerProvider, ReactorProvider reactorProvider, MessageSerializer messageSerializer, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, AmqpMetricsProvider amqpMetricsProvider, boolean z) {
        Map<String, Object> createContextWithConnectionId = AmqpLoggingUtils.createContextWithConnectionId(str);
        createContextWithConnectionId.put(ClientConstants.LINK_NAME_KEY, str3);
        this.logger = new ClientLogger(RequestResponseChannel.class, createContextWithConnectionId);
        this.connectionId = str;
        this.retryOptions = amqpRetryOptions;
        this.provider = reactorProvider;
        this.senderSettleMode = senderSettleMode;
        this.activeEndpointTimeoutMessage = String.format("RequestResponseChannel connectionId[%s], linkName[%s]: Waiting for send and receive handler to be ACTIVE", str, str3);
        this.replyTo = str4.replace("$", "") + "-client-reply-to";
        this.messageSerializer = messageSerializer;
        this.sendLink = session.sender(str3 + ":sender");
        Target target = new Target();
        target.setAddress(str4);
        this.sendLink.setTarget(target);
        this.sendLink.setSource(new Source());
        this.sendLink.setSenderSettleMode(senderSettleMode);
        this.sendLinkHandler = reactorHandlerProvider.createSendLinkHandler(str, str2, str3, str4);
        BaseHandler.setHandler(this.sendLink, this.sendLinkHandler);
        this.receiveLink = session.receiver(str3 + ":receiver");
        Source source = new Source();
        source.setAddress(str4);
        this.receiveLink.setSource(source);
        Target target2 = new Target();
        target2.setAddress(this.replyTo);
        this.receiveLink.setTarget(target2);
        this.receiveLink.setSenderSettleMode(senderSettleMode);
        this.receiveLink.setReceiverSettleMode(receiverSettleMode);
        this.receiveLinkHandler = new RequestChannelWrapper(str, str2, str3, str4, this.receiveLink, reactorHandlerProvider, reactorProvider, amqpRetryOptions, z);
        this.metricsProvider = amqpMetricsProvider;
        this.subscriptions.add(this.receiveLinkHandler.getDeliveredMessages(this::decodeDelivery).subscribe(message -> {
            this.logger.atVerbose().addKeyValue("messageId", message.getCorrelationId()).log("Settling message.");
            settleMessage(message);
        }));
        this.subscriptions.add(this.receiveLinkHandler.getEndpointStates().subscribe(endpointState -> {
            updateEndpointState(null, AmqpEndpointStateUtil.getConnectionState(endpointState));
        }, th -> {
            handleError(th, "Error in ReceiveLinkHandler.");
            onTerminalState("ReceiveLinkHandler");
        }, () -> {
            closeAsync().subscribe();
            onTerminalState("ReceiveLinkHandler");
        }));
        this.subscriptions.add(this.sendLinkHandler.getEndpointStates().subscribe(endpointState2 -> {
            updateEndpointState(AmqpEndpointStateUtil.getConnectionState(endpointState2), null);
        }, th2 -> {
            handleError(th2, "Error in SendLinkHandler.");
            onTerminalState("SendLinkHandler");
        }, () -> {
            closeAsync().subscribe();
            onTerminalState("SendLinkHandler");
        }));
        this.subscriptions.add(amqpConnection.getShutdownSignals().next().flatMap(amqpShutdownSignal -> {
            this.logger.verbose("Shutdown signal received.");
            return closeAsync();
        }).subscribe());
        try {
            this.provider.getReactorDispatcher().invoke(() -> {
                this.sendLink.open();
                this.receiveLink.open();
            });
        } catch (IOException | RejectedExecutionException e) {
            throw this.logger.logExceptionAsWarning(new RuntimeException("Unable to open send and receive link.", e));
        }
    }

    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates.asFlux();
    }

    public Mono<Void> closeAsync() {
        Mono<Void> subscribeOn = this.closeMono.asMono().timeout(this.retryOptions.getTryTimeout()).onErrorResume(TimeoutException.class, timeoutException -> {
            return Mono.fromRunnable(() -> {
                this.logger.info("Timed out waiting for RequestResponseChannel to complete closing. Manually closing.");
                onTerminalState("SendLinkHandler");
                onTerminalState("ReceiveLinkHandler");
            });
        }).subscribeOn(Schedulers.boundedElastic());
        if (this.isDisposed.getAndSet(true)) {
            this.logger.verbose("Channel already closed.");
            return subscribeOn;
        }
        this.logger.verbose("Closing request/response channel.");
        return Mono.fromRunnable(() -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    this.logger.verbose("Closing send link and receive link.");
                    this.sendLink.close();
                    this.receiveLink.close();
                });
            } catch (IOException | RejectedExecutionException e) {
                this.logger.info("Unable to schedule close work. Closing manually.");
                this.sendLink.close();
                this.receiveLink.close();
            }
        }).subscribeOn(Schedulers.boundedElastic()).then(subscribeOn);
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isDisposedOrDisposalInInProgress() {
        return isDisposed() || this.hasError.get() || this.pendingLinkTerminations.get() <= 1;
    }

    public Mono<Message> sendWithAck(Message message) {
        return sendWithAck(message, null);
    }

    public Mono<Message> sendWithAck(Message message, DeliveryState deliveryState) {
        if (isDisposed()) {
            return FluxUtil.monoError(this.logger, new RequestResponseChannelClosedException(this.connectionId));
        }
        if (message == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("message cannot be null"));
        }
        if (message.getMessageId() != null) {
            return FluxUtil.monoError(this.logger, new IllegalArgumentException("message.getMessageId() should be null"));
        }
        if (message.getReplyTo() != null) {
            return FluxUtil.monoError(this.logger, new IllegalArgumentException("message.getReplyTo() should be null"));
        }
        UnsignedLong valueOf = UnsignedLong.valueOf(this.requestId.incrementAndGet());
        message.setMessageId(valueOf);
        message.setReplyTo(this.replyTo);
        return RetryUtil.withRetry(Mono.when(new Publisher[]{this.sendLinkHandler.getEndpointStates().takeUntil(endpointState -> {
            return endpointState == EndpointState.ACTIVE;
        }), this.receiveLinkHandler.getEndpointStates().takeUntil(endpointState2 -> {
            return endpointState2 == EndpointState.ACTIVE;
        })}), this.retryOptions, this.activeEndpointTimeoutMessage).then(captureStartTime(message, Mono.create(monoSink -> {
            try {
                this.logger.atVerbose().addKeyValue("messageId", message.getCorrelationId()).log("Scheduling on dispatcher.");
                this.unconfirmedSends.putIfAbsent(valueOf, monoSink);
                this.provider.getReactorDispatcher().invoke(() -> {
                    if (isDisposed()) {
                        monoSink.error(new RequestResponseChannelClosedException(this.connectionId, this.sendLink.getLocalState(), this.receiveLink.getLocalState()));
                        return;
                    }
                    Delivery delivery = this.sendLink.delivery(UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8));
                    if (deliveryState != null) {
                        this.logger.atVerbose().addKeyValue("state", deliveryState).log("Setting delivery state.");
                        delivery.setMessageFormat(0);
                        delivery.disposition(deliveryState);
                    }
                    int size = this.messageSerializer.getSize(message) + 512;
                    byte[] bArr = new byte[size];
                    int encode = message.encode(bArr, 0, size);
                    this.receiveLink.flow(1);
                    this.sendLink.send(bArr, 0, encode);
                    delivery.settle();
                    this.sendLink.advance();
                });
            } catch (IOException | RejectedExecutionException e) {
                recordDelivery(getSinkContext(monoSink), null);
                monoSink.error(e);
            }
        })));
    }

    public AmqpErrorContext getErrorContext() {
        return this.receiveLinkHandler.getErrorContext(this.receiveLink);
    }

    protected Message decodeDelivery(Delivery delivery) {
        Message message = Proton.message();
        int pending = delivery.pending();
        byte[] bArr = new byte[pending];
        message.decode(bArr, 0, this.receiveLink.recv(bArr, 0, pending));
        if (this.senderSettleMode == SenderSettleMode.SETTLED) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }
        return message;
    }

    private void settleMessage(Message message) {
        MonoSink<Message> remove = this.unconfirmedSends.remove(message.getCorrelationId() instanceof UnsignedLong ? (UnsignedLong) message.getCorrelationId() : UnsignedLong.valueOf(String.valueOf(message.getCorrelationId())));
        if (remove == null) {
            this.logger.atWarning().addKeyValue("messageId", message.getCorrelationId()).log("Received delivery without pending message.");
        } else {
            recordDelivery(getSinkContext(remove), message);
            remove.success(message);
        }
    }

    private void handleError(Throwable th, String str) {
        if (this.hasError.getAndSet(true)) {
            return;
        }
        this.logger.atWarning().log("{} Disposing unconfirmed sends.", new Object[]{str, th});
        this.endpointStates.emitError(th, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, emitResult).log("Could not emit error to sink.");
            return false;
        });
        terminateUnconfirmedSends(th);
        closeAsync().subscribe();
    }

    private void onTerminalState(String str) {
        if (this.pendingLinkTerminations.get() <= 0) {
            this.logger.atVerbose().log("Already disposed send/receive links.");
            return;
        }
        int decrementAndGet = this.pendingLinkTerminations.decrementAndGet();
        this.logger.verbose("{} disposed. Remaining: {}", new Object[]{str, Integer.valueOf(decrementAndGet)});
        if (decrementAndGet == 0) {
            this.subscriptions.dispose();
            terminateUnconfirmedSends(new AmqpException(true, "The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.", null));
            this.endpointStates.emitComplete((signalType, emitResult) -> {
                return onEmitSinkFailure(signalType, emitResult, "Could not emit complete signal.");
            });
            this.closeMono.emitEmpty((signalType2, emitResult2) -> {
                return onEmitSinkFailure(signalType2, emitResult2, str + ". Error closing mono.");
            });
        }
    }

    private boolean onEmitSinkFailure(SignalType signalType, Sinks.EmitResult emitResult, String str) {
        AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signalType, emitResult).log(str);
        return false;
    }

    private synchronized void updateEndpointState(AmqpEndpointState amqpEndpointState, AmqpEndpointState amqpEndpointState2) {
        if (amqpEndpointState != null) {
            this.sendLinkState = amqpEndpointState;
        } else if (amqpEndpointState2 != null) {
            this.receiveLinkState = amqpEndpointState2;
        }
        this.logger.atVerbose().addKeyValue("sendState", this.sendLinkState).addKeyValue("receiveState", this.receiveLinkState).log("Updating endpoint states.");
        if (this.sendLinkState == this.receiveLinkState) {
            this.endpointStates.emitNext(this.sendLinkState, Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }

    private void terminateUnconfirmedSends(Throwable th) {
        this.logger.verbose("Terminating {} unconfirmed sends (reason: {}).", new Object[]{Integer.valueOf(this.unconfirmedSends.size()), th.getMessage()});
        int i = 0;
        while (true) {
            Map.Entry<UnsignedLong, MonoSink<Message>> pollFirstEntry = this.unconfirmedSends.pollFirstEntry();
            if (pollFirstEntry == null) {
                this.logger.atVerbose().log("completed the termination of {} unconfirmed sends (reason: {}).", new Object[]{Integer.valueOf(i), th.getMessage()});
                return;
            }
            MonoSink<Message> value = pollFirstEntry.getValue();
            recordDelivery(getSinkContext(value), null);
            value.error(th);
            i++;
        }
    }

    private Mono<Message> captureStartTime(Message message, Mono<Message> mono) {
        if (!this.metricsProvider.isRequestResponseDurationEnabled()) {
            return mono;
        }
        String str = "unknown";
        if (message != null && message.getApplicationProperties() != null && message.getApplicationProperties().getValue() != null) {
            Object obj = message.getApplicationProperties().getValue().get(MANAGEMENT_OPERATION_KEY);
            if (obj instanceof String) {
                str = (String) obj;
            }
        }
        return mono.contextWrite(Context.of(START_SEND_TIME_CONTEXT_KEY, Instant.now()).put("amqpOperation", str));
    }

    private static ContextView getSinkContext(MonoSink<?> monoSink) {
        return monoSink.currentContext();
    }

    private void recordDelivery(ContextView contextView, Message message) {
        if (this.metricsProvider.isRequestResponseDurationEnabled()) {
            Object orDefault = contextView.getOrDefault(START_SEND_TIME_CONTEXT_KEY, (Object) null);
            Object orDefault2 = contextView.getOrDefault("amqpOperation", (Object) null);
            AmqpResponseCode statusCode = message == null ? null : RequestResponseUtils.getStatusCode(message);
            if ((orDefault instanceof Instant) && (orDefault2 instanceof String)) {
                this.metricsProvider.recordRequestResponseDuration(((Instant) orDefault).toEpochMilli(), (String) orDefault2, statusCode);
            }
        }
    }
}
