package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.publisher.Mono;

@ServiceClient(builder = EventProcessorClientBuilder.class)
/* loaded from: input_file:com/azure/messaging/eventhubs/EventProcessorClient.class */
public class EventProcessorClient {
    private static final long BASE_JITTER_IN_SECONDS = 2;
    private final ClientLogger logger;
    private final String identifier;
    private final PartitionPumpManager partitionPumpManager;
    private final PartitionBasedLoadBalancer partitionBasedLoadBalancer;
    private final CheckpointStore checkpointStore;
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final Duration loadBalancerUpdateInterval;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final AtomicReference<ScheduledFuture<?>> runner = new AtomicReference<>();
    private final AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String str, Supplier<PartitionProcessor> supplier, CheckpointStore checkpointStore, boolean z, TracerProvider tracerProvider, Consumer<ErrorContext> consumer, Map<String, EventPosition> map, int i, Duration duration, boolean z2, Duration duration2, Duration duration3, LoadBalancingStrategy loadBalancingStrategy) {
        Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
        Objects.requireNonNull(str, "consumerGroup cannot be null.");
        Objects.requireNonNull(supplier, "partitionProcessorFactory cannot be null.");
        EventHubAsyncClient buildAsyncClient = eventHubClientBuilder.buildAsyncClient();
        this.checkpointStore = (CheckpointStore) Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
        this.identifier = buildAsyncClient.getIdentifier();
        HashMap hashMap = new HashMap();
        hashMap.put("eventProcessorId", this.identifier);
        this.logger = new ClientLogger(EventProcessorClient.class, hashMap);
        this.fullyQualifiedNamespace = buildAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT);
        this.eventHubName = buildAsyncClient.getEventHubName().toLowerCase(Locale.ROOT);
        this.consumerGroup = str.toLowerCase(Locale.ROOT);
        this.loadBalancerUpdateInterval = duration2;
        this.partitionPumpManager = new PartitionPumpManager(checkpointStore, supplier, eventHubClientBuilder, z, tracerProvider, map, i, duration, z2);
        this.partitionBasedLoadBalancer = new PartitionBasedLoadBalancer(this.checkpointStore, buildAsyncClient, this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.identifier, duration3.getSeconds(), this.partitionPumpManager, consumer, loadBalancingStrategy);
    }

    public String getIdentifier() {
        return this.identifier;
    }

    public synchronized void start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            this.logger.info("Event processor is already running");
            return;
        }
        this.logger.info("Starting a new event processor instance.");
        this.scheduler.set(Executors.newSingleThreadScheduledExecutor());
        Double valueOf = Double.valueOf(ThreadLocalRandom.current().nextDouble() * TimeUnit.SECONDS.toMillis(BASE_JITTER_IN_SECONDS));
        AtomicReference<ScheduledFuture<?>> atomicReference = this.runner;
        ScheduledExecutorService scheduledExecutorService = this.scheduler.get();
        PartitionBasedLoadBalancer partitionBasedLoadBalancer = this.partitionBasedLoadBalancer;
        Objects.requireNonNull(partitionBasedLoadBalancer);
        atomicReference.set(scheduledExecutorService.scheduleWithFixedDelay(partitionBasedLoadBalancer::loadBalance, valueOf.longValue(), this.loadBalancerUpdateInterval.toMillis(), TimeUnit.MILLISECONDS));
    }

    public synchronized void stop() {
        if (!this.isRunning.compareAndSet(true, false)) {
            this.logger.info("Event processor has already stopped");
            return;
        }
        this.runner.get().cancel(true);
        this.scheduler.get().shutdown();
        stopProcessing();
    }

    public synchronized boolean isRunning() {
        return this.isRunning.get();
    }

    private void stopProcessing() {
        this.partitionPumpManager.stopAllPartitionPumps();
        Mono collect = this.checkpointStore.listOwnership(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup).filter(partitionOwnership -> {
            return this.identifier.equals(partitionOwnership.getOwnerId());
        }).map(partitionOwnership2 -> {
            return partitionOwnership2.setOwnerId("");
        }).collect(Collectors.toList());
        CheckpointStore checkpointStore = this.checkpointStore;
        Objects.requireNonNull(checkpointStore);
        collect.flatMapMany(checkpointStore::claimOwnership).blockLast(Duration.ofSeconds(10L));
    }
}
