package org.apache.pulsar.reactive.client.internal.api;

import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.class */
public class DefaultReactiveMessagePipelineBuilder<T> implements ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> {
    private static final MessageGroupingFunction KEY_ORDERED_GROUPING_FUNCTION;
    private final ReactiveMessageConsumer<T> messageConsumer;
    private Function<Message<T>, Publisher<Void>> messageHandler;
    private BiConsumer<Message<T>, Throwable> errorLogger;
    private Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
    private int concurrency;
    private int maxInflight;
    private MessageGroupingFunction groupingFunction;
    private final Logger LOG = LoggerFactory.getLogger(DefaultReactiveMessagePipelineBuilder.class);
    private Retry pipelineRetrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5)).maxBackoff(Duration.ofMinutes(1)).doBeforeRetry(retrySignal -> {
        this.LOG.error("Message handler pipeline failed.Retrying to start message handler pipeline, retry #{}", Long.valueOf(retrySignal.totalRetriesInARow()), retrySignal.failure());
    });
    private Duration handlingTimeout = Duration.ofSeconds(120);
    private Function<Mono<Void>, Publisher<Void>> transformer = mono -> {
        return mono;
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultReactiveMessagePipelineBuilder(ReactiveMessageConsumer<T> reactiveMessageConsumer) {
        this.messageConsumer = reactiveMessageConsumer;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<T> messageHandler(Function<Message<T>, Publisher<Void>> function) {
        this.messageHandler = function;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder<T> streamingMessageHandler(Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> function) {
        this.streamingMessageHandler = function;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<T> errorLogger(BiConsumer<Message<T>, Throwable> biConsumer) {
        this.errorLogger = biConsumer;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> useKeyOrderedProcessing() {
        Objects.requireNonNull(KEY_ORDERED_GROUPING_FUNCTION, "MessageGroupingFunction to use for key ordered processing wasn't found by service loader.");
        groupOrderedProcessing(KEY_ORDERED_GROUPING_FUNCTION);
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> groupOrderedProcessing(MessageGroupingFunction messageGroupingFunction) {
        this.groupingFunction = messageGroupingFunction;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int i) {
        this.concurrency = i;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int i) {
        this.maxInflight = i;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<T> handlingTimeout(Duration duration) {
        this.handlingTimeout = duration;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder<T> pipelineRetrySpec(Retry retry) {
        this.pipelineRetrySpec = retry;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder
    public ReactiveMessagePipelineBuilder<T> transformPipeline(Function<Mono<Void>, Publisher<Void>> function) {
        this.transformer = function;
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder
    public ReactiveMessagePipeline build() {
        if (this.messageHandler != null && this.streamingMessageHandler != null) {
            throw new IllegalStateException("messageHandler and streamingMessageHandler cannot be set at the same time.");
        }
        if (this.messageHandler == null && this.streamingMessageHandler == null) {
            throw new NullPointerException("messageHandler or streamingMessageHandler must be set.");
        }
        return new DefaultReactiveMessagePipeline(this.messageConsumer, this.messageHandler, this.errorLogger, this.pipelineRetrySpec, this.handlingTimeout, this.transformer, this.streamingMessageHandler, this.groupingFunction, this.concurrency, this.maxInflight);
    }

    static {
        Iterator it = ServiceLoader.load(MessageGroupingFunction.class).iterator();
        if (it.hasNext()) {
            KEY_ORDERED_GROUPING_FUNCTION = (MessageGroupingFunction) it.next();
        } else {
            KEY_ORDERED_GROUPING_FUNCTION = null;
        }
    }
}
