/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.Context;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;
import com.azure.messaging.eventhubs.implementation.SynchronousEventSubscriber;
import com.azure.messaging.eventhubs.implementation.SynchronousReceiveWork;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Synchronous facade over an {@link EventHubConsumerAsyncClient}.
 *
 * <p>Every operation is forwarded to the wrapped asynchronous consumer. Single-value operations
 * block for at most the try-timeout supplied at construction; receive operations expose the
 * events as an {@link IterableStream}.</p>
 */
@ServiceClient(builder = EventHubClientBuilder.class)
public class EventHubConsumerClient implements Closeable {
    private static final ClientLogger LOGGER = new ClientLogger(EventHubConsumerClient.class);

    private final EventHubConsumerAsyncClient consumer;
    /** Options used by the receive overloads that do not accept explicit {@link ReceiveOptions}. */
    private final ReceiveOptions defaultReceiveOptions = new ReceiveOptions();
    /** Upper bound for blocking calls and default wait time for the three-argument receive. */
    private final Duration timeout;
    /** Source of the numeric ids handed to each {@link SynchronousReceiveWork}. */
    private final AtomicInteger idGenerator = new AtomicInteger();
    private final EventHubsTracer tracer;

    /**
     * Creates a synchronous consumer on top of the given asynchronous consumer.
     *
     * @param consumer the asynchronous consumer that performs the actual work.
     * @param tryTimeout the maximum time blocking operations wait for a result.
     * @throws NullPointerException if {@code tryTimeout} or {@code consumer} is {@code null}.
     */
    EventHubConsumerClient(EventHubConsumerAsyncClient consumer, Duration tryTimeout) {
        // 'tryTimeout' is validated first so the failure order matches the established behavior.
        Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null.");
        this.consumer = Objects.requireNonNull(consumer, "'consumer' cannot be null.");
        this.timeout = tryTimeout;
        this.tracer = consumer.getInstrumentation().getTracer();
    }

    /**
     * Gets the fully qualified namespace reported by the underlying asynchronous consumer.
     *
     * @return the fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return consumer.getFullyQualifiedNamespace();
    }

    /**
     * Gets the Event Hub name reported by the underlying asynchronous consumer.
     *
     * @return the Event Hub name.
     */
    public String getEventHubName() {
        return consumer.getEventHubName();
    }

    /**
     * Gets the consumer group reported by the underlying asynchronous consumer.
     *
     * @return the consumer group name.
     */
    public String getConsumerGroup() {
        return consumer.getConsumerGroup();
    }

    /**
     * Retrieves the Event Hub properties, blocking for at most the configured try-timeout.
     *
     * @return the properties produced by the underlying asynchronous consumer.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public EventHubProperties getEventHubProperties() {
        return consumer.getEventHubProperties().block(timeout);
    }

    /**
     * Retrieves the partition identifiers from the underlying asynchronous consumer.
     *
     * @return the partition identifiers wrapped in an {@link IterableStream}.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public IterableStream<String> getPartitionIds() {
        return new IterableStream<>(consumer.getPartitionIds());
    }

    /**
     * Retrieves the properties of a single partition, blocking for at most the configured
     * try-timeout.
     *
     * @param partitionId the identifier of the partition to query.
     * @return the properties produced by the underlying asynchronous consumer.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public PartitionProperties getPartitionProperties(String partitionId) {
        return consumer.getPartitionProperties(partitionId).block(timeout);
    }

    /**
     * Receives events from a partition, using the configured try-timeout as the maximum wait time.
     *
     * @param partitionId the identifier of the partition to read from.
     * @param maximumMessageCount the maximum number of events to receive; must be at least 1.
     * @param startingPosition the position in the partition to start reading from.
     * @return the received events.
     * @throws NullPointerException if {@code partitionId} or {@code startingPosition} is
     *     {@code null}.
     * @throws IllegalArgumentException if {@code partitionId} is empty or
     *     {@code maximumMessageCount} is less than 1.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
        EventPosition startingPosition) {
        return this.receiveFromPartition(partitionId, maximumMessageCount, startingPosition, timeout);
    }

    /**
     * Receives events from a partition using the client's default {@link ReceiveOptions}.
     *
     * @param partitionId the identifier of the partition to read from.
     * @param maximumMessageCount the maximum number of events to receive; must be at least 1.
     * @param startingPosition the position in the partition to start reading from.
     * @param maximumWaitTime the maximum time to wait for events; must be positive.
     * @return the received events.
     * @throws NullPointerException if {@code maximumWaitTime}, {@code startingPosition} or
     *     {@code partitionId} is {@code null}.
     * @throws IllegalArgumentException if {@code partitionId} is empty,
     *     {@code maximumMessageCount} is less than 1, or {@code maximumWaitTime} is zero or
     *     negative.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
        EventPosition startingPosition, Duration maximumWaitTime) {
        return receiveEvents(partitionId, maximumMessageCount, startingPosition, maximumWaitTime,
            defaultReceiveOptions);
    }

    /**
     * Receives events from a partition using caller-supplied {@link ReceiveOptions}.
     *
     * @param partitionId the identifier of the partition to read from.
     * @param maximumMessageCount the maximum number of events to receive; must be at least 1.
     * @param startingPosition the position in the partition to start reading from.
     * @param maximumWaitTime the maximum time to wait for events; must be positive.
     * @param receiveOptions the options forwarded to the underlying asynchronous receive.
     * @return the received events.
     * @throws NullPointerException if {@code maximumWaitTime}, {@code startingPosition},
     *     {@code partitionId} or {@code receiveOptions} is {@code null}.
     * @throws IllegalArgumentException if {@code partitionId} is empty,
     *     {@code maximumMessageCount} is less than 1, or {@code maximumWaitTime} is zero or
     *     negative.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
        EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) {
        return receiveEvents(partitionId, maximumMessageCount, startingPosition, maximumWaitTime, receiveOptions);
    }

    /**
     * Closes the underlying asynchronous consumer.
     */
    @Override
    public void close() {
        consumer.close();
    }

