/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.event.ReaderFailedToStartEvent;
import org.springframework.pulsar.event.ReaderStartedEvent;
import org.springframework.pulsar.event.ReaderStartingEvent;
import org.springframework.pulsar.reader.AbstractPulsarMessageReaderContainer;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.scheduling.SchedulingAwareRunnable;

/**
 * Reader container that creates a single Pulsar {@link Reader} and drives it from a
 * task executor, handing every message it reads to the configured
 * {@link ReaderListener}.
 *
 * <p>{@link #doStart()} builds the reader, submits the read loop to the executor and
 * waits (up to the configured start timeout) for the loop to signal that it has begun.
 * {@link #doStop()} flags the container as not running and closes the reader.
 *
 * @param <T> the payload type of the messages read by this container
 */
public class DefaultPulsarMessageReaderContainer<T>
extends AbstractPulsarMessageReaderContainer<T> {
    /** The read loop created by the most recent {@link #doStart()}; empty until then. */
    private final AtomicReference<InternalAsyncReader> internalAsyncReader = new AtomicReference<>();
    /** Counted down by the read loop once it is executing; replaced on every start. */
    private volatile CountDownLatch startLatch = new CountDownLatch(1);
    /** Future returned by the executor for the submitted read loop. */
    private volatile CompletableFuture<?> readerFuture;
    /** Container reported as the "parent" in published events; always {@code this} here. */
    private final AbstractPulsarMessageReaderContainer<?> thisOrParentContainer;
    /** Thread currently executing the read loop, recorded when the loop begins. */
    private final AtomicReference<Thread> readerThread = new AtomicReference<>();

    /**
     * Creates a container backed by the given reader factory and properties.
     *
     * @param pulsarReaderFactory factory used to create the underlying reader
     * @param pulsarReaderContainerProperties configuration for this container
     */
    public DefaultPulsarMessageReaderContainer(PulsarReaderFactory<? super T> pulsarReaderFactory, PulsarReaderContainerProperties pulsarReaderContainerProperties) {
        super(pulsarReaderFactory, pulsarReaderContainerProperties);
        this.thisOrParentContainer = this;
    }

    /**
     * Creates the reader, submits the read loop to the task executor and waits for the
     * loop to report that it is executing.
     *
     * <p>When the container properties carry no executor, a
     * {@link SimpleAsyncTaskExecutor} named after the bean is created and stored back
     * into the properties. If the loop does not begin within the configured start
     * timeout, an error is logged and a {@link ReaderFailedToStartEvent} is published.
     *
     * @throws IllegalStateException if the reader cannot be created
     */
    @Override
    protected void doStart() {
        PulsarReaderContainerProperties containerProperties = this.getContainerProperties();
        Object readerListenerObject = containerProperties.getReaderListener();
        AsyncTaskExecutor readerExecutor = containerProperties.getReaderTaskExecutor();
        // The listener is stored untyped in the properties; the inner reader needs it typed.
        @SuppressWarnings("unchecked")
        ReaderListener<T> readerListener = (ReaderListener<T>) readerListenerObject;
        if (readerExecutor == null) {
            readerExecutor = new SimpleAsyncTaskExecutor((this.getBeanName() == null ? "" : this.getBeanName()) + "-C-");
            containerProperties.setReaderTaskExecutor(readerExecutor);
        }
        InternalAsyncReader asyncReader = new InternalAsyncReader(readerListener, containerProperties);
        this.internalAsyncReader.set(asyncReader);
        this.setRunning(true);
        // A fresh latch per start so that a restart waits for the new loop, not an old one.
        this.startLatch = new CountDownLatch(1);
        this.readerFuture = readerExecutor.submitCompletable(asyncReader);
        try {
            if (!this.startLatch.await(containerProperties.getReaderStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error((CharSequence)"Reader thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                this.publishReaderFailedToStart();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the container as not running and closes the underlying reader.
     *
     * <p>Does nothing beyond clearing the running flag when no reader has been created
     * yet (for example when stop is requested before a successful start).
     */
    @Override
    protected void doStop() {
        this.setRunning(false);
        InternalAsyncReader asyncReader = this.internalAsyncReader.get();
        if (asyncReader == null || asyncReader.reader == null) {
            // Nothing was ever started, so there is no reader to close.
            return;
        }
        try {
            this.logger.info((CharSequence)"Closing this consumer.");
            asyncReader.reader.close();
        }
        catch (IOException e) {
            this.logger.error((Throwable)e, () -> "Error closing Pulsar Client.");
        }
    }

    /**
     * Releases the thread waiting in {@link #doStart()} and publishes a
     * {@link ReaderStartingEvent} when an event publisher is available.
     */
    private void publishReaderStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ReaderStartingEvent(this, this.thisOrParentContainer));
        }
    }

    /** Publishes a {@link ReaderStartedEvent} when an event publisher is available. */
    private void publishReaderStartedEvent() {
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ReaderStartedEvent(this, this.thisOrParentContainer));
        }
    }

    /** Publishes a {@link ReaderFailedToStartEvent} when an event publisher is available. */
    private void publishReaderFailedToStart() {
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ReaderFailedToStartEvent(this, this.thisOrParentContainer));
        }
    }

    /**
     * Read loop submitted to the task executor. The reader is created eagerly in the
     * constructor so that creation failures surface on the thread calling
     * {@link DefaultPulsarMessageReaderContainer#doStart()}.
     */
    private final class InternalAsyncReader
    implements SchedulingAwareRunnable {
        private final ReaderListener<T> listener;
        private final PulsarReaderContainerProperties readerContainerProperties;
        private Reader<T> reader;
        private final ReaderBuilderCustomizer<T> readerBuilderCustomizer;

        /**
         * Creates the Pulsar reader from the topics, start message id and schema held in
         * the container properties, applying the container's customizer when one is set.
         *
         * @param readerListener listener invoked for every message read
         * @param readerContainerProperties properties describing what to read
         * @throws IllegalStateException if the Pulsar client fails to create the reader
         */
        InternalAsyncReader(ReaderListener<T> readerListener, PulsarReaderContainerProperties readerContainerProperties) {
            this.listener = readerListener;
            this.readerContainerProperties = readerContainerProperties;
            this.readerBuilderCustomizer = DefaultPulsarMessageReaderContainer.this.getReaderBuilderCustomizer();
            try {
                List<ReaderBuilderCustomizer<T>> customizers = this.readerBuilderCustomizer != null ? List.of(this.readerBuilderCustomizer) : Collections.emptyList();
                this.reader = DefaultPulsarMessageReaderContainer.this.getPulsarReaderFactory().createReader(readerContainerProperties.getTopics(), readerContainerProperties.getStartMessageId(), readerContainerProperties.getSchema(), customizers);
            }
            catch (PulsarClientException e) {
                throw new IllegalStateException("Pulsar client exceptions.", e);
            }
        }

        /**
         * Reports this task as long-lived.
         *
         * @return always {@code true}
         */
        @Override
        public boolean isLongLived() {
            return true;
        }

        /**
         * Records the executing thread, publishes the starting/started events and then
         * reads messages, passing each to the listener, for as long as the container
         * reports itself as running.
         */
        @Override
        public void run() {
            DefaultPulsarMessageReaderContainer.this.readerThread.set(Thread.currentThread());
            DefaultPulsarMessageReaderContainer.this.publishReaderStartingEvent();
            DefaultPulsarMessageReaderContainer.this.publishReaderStartedEvent();
            while (DefaultPulsarMessageReaderContainer.this.isRunning()) {
                try {
                    Message<T> message = this.reader.readNext();
                    this.listener.received(this.reader, message);
                }
                catch (PulsarClientException e) {
                    if (!DefaultPulsarMessageReaderContainer.this.isRunning()) {
                        // Stop was requested while reading; the failure is presumably the
                        // reader being closed by doStop(), so leave without an error log.
                        break;
                    }
                    DefaultPulsarMessageReaderContainer.this.logger.error((Throwable)e, () -> "Error receiving messages.");
                }
            }
        }
    }
}

