package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorReceiver.class */
public class ReactorReceiver implements AmqpReceiveLink {
    private final String entityPath;
    private final Receiver receiver;
    private final ReceiveLinkHandler handler;
    private final TokenManager tokenManager;
    private final ReactorDispatcher dispatcher;
    private final Disposable subscriptions;
    private final EmitterProcessor<Message> messagesProcessor;
    private final ReplayProcessor<AmqpEndpointState> endpointStates;
    private final AtomicBoolean hasAuthorized = new AtomicBoolean(true);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ClientLogger logger = new ClientLogger((Class<?>) ReactorReceiver.class);
    private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: protected */
    public ReactorReceiver(String str, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorDispatcher reactorDispatcher) {
        this.entityPath = str;
        this.receiver = receiver;
        this.handler = receiveLinkHandler;
        this.tokenManager = tokenManager;
        this.dispatcher = reactorDispatcher;
        this.messagesProcessor = (EmitterProcessor) this.handler.getDeliveredMessages().map(this::decodeDelivery).doOnNext(message -> {
            Supplier<Integer> supplier;
            Integer num;
            if (receiver.getRemoteCredit() != 0 || this.isDisposed.get() || (supplier = this.creditSupplier.get()) == null || (num = supplier.get()) == null || num.intValue() <= 0) {
                return;
            }
            addCredits(num.intValue());
        }).subscribeWith(EmitterProcessor.create());
        this.endpointStates = (ReplayProcessor) this.handler.getEndpointStates().map(endpointState -> {
            this.logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", receiveLinkHandler.getConnectionId(), str, getLinkName(), endpointState);
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
        this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe(amqpResponseCode -> {
            this.logger.verbose("Token refreshed: {}", amqpResponseCode);
            this.hasAuthorized.set(true);
        }, th -> {
            this.logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", receiveLinkHandler.getConnectionId(), this.entityPath, getLinkName(), th.getMessage());
            this.hasAuthorized.set(false);
        }, () -> {
            this.hasAuthorized.set(false);
        });
    }

    @Override // com.azure.core.amqp.AmqpLink
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates.distinct();
    }

    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public Flux<Message> receive() {
        return this.messagesProcessor;
    }

    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public void addCredits(int i) {
        if (this.isDisposed.get()) {
            return;
        }
        try {
            this.dispatcher.invoke(() -> {
                this.receiver.flow(i);
            });
        } catch (IOException e) {
            this.logger.warning("Unable to schedule work to add more credits.", e);
        }
    }

    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public int getCredits() {
        return this.receiver.getRemoteCredit();
    }

    @Override // com.azure.core.amqp.implementation.AmqpReceiveLink
    public void setEmptyCreditListener(Supplier<Integer> supplier) {
        Objects.requireNonNull(supplier, "'creditSupplier' cannot be null.");
        this.creditSupplier.set(supplier);
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getLinkName() {
        return this.receiver.getName();
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getEntityPath() {
        return this.entityPath;
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getHostname() {
        return this.handler.getHostname();
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.subscriptions.dispose();
        this.messagesProcessor.onComplete();
        this.tokenManager.close();
        this.receiver.close();
        try {
            this.dispatcher.invoke(() -> {
                this.receiver.free();
                this.handler.close();
            });
        } catch (IOException e) {
            this.logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
            this.handler.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispose(ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", this.handler.getConnectionId(), this.entityPath, getLinkName(), errorCondition);
        if (this.receiver.getLocalState() != EndpointState.CLOSED) {
            this.receiver.close();
            if (this.receiver.getCondition() == null) {
                this.receiver.setCondition(errorCondition);
            }
        }
        try {
            this.dispatcher.invoke(() -> {
                this.receiver.free();
                this.handler.close();
            });
        } catch (IOException e) {
            this.logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
            this.handler.close();
        }
        this.messagesProcessor.onComplete();
        this.tokenManager.close();
    }

    protected Message decodeDelivery(Delivery delivery) {
        int pending = delivery.pending();
        byte[] bArr = new byte[pending];
        int recv = this.receiver.recv(bArr, 0, pending);
        this.receiver.advance();
        Message message = Proton.message();
        message.decode(bArr, 0, recv);
        delivery.settle();
        return message;
    }

    public String toString() {
        return String.format("link name: [%s], entity path: [%s]", this.receiver.getName(), this.entityPath);
    }
}