    /**
     * Gets the identifier reported by the underlying asynchronous consumer.
     *
     * @return the client identifier.
     */
    public String getIdentifier() {
        return consumer.getIdentifier();
    }

    /**
     * Validates the receive arguments and builds the traced event stream shared by the public
     * receive overloads.
     *
     * <p>The checks run in a fixed order (null checks first, then value checks) so callers see the
     * same exception for a given combination of invalid arguments regardless of the overload
     * they used.</p>
     */
    private IterableStream<PartitionEvent> receiveEvents(String partitionId, int maximumMessageCount,
        EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) {
        if (maximumWaitTime == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'maximumWaitTime' cannot be null."));
        }
        if (startingPosition == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'startingPosition' cannot be null."));
        }
        if (partitionId == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'partitionId' cannot be null."));
        }
        if (receiveOptions == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'receiveOptions' cannot be null."));
        }
        if (partitionId.isEmpty()) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("'partitionId' cannot be empty."));
        }
        if (maximumMessageCount < 1) {
            throw LOGGER.logExceptionAsError(
                new IllegalArgumentException("'maximumMessageCount' cannot be less than 1."));
        }
        if (maximumWaitTime.isNegative() || maximumWaitTime.isZero()) {
            throw LOGGER.logExceptionAsError(
                new IllegalArgumentException("'maximumWaitTime' cannot be zero or less."));
        }

        // The start time is only captured when tracing is enabled; otherwise null is passed on.
        final Instant startTime = tracer.isEnabled() ? Instant.now() : null;

        // The receive work is queued from inside the Flux.create callback, not eagerly here.
        final Flux<PartitionEvent> events = Flux.<PartitionEvent>create(emitter -> queueWork(partitionId,
            maximumMessageCount, startingPosition, maximumWaitTime, receiveOptions, emitter));
        final Flux<PartitionEvent> tracedEvents
            = tracer.reportSyncReceiveSpan("EventHubs.receiveFromPartition", startTime, events, Context.NONE);

        return new IterableStream<>(tracedEvents);
    }

    /**
     * Creates a unit of synchronous receive work bound to {@code emitter} and subscribes it to the
     * asynchronous partition receive.
     */
    private void queueWork(String partitionId, int maximumMessageCount, EventPosition startingPosition,
        Duration maximumWaitTime, ReceiveOptions receiveOptions, FluxSink<PartitionEvent> emitter) {
        final long id = idGenerator.getAndIncrement();
        final SynchronousReceiveWork work
            = new SynchronousReceiveWork(id, maximumMessageCount, maximumWaitTime, emitter);
        final SynchronousEventSubscriber syncSubscriber = new SynchronousEventSubscriber(work);

        LOGGER.atInfo().addKeyValue("partitionId", partitionId).log("Started synchronous event subscriber.");

        consumer.receiveFromPartition(partitionId, startingPosition, receiveOptions).subscribeWith(syncSubscriber);
    }
}

