package org.springframework.pulsar.test.support;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/pulsar/test/support/PulsarConsumerTestUtil.class */
/**
 * Fluent test helper that consumes messages from a Pulsar topic until a supplied condition is
 * met or a timeout elapses.
 *
 * <p>An instance is obtained through one of the static {@code consumeMessages} factories and is
 * then configured step by step via {@link #fromTopic(String)}, {@link #withSchema(Schema)},
 * optionally {@link #awaitAtMost(Duration)} and {@link #until(ConsumedMessagesCondition)}, and
 * finally executed with {@link #get()}.
 *
 * <p>When the instance was created from a service URL, the {@link PulsarClient} built for it is
 * closed at the end of {@link #get()}.
 *
 * @param <T> the payload type of the consumed messages
 */
public final class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, ConditionsSpec<T> {
    private static final LogAccessor LOG = new LogAccessor(PulsarConsumerTestUtil.class);

    /** Client built by this utility itself (URL-based factory); {@code null} when the caller supplied one. */
    private final PulsarClient locallyCreatedPulsarClient;
    private final PulsarConsumerFactory<T> consumerFactory;
    private ConsumedMessagesCondition<T> condition;
    private Schema<T> schema;
    private List<String> topics;
    private Duration timeout = Duration.ofSeconds(30);
    private boolean untilMethodAlreadyCalled = false;

