package org.springframework.pulsar.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.event.ReaderFailedToStartEvent;
import org.springframework.pulsar.event.ReaderStartedEvent;
import org.springframework.pulsar.event.ReaderStartingEvent;
import org.springframework.scheduling.SchedulingAwareRunnable;

/* loaded from: input_file:org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.class */
public class DefaultPulsarMessageReaderContainer<T> extends AbstractPulsarMessageReaderContainer<T> {
    private final AtomicReference<DefaultPulsarMessageReaderContainer<T>.InternalAsyncReader> internalAsyncReader;
    private volatile CountDownLatch startLatch;
    private volatile CompletableFuture<?> readerFuture;
    private final AbstractPulsarMessageReaderContainer<?> thisOrParentContainer;
    private final AtomicReference<Thread> readerThread;

    /* loaded from: input_file:org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer$InternalAsyncReader.class */
    private final class InternalAsyncReader implements SchedulingAwareRunnable {
        private final ReaderListener<T> listener;
        private final PulsarReaderContainerProperties readerContainerProperties;
        private Reader<T> reader;
        private final ReaderBuilderCustomizer<T> readerBuilderCustomizer;

        InternalAsyncReader(ReaderListener<T> readerListener, PulsarReaderContainerProperties pulsarReaderContainerProperties) {
            this.listener = readerListener;
            this.readerContainerProperties = pulsarReaderContainerProperties;
            this.readerBuilderCustomizer = DefaultPulsarMessageReaderContainer.this.getReaderBuilderCustomizer();
            try {
                this.reader = DefaultPulsarMessageReaderContainer.this.getPulsarReaderFactory().createReader(pulsarReaderContainerProperties.getTopics(), pulsarReaderContainerProperties.getStartMessageId(), pulsarReaderContainerProperties.getSchema(), this.readerBuilderCustomizer != null ? List.of(this.readerBuilderCustomizer) : Collections.emptyList());
            } catch (PulsarClientException e) {
                throw new IllegalStateException("Pulsar client exceptions.", e);
            }
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            DefaultPulsarMessageReaderContainer.this.readerThread.set(Thread.currentThread());
            DefaultPulsarMessageReaderContainer.this.publishReaderStartingEvent();
            DefaultPulsarMessageReaderContainer.this.publishReaderStartedEvent();
            while (DefaultPulsarMessageReaderContainer.this.isRunning()) {
                try {
                    this.listener.received(this.reader, this.reader.readNext());
                } catch (PulsarClientException e) {
                    DefaultPulsarMessageReaderContainer.this.logger.error(e, () -> {
                        return "Error receiving messages.";
                    });
                }
            }
        }
    }

    public DefaultPulsarMessageReaderContainer(PulsarReaderFactory<? super T> pulsarReaderFactory, PulsarReaderContainerProperties pulsarReaderContainerProperties) {
        super(pulsarReaderFactory, pulsarReaderContainerProperties);
        this.internalAsyncReader = new AtomicReference<>();
        this.startLatch = new CountDownLatch(1);
        this.readerThread = new AtomicReference<>();
        this.thisOrParentContainer = this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.pulsar.core.AbstractPulsarMessageContainer
    public void doStart() {
        PulsarReaderContainerProperties containerProperties = getContainerProperties();
        Object readerListener = containerProperties.getReaderListener();
        AsyncTaskExecutor readerTaskExecutor = containerProperties.getReaderTaskExecutor();
        ReaderListener readerListener2 = (ReaderListener) readerListener;
        if (readerTaskExecutor == null) {
            readerTaskExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setReaderTaskExecutor(readerTaskExecutor);
        }
        this.internalAsyncReader.set(new InternalAsyncReader(readerListener2, containerProperties));
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        this.readerFuture = readerTaskExecutor.submitCompletable((Runnable) this.internalAsyncReader.get());
        try {
            if (!this.startLatch.await(containerProperties.getReaderStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Reader thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                publishReaderFailedToStart();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.pulsar.core.AbstractPulsarMessageContainer
    public void doStop() {
        setRunning(false);
        try {
            this.logger.info("Closing this consumer.");
            ((InternalAsyncReader) this.internalAsyncReader.get()).reader.close();
        } catch (IOException e) {
            this.logger.error(e, () -> {
                return "Error closing Pulsar Client.";
            });
        }
    }

    private void publishReaderStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ReaderStartingEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishReaderStartedEvent() {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ReaderStartedEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishReaderFailedToStart() {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ReaderFailedToStartEvent(this, this.thisOrParentContainer));
        }
    }
}
