package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.log4j.spi.Configurator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/PartitionPumpManager.class */
public class PartitionPumpManager {
    private final CheckpointStore checkpointStore;
    private final Supplier<PartitionProcessor> partitionProcessorFactory;
    private final EventHubClientBuilder eventHubClientBuilder;
    private final TracerProvider tracerProvider;
    private final boolean trackLastEnqueuedEventProperties;
    private final Map<String, EventPosition> initialPartitionEventPosition;
    private final Duration maxWaitTime;
    private final int maxBatchSize;
    private final boolean batchReceiveMode;
    private final ClientLogger logger = new ClientLogger((Class<?>) PartitionPumpManager.class);
    private final Map<String, EventHubConsumerAsyncClient> partitionPumps = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionPumpManager(CheckpointStore checkpointStore, Supplier<PartitionProcessor> supplier, EventHubClientBuilder eventHubClientBuilder, boolean z, TracerProvider tracerProvider, Map<String, EventPosition> map, int i, Duration duration, boolean z2) {
        this.checkpointStore = checkpointStore;
        this.partitionProcessorFactory = supplier;
        this.eventHubClientBuilder = eventHubClientBuilder;
        this.trackLastEnqueuedEventProperties = z;
        this.tracerProvider = tracerProvider;
        this.initialPartitionEventPosition = map;
        this.maxBatchSize = i;
        this.maxWaitTime = duration;
        this.batchReceiveMode = z2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAllPartitionPumps() {
        this.partitionPumps.forEach((str, eventHubConsumerAsyncClient) -> {
            try {
                try {
                    eventHubConsumerAsyncClient.close();
                    this.partitionPumps.remove(str);
                } catch (Exception e) {
                    this.logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, str, e);
                    this.partitionPumps.remove(str);
                }
            } catch (Throwable th) {
                this.partitionPumps.remove(str);
                throw th;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void verifyPartitionConnection(PartitionOwnership partitionOwnership) {
        String partitionId = partitionOwnership.getPartitionId();
        if (this.partitionPumps.containsKey(partitionId) && this.partitionPumps.get(partitionId).isConnectionClosed()) {
            this.logger.info("Connection closed for {}, partition {}. Removing the consumer.", partitionOwnership.getEventHubName(), partitionId);
            try {
                try {
                    this.partitionPumps.get(partitionId).close();
                    this.partitionPumps.remove(partitionId);
                } catch (Exception e) {
                    this.logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, e);
                    this.partitionPumps.remove(partitionId);
                }
            } catch (Throwable th) {
                this.partitionPumps.remove(partitionId);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startPartitionPump(PartitionOwnership partitionOwnership, Checkpoint checkpoint) {
        if (this.partitionPumps.containsKey(partitionOwnership.getPartitionId())) {
            this.logger.verbose("Consumer is already running for this partition {}", partitionOwnership.getPartitionId());
            return;
        }
        try {
            PartitionContext partitionContext = new PartitionContext(partitionOwnership.getFullyQualifiedNamespace(), partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionOwnership.getPartitionId());
            PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get();
            partitionProcessor.initialize(new InitializationContext(partitionContext));
            EventPosition latest = (checkpoint == null || checkpoint.getOffset() == null) ? (checkpoint == null || checkpoint.getSequenceNumber() == null) ? this.initialPartitionEventPosition.containsKey(partitionOwnership.getPartitionId()) ? this.initialPartitionEventPosition.get(partitionOwnership.getPartitionId()) : EventPosition.latest() : EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber().longValue()) : EventPosition.fromOffset(checkpoint.getOffset().longValue());
            this.logger.info("Starting event processing from {} for partition {}", latest, partitionOwnership.getPartitionId());
            ReceiveOptions trackLastEnqueuedEventProperties = new ReceiveOptions().setOwnerLevel(0L).setTrackLastEnqueuedEventProperties(this.trackLastEnqueuedEventProperties);
            EventHubConsumerAsyncClient createConsumer = this.eventHubClientBuilder.buildAsyncClient().createConsumer(partitionOwnership.getConsumerGroup(), 500);
            this.partitionPumps.put(partitionOwnership.getPartitionId(), createConsumer);
            Flux<PartitionEvent> doOnNext = createConsumer.receiveFromPartition(partitionOwnership.getPartitionId(), latest, trackLastEnqueuedEventProperties).doOnNext(partitionEvent -> {
                if (this.logger.canLogAtLevel(LogLevel.VERBOSE)) {
                    this.logger.verbose("On next {}, {}, {}", partitionContext.getEventHubName(), partitionContext.getPartitionId(), partitionEvent.getData().getSequenceNumber());
                }
            });
            (this.maxWaitTime != null ? doOnNext.windowTimeout(this.maxBatchSize, this.maxWaitTime) : doOnNext.window(this.maxBatchSize)).concatMap((v0) -> {
                return v0.collectList();
            }).publishOn(Schedulers.boundedElastic()).subscribe(list -> {
                processEvents(partitionContext, partitionProcessor, createConsumer, list);
            }, th -> {
                handleError(partitionOwnership, createConsumer, partitionProcessor, th, partitionContext);
            }, () -> {
                partitionProcessor.close(new CloseContext(partitionContext, CloseReason.EVENT_PROCESSOR_SHUTDOWN));
                cleanup(partitionOwnership, createConsumer);
            });
        } catch (Exception e) {
            if (this.partitionPumps.containsKey(partitionOwnership.getPartitionId())) {
                cleanup(partitionOwnership, this.partitionPumps.get(partitionOwnership.getPartitionId()));
            }
            throw this.logger.logExceptionAsError(new PartitionProcessorException("Error occurred while starting partition pump for partition " + partitionOwnership.getPartitionId(), e));
        }
    }

    private void processEvent(PartitionContext partitionContext, PartitionProcessor partitionProcessor, EventHubConsumerAsyncClient eventHubConsumerAsyncClient, EventContext eventContext) {
        Context context = null;
        EventData eventData = eventContext.getEventData();
        if (eventData != null) {
            context = startProcessTracingSpan(eventData, eventHubConsumerAsyncClient.getEventHubName(), eventHubConsumerAsyncClient.getFullyQualifiedNamespace());
            if (context.getData(Tracer.SPAN_CONTEXT_KEY).isPresent()) {
                eventData.addContext(Tracer.SPAN_CONTEXT_KEY, context);
            }
        }
        try {
            if (this.logger.canLogAtLevel(LogLevel.VERBOSE)) {
                this.logger.verbose("Processing event {}, {}", partitionContext.getEventHubName(), partitionContext.getPartitionId());
            }
            partitionProcessor.processEvent(new EventContext(partitionContext, eventData, this.checkpointStore, eventContext.getLastEnqueuedEventProperties()));
            if (this.logger.canLogAtLevel(LogLevel.VERBOSE)) {
                this.logger.verbose("Completed processing event {}, {}", partitionContext.getEventHubName(), partitionContext.getPartitionId());
            }
            endProcessTracingSpan(context, Signal.complete());
        } catch (Throwable th) {
            endProcessTracingSpan(context, Signal.error(th));
            throw this.logger.logExceptionAsError(new PartitionProcessorException("Error in event processing callback", th));
        }
    }

    private void processEvents(PartitionContext partitionContext, PartitionProcessor partitionProcessor, EventHubConsumerAsyncClient eventHubConsumerAsyncClient, List<PartitionEvent> list) {
        try {
            if (this.batchReceiveMode) {
                LastEnqueuedEventProperties[] lastEnqueuedEventPropertiesArr = new LastEnqueuedEventProperties[1];
                EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, (List) list.stream().map(partitionEvent -> {
                    lastEnqueuedEventPropertiesArr[0] = partitionEvent.getLastEnqueuedEventProperties();
                    return partitionEvent.getData();
                }).collect(Collectors.toList()), this.checkpointStore, lastEnqueuedEventPropertiesArr[0]);
                if (this.logger.canLogAtLevel(LogLevel.VERBOSE)) {
                    this.logger.verbose("Processing event batch {}, {}", partitionContext.getEventHubName(), partitionContext.getPartitionId());
                }
                partitionProcessor.processEventBatch(eventBatchContext);
                if (this.logger.canLogAtLevel(LogLevel.VERBOSE)) {
                    this.logger.verbose("Completed processing event batch{}, {}", partitionContext.getEventHubName(), partitionContext.getPartitionId());
                }
            } else {
                processEvent(partitionContext, partitionProcessor, eventHubConsumerAsyncClient, new EventContext(partitionContext, list.size() == 1 ? list.get(0).getData() : null, this.checkpointStore, list.size() == 1 ? list.get(0).getLastEnqueuedEventProperties() : null));
            }
        } catch (Throwable th) {
            throw this.logger.logExceptionAsError(new PartitionProcessorException("Error in event processing callback", th));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, EventHubConsumerAsyncClient> getPartitionPumps() {
        return this.partitionPumps;
    }

    private void handleError(PartitionOwnership partitionOwnership, EventHubConsumerAsyncClient eventHubConsumerAsyncClient, PartitionProcessor partitionProcessor, Throwable th, PartitionContext partitionContext) {
        boolean z = true;
        if (!(th instanceof PartitionProcessorException)) {
            z = false;
            this.logger.warning("Error receiving events from partition {}", partitionContext.getPartitionId(), th);
            partitionProcessor.processError(new ErrorContext(partitionContext, th));
        }
        partitionProcessor.close(new CloseContext(partitionContext, CloseReason.LOST_PARTITION_OWNERSHIP));
        cleanup(partitionOwnership, eventHubConsumerAsyncClient);
        if (z) {
            throw this.logger.logExceptionAsError((PartitionProcessorException) th);
        }
    }

    private void cleanup(PartitionOwnership partitionOwnership, EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
        try {
            this.logger.info("Closing consumer for partition id {}", partitionOwnership.getPartitionId());
            eventHubConsumerAsyncClient.close();
            this.logger.info("Removing partition id {} from list of processing partitions", partitionOwnership.getPartitionId());
            this.partitionPumps.remove(partitionOwnership.getPartitionId());
        } catch (Throwable th) {
            this.logger.info("Removing partition id {} from list of processing partitions", partitionOwnership.getPartitionId());
            this.partitionPumps.remove(partitionOwnership.getPartitionId());
            throw th;
        }
    }

    private Context startProcessTracingSpan(EventData eventData, String str, String str2) {
        Object obj = eventData.getProperties().get(Tracer.DIAGNOSTIC_ID_KEY);
        if (obj == null || !this.tracerProvider.isEnabled()) {
            return Context.NONE;
        }
        Context addData = this.tracerProvider.extractContext(obj.toString(), Context.NONE).addData(Tracer.ENTITY_PATH_KEY, str).addData("hostname", str2).addData(Tracer.AZ_TRACING_NAMESPACE_KEY, ClientConstants.AZ_NAMESPACE_VALUE);
        return this.tracerProvider.startSpan(ClientConstants.AZ_TRACING_SERVICE_NAME, eventData.getEnqueuedTime() == null ? addData : addData.addData(Tracer.MESSAGE_ENQUEUED_TIME, Long.valueOf(eventData.getEnqueuedTime().getEpochSecond())), ProcessKind.PROCESS);
    }

    private void endProcessTracingSpan(Context context, Signal<Void> signal) {
        if (context == null) {
            return;
        }
        Optional<Object> data = context.getData("scope");
        if (data.isPresent() && this.tracerProvider.isEnabled()) {
            Object obj = data.get();
            if (obj instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) obj).close();
                } catch (Exception e) {
                    this.logger.error(Messages.EVENT_PROCESSOR_RUN_END, e);
                }
            } else {
                ClientLogger clientLogger = this.logger;
                Locale locale = Locale.US;
                String str = Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR;
                Object[] objArr = new Object[1];
                objArr[0] = obj != null ? obj.getClass() : Configurator.NULL;
                clientLogger.verbose(String.format(locale, str, objArr));
            }
            this.tracerProvider.endSpan(context, signal);
        }
    }
}