    /**
     * Starts a consume specification against the broker of the Pulsar test container when
     * {@code PulsarTestContainerSupport} reports it as started, otherwise against
     * {@code pulsar://localhost:6650}.
     *
     * @param <T> the message payload type
     * @return the first step of the fluent specification
     */
    public static <T> TopicSpec<T> consumeMessages() {
        if (PulsarTestContainerSupport.isContainerStarted()) {
            return consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl());
        }
        return consumeMessages("pulsar://localhost:6650");
    }

    /**
     * Starts a consume specification using a new {@link PulsarClient} built for the given
     * service URL. That client is closed when {@link #get()} completes.
     *
     * @param str the Pulsar service URL; must not be {@code null}
     * @param <T> the message payload type
     * @return the first step of the fluent specification
     * @throws PulsarException if the client cannot be built
     */
    public static <T> TopicSpec<T> consumeMessages(String str) {
        Assert.notNull(str, "url must not be null");
        try {
            PulsarClient client = PulsarClient.builder().serviceUrl(str).build();
            PulsarConsumerFactory<T> factory = new DefaultPulsarConsumerFactory<>(client, List.of());
            return consumeMessagesInternal(client, factory);
        } catch (PulsarClientException e) {
            throw new PulsarException(e);
        }
    }

    /**
     * Starts a consume specification using the supplied client. The client is not closed by
     * this utility.
     *
     * @param pulsarClient the client to create consumers with; must not be {@code null}
     * @param <T> the message payload type
     * @return the first step of the fluent specification
     */
    public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
        Assert.notNull(pulsarClient, "pulsarClient must not be null");
        PulsarConsumerFactory<T> factory = new DefaultPulsarConsumerFactory<>(pulsarClient, List.of());
        return consumeMessagesInternal(null, factory);
    }

    /**
     * Starts a consume specification using the supplied consumer factory.
     *
     * @param pulsarConsumerFactory the factory used to create the consumer; must not be {@code null}
     * @param <T> the message payload type
     * @return the first step of the fluent specification
     */
    public static <T> TopicSpec<T> consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
        return consumeMessagesInternal(null, pulsarConsumerFactory);
    }

    private static <T> TopicSpec<T> consumeMessagesInternal(PulsarClient pulsarClient, PulsarConsumerFactory<T> pulsarConsumerFactory) {
        return new PulsarConsumerTestUtil<>(pulsarClient, pulsarConsumerFactory);
    }

    private PulsarConsumerTestUtil(@Nullable PulsarClient pulsarClient, PulsarConsumerFactory<T> pulsarConsumerFactory) {
        Assert.notNull(pulsarConsumerFactory, "PulsarConsumerFactory must not be null");
        this.consumerFactory = pulsarConsumerFactory;
        this.locallyCreatedPulsarClient = pulsarClient;
    }

    /**
     * Selects the single topic to consume from.
     *
     * @param str the topic name; must not be {@code null}
     * @return the next step of the specification
     */
    @Override // org.springframework.pulsar.test.support.TopicSpec
    public SchemaSpec<T> fromTopic(String str) {
        Assert.notNull(str, "Topic must not be null");
        this.topics = List.of(str);
        return this;
    }

    /**
     * Sets the schema used to create the consumer.
     *
     * @param schema the message schema; must not be {@code null}
     * @return the next step of the specification
     */
    @Override // org.springframework.pulsar.test.support.SchemaSpec
    public ConditionsSpec<T> withSchema(Schema<T> schema) {
        Assert.notNull(schema, "Schema must not be null");
        this.schema = schema;
        return this;
    }

    /**
     * Overrides the maximum time {@link #get()} keeps polling (30 seconds unless changed).
     *
     * @param duration the timeout; must not be {@code null}
     * @return this specification
     */
    @Override // org.springframework.pulsar.test.support.ConditionsSpec
    public ConditionsSpec<T> awaitAtMost(Duration duration) {
        Assert.notNull(duration, "Timeout must not be null");
        this.timeout = duration;
        return this;
    }

    /**
     * Sets the condition that ends consumption early once the consumed messages satisfy it.
     *
     * @param consumedMessagesCondition the condition evaluated against the messages consumed so far
     * @return this specification
     * @throws IllegalStateException if this method has already been called on this instance
     */
    @Override // org.springframework.pulsar.test.support.ConditionsSpec
    public ConditionsSpec<T> until(ConsumedMessagesCondition<T> consumedMessagesCondition) {
        if (this.untilMethodAlreadyCalled) {
            throw new IllegalStateException("Multiple calls to 'until' are not allowed. Use 'and' to combine conditions.");
        }
        this.untilMethodAlreadyCalled = true;
        this.condition = consumedMessagesCondition;
        return this;
    }

    /**
     * Consumes messages from the configured topic and returns them.
     *
     * <p>A consumer is created with a subscription named {@code test-consumer-<random UUID>}
     * starting at the earliest position. Messages are received with a 200 ms poll, acknowledged,
     * and collected. Polling stops as soon as the configured condition is met or once the
     * timeout has elapsed. The consumer, and the client when it was created by this utility,
     * are closed before returning or throwing.
     *
     * @return the messages consumed, in the order they were received
     * @throws ConditionTimeoutException if a condition was configured and is still not met when
     *         the timeout elapses
     * @throws PulsarException if the underlying Pulsar client reports an error
     */
    @Override // org.springframework.pulsar.test.support.ConditionsSpec
    public List<Message<T>> get() {
        List<Message<T>> messages = new ArrayList<>();
        try {
            String subscriptionName = "test-consumer-%s".formatted(UUID.randomUUID());
            try (Consumer<T> consumer = this.consumerFactory.createConsumer(this.schema, this.topics, subscriptionName,
                    consumerBuilder -> consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest))) {
                long remainingMillis = this.timeout.toMillis();
                // do/while: always poll at least once, even with a zero timeout.
                do {
                    long loopStartMillis = System.currentTimeMillis();
                    Message<T> message = consumer.receive(200, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        messages.add(message);
                        consumer.acknowledge(message);
                    }
                    if (this.condition != null && this.condition.meets(messages)) {
                        return messages;
                    }
                    remainingMillis -= System.currentTimeMillis() - loopStartMillis;
                } while (remainingMillis > 0);
            }
        } catch (PulsarClientException e) {
            throw new PulsarException(e);
        } finally {
            closeLocallyCreatedClient();
        }
        // Timed out: only a failure when a condition was configured and is still unmet.
        if (this.condition != null && !this.condition.meets(messages)) {
            throw new ConditionTimeoutException("Condition was not met within %d seconds".formatted(this.timeout.toSeconds()));
        }
        return messages;
    }

    /** Closes the client this utility built itself, if any; a close failure is logged, not propagated. */
    private void closeLocallyCreatedClient() {
        if (this.locallyCreatedPulsarClient == null || this.locallyCreatedPulsarClient.isClosed()) {
            return;
        }
        try {
            this.locallyCreatedPulsarClient.close();
        } catch (PulsarClientException e) {
            LOG.error(e, () -> "Failed to close locally created Pulsar client due to: " + e.getMessage());
        }
    }
}
