/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;

/**
 * {@link EventHubManagementNode} implementation that sends "READ" management requests for an Event Hub (or one of
 * its partitions) over a {@link RequestResponseChannel} and deserializes the responses.
 *
 * <p>On construction the channel's endpoint states are subscribed to and forwarded to the stream returned by
 * {@link #getEndpointStates()}. When that upstream terminates (error or completion) this node closes itself.</p>
 */
public class ManagementChannel
implements EventHubManagementNode {
    // Keys used in management requests and found in management responses.
    public static final String MANAGEMENT_ENTITY_NAME_KEY = "name";
    public static final String MANAGEMENT_PARTITION_NAME_KEY = "partition";
    public static final String MANAGEMENT_RESULT_PARTITION_IDS = "partition_ids";
    public static final String MANAGEMENT_RESULT_CREATED_AT = "created_at";
    public static final String MANAGEMENT_RESULT_BEGIN_SEQUENCE_NUMBER = "begin_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER = "last_enqueued_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_OFFSET = "last_enqueued_offset";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC = "last_enqueued_time_utc";
    public static final String MANAGEMENT_RESULT_RUNTIME_INFO_RETRIEVAL_TIME_UTC = "runtime_info_retrieval_time_utc";
    public static final String MANAGEMENT_RESULT_PARTITION_IS_EMPTY = "is_partition_empty";

    // Request-only keys and values; not exposed outside this class.
    private static final String MANAGEMENT_ENTITY_TYPE_KEY = "type";
    private static final String MANAGEMENT_OPERATION_KEY = "operation";
    private static final String MANAGEMENT_SECURITY_TOKEN_KEY = "security_token";
    private static final String READ_OPERATION_VALUE = "READ";
    private static final String MANAGEMENT_EVENTHUB_ENTITY_TYPE = "com.microsoft:eventhub";
    private static final String MANAGEMENT_PARTITION_ENTITY_TYPE = "com.microsoft:partition";

    private static final ClientLogger LOGGER = new ClientLogger(ManagementChannel.class);

    private final TokenCredential tokenProvider;
    private final Mono<RequestResponseChannel> channelMono;
    private final Scheduler scheduler;
    private final String eventHubName;
    private final MessageSerializer messageSerializer;
    private final TokenManagerProvider tokenManagerProvider;
    // Endpoint states observed on the underlying channel are re-published through this processor.
    private final ReplayProcessor<AmqpEndpointState> endpointStateProcessor = ReplayProcessor.cacheLast();
    private final FluxSink<AmqpEndpointState> endpointStateSink = this.endpointStateProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Disposable subscription;
    private volatile boolean isDisposed;

    /**
     * Creates the management node and immediately subscribes to the endpoint states of the request/response channel.
     *
     * @param responseChannelMono Supplies the request/response channel used for management calls.
     * @param eventHubName Name of the Event Hub the requests are issued for.
     * @param credential Credential used to obtain the security token attached to each request.
     * @param tokenManagerProvider Resolves the token scope for {@code eventHubName}.
     * @param messageSerializer Deserializes successful responses into the requested type.
     * @param scheduler Scheduler on which results of the public operations are published.
     * @throws NullPointerException if any argument is {@code null}.
     */
    ManagementChannel(Mono<RequestResponseChannel> responseChannelMono, String eventHubName, TokenCredential credential, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, Scheduler scheduler) {
        this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.tokenProvider = Objects.requireNonNull(credential, "'credential' cannot be null.");
        this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.channelMono = Objects.requireNonNull(responseChannelMono, "'responseChannelMono' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

        // NOTE: the terminal callbacks below call close(). If the upstream terminates synchronously inside
        // subscribe(...), close() runs before 'subscription' has been assigned; close() tolerates that.
        this.subscription = responseChannelMono
            .flatMapMany(channel -> channel.getEndpointStates().distinctUntilChanged())
            .subscribe(state -> {
                LOGGER.info("Management endpoint state: {}", state);
                this.endpointStateSink.next(state);
            }, error -> {
                LOGGER.error("Exception occurred:", error);
                this.endpointStateSink.error(error);
                this.close();
            }, () -> {
                LOGGER.info("Complete.");
                this.endpointStateSink.complete();
                this.close();
            });
    }

    /**
     * Gets the endpoint states forwarded from the underlying request/response channel.
     *
     * @return The stream of endpoint states for this management node.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStateProcessor;
    }

    /**
     * Requests the properties of the Event Hub this node was created for.
     *
     * @return A Mono that emits the Event Hub properties, published on this node's scheduler.
     */
    @Override
    public Mono<EventHubProperties> getEventHubProperties() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_EVENTHUB_ENTITY_TYPE);
        properties.put(MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
        properties.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);
        return this.getProperties(properties, EventHubProperties.class).publishOn(this.scheduler);
    }

    /**
     * Requests the properties of a single partition of the Event Hub this node was created for.
     *
     * @param partitionId Identifier of the partition; sent as-is in the request.
     * @return A Mono that emits the partition properties, published on this node's scheduler.
     */
    @Override
    public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_PARTITION_ENTITY_TYPE);
        properties.put(MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
        properties.put(MANAGEMENT_PARTITION_NAME_KEY, partitionId);
        properties.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);
        return this.getProperties(properties, PartitionProperties.class).publishOn(this.scheduler);
    }

    /**
     * Obtains a token, sends a management request carrying {@code properties} plus that token, and maps the
     * response to {@code responseType}. An unsuccessful response is turned into an error signal built from the
     * response's status code and description.
     *
     * @param properties Application properties describing the request. Not modified by this method.
     * @param responseType Type the successful response is deserialized into.
     * @param <T> Type of the response.
     * @return A Mono that emits the deserialized response or an error.
     */
    private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> responseType) {
        final String tokenAudience = this.tokenManagerProvider.getScopesFromResource(this.eventHubName);
        final TokenRequestContext tokenRequest = new TokenRequestContext().addScopes(tokenAudience);

        return this.tokenProvider.getToken(tokenRequest).flatMap(accessToken -> {
            // Copy per subscription: this lambda runs once for every subscriber (including retries), so writing
            // the token into the shared map would let subscriptions overwrite each other's token and mutate a
            // HashMap that an in-flight request message still references.
            final Map<String, Object> requestProperties = new HashMap<>(properties);
            requestProperties.put(MANAGEMENT_SECURITY_TOKEN_KEY, accessToken.getToken());

            final Message request = Proton.message();
            request.setApplicationProperties(new ApplicationProperties(requestProperties));

            return this.channelMono.flatMap(channel -> channel.sendWithAck(request).<T>handle((message, sink) -> {
                if (RequestResponseUtils.isSuccessful(message)) {
                    sink.next(this.messageSerializer.deserialize(message, responseType));
                    return;
                }

                final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
                final String statusDescription = RequestResponseUtils.getStatusDescription(message);
                final Exception error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(), statusDescription, channel.getErrorContext());
                sink.error(LOGGER.logExceptionAsWarning(Exceptions.propagate(error)));
            }));
        });
    }

    /**
     * Disposes the endpoint-state subscription and, when the channel Mono is itself {@link Disposable}, the channel
     * Mono as well. Calls after the first one return immediately.
     */
    @Override
    public void close() {
        if (this.isDisposed) {
            return;
        }
        this.isDisposed = true;

        // May still be null when close() is triggered by a terminal signal delivered synchronously from the
        // subscribe(...) call in the constructor, i.e. before the field has been assigned.
        final Disposable stateSubscription = this.subscription;
        if (stateSubscription != null) {
            stateSubscription.dispose();
        }

        if (this.channelMono instanceof Disposable) {
            ((Disposable) this.channelMono).dispose();
        }
    }
}

