package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.ChannelCacheWrapper;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/ManagementChannel.class */
public class ManagementChannel implements EventHubManagementNode {
    public static final String MANAGEMENT_ENTITY_NAME_KEY = "name";
    public static final String MANAGEMENT_PARTITION_NAME_KEY = "partition";
    public static final String MANAGEMENT_RESULT_PARTITION_IDS = "partition_ids";
    public static final String MANAGEMENT_RESULT_CREATED_AT = "created_at";
    public static final String MANAGEMENT_RESULT_BEGIN_SEQUENCE_NUMBER = "begin_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER = "last_enqueued_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_OFFSET = "last_enqueued_offset";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC = "last_enqueued_time_utc";
    public static final String MANAGEMENT_RESULT_RUNTIME_INFO_RETRIEVAL_TIME_UTC = "runtime_info_retrieval_time_utc";
    public static final String MANAGEMENT_RESULT_PARTITION_IS_EMPTY = "is_partition_empty";
    private static final String MANAGEMENT_ENTITY_TYPE_KEY = "type";
    private static final String MANAGEMENT_OPERATION_KEY = "operation";
    private static final String MANAGEMENT_SECURITY_TOKEN_KEY = "security_token";
    private static final String READ_OPERATION_VALUE = "READ";
    private static final String MANAGEMENT_EVENTHUB_ENTITY_TYPE = "com.microsoft:eventhub";
    private static final String MANAGEMENT_PARTITION_ENTITY_TYPE = "com.microsoft:partition";
    private static final ClientLogger LOGGER = new ClientLogger(ManagementChannel.class);
    private final TokenCredential tokenProvider;
    private final ChannelCacheWrapper channelCache;
    private final Scheduler scheduler;
    private final String eventHubName;
    private final MessageSerializer messageSerializer;
    private final TokenManagerProvider tokenManagerProvider;
    private final ReplayProcessor<AmqpEndpointState> endpointStateProcessor = ReplayProcessor.cacheLast();
    private final FluxSink<AmqpEndpointState> endpointStateSink = this.endpointStateProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Disposable subscription;
    private volatile boolean isDisposed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ManagementChannel(ChannelCacheWrapper channelCacheWrapper, String str, TokenCredential tokenCredential, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, Scheduler scheduler) {
        this.tokenManagerProvider = (TokenManagerProvider) Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.tokenProvider = (TokenCredential) Objects.requireNonNull(tokenCredential, "'credential' cannot be null.");
        this.eventHubName = (String) Objects.requireNonNull(str, "'eventHubName' cannot be null.");
        this.messageSerializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.channelCache = (ChannelCacheWrapper) Objects.requireNonNull(channelCacheWrapper, "'channelCache' cannot be null.");
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        this.subscription = channelCacheWrapper.get().flatMapMany(requestResponseChannel -> {
            return requestResponseChannel.getEndpointStates().distinctUntilChanged();
        }).subscribe(amqpEndpointState -> {
            LOGGER.info("Management endpoint state: {}", new Object[]{amqpEndpointState});
            this.endpointStateSink.next(amqpEndpointState);
        }, th -> {
            LOGGER.error("Exception occurred:", new Object[]{th});
            this.endpointStateSink.error(th);
            close();
        }, () -> {
            LOGGER.info("Complete.");
            this.endpointStateSink.complete();
            close();
        });
    }

    @Override // com.azure.messaging.eventhubs.implementation.EventHubManagementNode
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStateProcessor;
    }

    @Override // com.azure.messaging.eventhubs.implementation.EventHubManagementNode
    public Mono<EventHubProperties> getEventHubProperties() {
        HashMap hashMap = new HashMap();
        hashMap.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_EVENTHUB_ENTITY_TYPE);
        hashMap.put(MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
        hashMap.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);
        return getProperties(hashMap, EventHubProperties.class).publishOn(this.scheduler);
    }

    @Override // com.azure.messaging.eventhubs.implementation.EventHubManagementNode
    public Mono<PartitionProperties> getPartitionProperties(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_PARTITION_ENTITY_TYPE);
        hashMap.put(MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
        hashMap.put(MANAGEMENT_PARTITION_NAME_KEY, str);
        hashMap.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);
        return getProperties(hashMap, PartitionProperties.class).publishOn(this.scheduler);
    }

    private <T> Mono<T> getProperties(Map<String, Object> map, Class<T> cls) {
        return this.tokenProvider.getToken(new TokenRequestContext().addScopes(new String[]{this.tokenManagerProvider.getScopesFromResource(this.eventHubName)})).flatMap(accessToken -> {
            map.put(MANAGEMENT_SECURITY_TOKEN_KEY, accessToken.getToken());
            Message message = Proton.message();
            message.setApplicationProperties(new ApplicationProperties(map));
            return this.channelCache.get().flatMap(requestResponseChannel -> {
                return requestResponseChannel.sendWithAck(message).handle((message2, synchronousSink) -> {
                    if (RequestResponseUtils.isSuccessful(message2)) {
                        synchronousSink.next(this.messageSerializer.deserialize(message2, cls));
                        return;
                    }
                    AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message2);
                    synchronousSink.error(LOGGER.logExceptionAsWarning(Exceptions.propagate(ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(), RequestResponseUtils.getStatusDescription(message2), requestResponseChannel.getErrorContext()))));
                });
            });
        });
    }

    @Override // com.azure.messaging.eventhubs.implementation.EventHubManagementNode, java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed) {
            return;
        }
        this.isDisposed = true;
        this.subscription.dispose();
        this.channelCache.dispose();
    }
}
