package org.apache.pulsar.reactive.client.internal.api;

import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/GroupOrderedMessageProcessors.class */
public final class GroupOrderedMessageProcessors {
    private GroupOrderedMessageProcessors() {
    }

    public static <T> Flux<GroupedFlux<Integer, Message<T>>> groupByProcessingGroup(Flux<Message<T>> flux, MessageGroupingFunction messageGroupingFunction, int i) {
        return flux.groupBy(message -> {
            return Integer.valueOf(messageGroupingFunction.resolveProcessingGroup(message, i));
        }, Math.max(Queues.XS_BUFFER_SIZE, i));
    }

    public static <T, R> Flux<R> processGroupsInOrderConcurrently(Flux<Message<T>> flux, MessageGroupingFunction messageGroupingFunction, Function<? super Message<T>, ? extends Publisher<? extends R>> function, Scheduler scheduler, int i) {
        return groupByProcessingGroup(flux, messageGroupingFunction, i).flatMap(groupedFlux -> {
            return groupedFlux.publishOn(scheduler).concatMap(function);
        }, i);
    }

    public static <T, R> Flux<R> processGroupsInOrderInParallel(Flux<Message<T>> flux, MessageGroupingFunction messageGroupingFunction, Function<? super Message<T>, ? extends Publisher<? extends R>> function, Scheduler scheduler, int i) {
        return groupByProcessingGroup(flux, messageGroupingFunction, i).parallel(i).runOn(scheduler).flatMap(groupedFlux -> {
            return groupedFlux.concatMap(function);
        }).sequential();
    }
}
