package io.awspring.cloud.sqs.listener.source;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.BackPressureHandler;
import io.awspring.cloud.sqs.listener.BatchAwareBackPressureHandler;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.ExecutingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.class */
/**
 * Base class for message sources that repeatedly poll an endpoint for messages.
 *
 * <p>Once started, a polling loop is submitted to the configured {@link TaskExecutor}. On each
 * iteration the loop requests a batch of permits from the {@link BatchAwareBackPressureHandler},
 * calls {@link #doPollForMessages(int)} with the number of permits obtained, converts the polled
 * messages, releases the permits that were not needed and emits the messages to the configured
 * {@link MessageSink}.
 *
 * <p>When a poll completes exceptionally, a {@link BackOffContext} is created from the configured
 * {@link BackOffPolicy}; while such a context is set, the loop backs off before every poll. The
 * context is cleared again by the next poll that completes normally.
 *
 * @param <T> the payload type of the messages emitted to the sink
 * @param <S> the type of the messages returned by the polled endpoint
 */
public abstract class AbstractPollingMessageSource<T, S> extends AbstractMessageConvertingMessageSource<T, S> implements PollingMessageSource<T>, IdentifiableContainerComponent {
    private static final Logger logger = LoggerFactory.getLogger(AbstractPollingMessageSource.class);
    private String pollingEndpointName;
    private Duration shutdownTimeout;
    private BackOffPolicy pollBackOffPolicy;
    private TaskExecutor taskExecutor;
    private BatchAwareBackPressureHandler backPressureHandler;
    private AcknowledgementProcessor<T> acknowledgmentProcessor;
    private MessageSink<T> messageSink;
    private volatile boolean running;
    private String id;
    private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback;
    // Non-null only between a failed poll and the next successful one.
    private final AtomicReference<BackOffContext> pollBackOffContext = new AtomicReference<>();
    private final Object lifecycleMonitor = new Object();
    // Poll futures that have not completed yet; used by stop() to cancel outstanding polls.
    private final Collection<CompletableFuture<?>> pollingFutures = Collections.synchronizedCollection(new ArrayList<>());

    /**
     * Placeholder context used when the configured {@link BackOffPolicy} returns {@code null}
     * from {@code start}, so that "a back off is pending" can still be represented by a non-null
     * reference.
     */
    public static class NoOpsBackOffContext implements BackOffContext {
        private NoOpsBackOffContext() {
        }
    }

    /**
     * Reads the listener shutdown timeout and the poll back off policy from the container options
     * and then delegates to {@link #doConfigure(ContainerOptions)}.
     *
     * @param containerOptions the options to configure this source with
     */
    @Override
    protected void configureMessageSource(ContainerOptions<?, ?> containerOptions) {
        this.shutdownTimeout = containerOptions.getListenerShutdownTimeout();
        this.pollBackOffPolicy = containerOptions.getPollBackOffPolicy();
        doConfigure(containerOptions);
    }

    /**
     * Hook for subclasses to read further settings from the container options. The default
     * implementation does nothing.
     *
     * @param containerOptions the options this source is being configured with
     */
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
    }

    /**
     * Sets the id of this source. On {@link #start()} the same id is also handed to the back
     * pressure handler and the acknowledgement processor when they are
     * {@link IdentifiableContainerComponent}s.
     *
     * @param str the id, never {@code null}
     */
    @Override
    public void setId(String str) {
        Assert.notNull(str, "id cannot be null");
        this.id = str;
    }

    /**
     * Sets the name of the endpoint that will be polled.
     *
     * @param str the endpoint name, must contain text
     */
    @Override
    public void setPollingEndpointName(String str) {
        Assert.isTrue(StringUtils.hasText(str), "pollingEndpointName must have text");
        this.pollingEndpointName = str;
    }

    /**
     * Sets the handler used to limit how many messages are requested per poll.
     *
     * @param backPressureHandler the handler, never {@code null}; must be a
     * {@link BatchAwareBackPressureHandler}
     */
    @Override
    public void setBackPressureHandler(BackPressureHandler backPressureHandler) {
        Assert.notNull(backPressureHandler, "backPressureHandler cannot be null");
        Assert.isInstanceOf(BatchAwareBackPressureHandler.class, backPressureHandler, getClass().getSimpleName() + " requires a " + BatchAwareBackPressureHandler.class);
        this.backPressureHandler = (BatchAwareBackPressureHandler) backPressureHandler;
    }

    /**
     * Sets the processor that supplies the acknowledgement callback and is started and stopped
     * together with this source.
     *
     * @param acknowledgementProcessor the processor, never {@code null}
     */
    @Override
    public void setAcknowledgementProcessor(AcknowledgementProcessor<T> acknowledgementProcessor) {
        Assert.notNull(acknowledgementProcessor, "acknowledgementProcessor cannot be null");
        this.acknowledgmentProcessor = acknowledgementProcessor;
    }

    /**
     * Sets the callback that is handed to the acknowledgement processor on {@link #start()} when
     * the processor is an {@link ExecutingAcknowledgementProcessor}.
     *
     * @param asyncAcknowledgementResultCallback the callback, never {@code null}
     */
    @Override
    public void setAcknowledgementResultCallback(AsyncAcknowledgementResultCallback<T> asyncAcknowledgementResultCallback) {
        Assert.notNull(asyncAcknowledgementResultCallback, "acknowledgementResultCallback must not be null");
        this.acknowledgementResultCallback = asyncAcknowledgementResultCallback;
    }

    /**
     * Sets the executor that runs the polling loop. It is also handed to the acknowledgement
     * processor on {@link #start()} when the processor is {@link TaskExecutorAware}.
     *
     * @param taskExecutor the executor, never {@code null}
     */
    @Override
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "taskExecutor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    /**
     * Sets the sink that polled messages are emitted to.
     *
     * @param messageSink the sink, never {@code null}
     */
    @Override
    public void setMessageSink(MessageSink<T> messageSink) {
        Assert.notNull(messageSink, "messageSink cannot be null");
        this.messageSink = messageSink;
    }

    /**
     * Returns the id of this source.
     *
     * @return the id, or {@code null} if none has been set
     */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Tells whether this source has been started and not stopped since.
     *
     * @return {@code true} while the source is running
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Validates the configuration, wires the collaborators, starts the acknowledgement processor
     * and submits the polling loop to the task executor. Does nothing if the source is already
     * running.
     *
     * @throws IllegalArgumentException if a required collaborator has not been set
     */
    public void start() {
        if (isRunning()) {
            logger.debug("{} for queue {} already running", getClass().getSimpleName(), this.pollingEndpointName);
            return;
        }
        synchronized (this.lifecycleMonitor) {
            // Check again while holding the lock: a concurrent start() may have won the race
            // between the unsynchronized check above and acquiring the monitor.
            if (isRunning()) {
                logger.debug("{} for queue {} already running", getClass().getSimpleName(), this.pollingEndpointName);
                return;
            }
            Assert.notNull(this.id, "id not set");
            Assert.notNull(this.messageSink, "messageSink not set");
            Assert.notNull(this.backPressureHandler, "backPressureHandler not set");
            Assert.notNull(this.acknowledgmentProcessor, "acknowledgmentProcessor not set");
            Assert.notNull(this.pollBackOffPolicy, "pollBackOffPolicy not set");
            // Fail before flipping the running flag rather than with a NullPointerException
            // when the polling loop is submitted.
            Assert.notNull(this.taskExecutor, "taskExecutor not set");
            logger.debug("Starting {} for queue {}", getClass().getSimpleName(), this.pollingEndpointName);
            this.running = true;
            ConfigUtils.INSTANCE.acceptIfInstance(this.backPressureHandler, IdentifiableContainerComponent.class, identifiableContainerComponent -> {
                identifiableContainerComponent.setId(this.id);
            }).acceptIfInstance(this.acknowledgmentProcessor, IdentifiableContainerComponent.class, identifiableContainerComponent2 -> {
                identifiableContainerComponent2.setId(this.id);
            }).acceptIfInstance(this.acknowledgmentProcessor, ExecutingAcknowledgementProcessor.class, executingAcknowledgementProcessor -> {
                executingAcknowledgementProcessor.setAcknowledgementResultCallback(this.acknowledgementResultCallback);
            }).acceptIfInstance(this.acknowledgmentProcessor, TaskExecutorAware.class, taskExecutorAware -> {
                taskExecutorAware.setTaskExecutor(this.taskExecutor);
            });
            doStart();
            setupAcknowledgementForConversion(this.acknowledgmentProcessor.getAcknowledgementCallback());
            this.acknowledgmentProcessor.start();
            startPollingThread();
        }
    }

    /**
     * Hook invoked during {@link #start()} after the collaborators have been wired and before
     * the acknowledgement processor is started. The default implementation does nothing.
     */
    protected void doStart() {
    }

    /** Submits the polling loop to the task executor. */
    private void startPollingThread() {
        this.taskExecutor.execute(this::pollAndEmitMessages);
    }

    /**
     * The polling loop. Runs until {@link #isRunning()} returns {@code false}. Any exception
     * other than an interruption is logged and the loop resumes.
     *
     * @throws IllegalStateException if the thread is interrupted; the interrupt flag is restored
     * before the exception is thrown
     */
    private void pollAndEmitMessages() {
        while (isRunning()) {
            try {
                handlePollBackOff();
                logger.trace("Requesting permits for queue {}", this.pollingEndpointName);
                int requestBatch = this.backPressureHandler.requestBatch();
                if (requestBatch == 0) {
                    logger.trace("No permits acquired for queue {}", this.pollingEndpointName);
                    continue;
                }
                logger.trace("{} permits acquired for queue {}", Integer.valueOf(requestBatch), this.pollingEndpointName);
                if (!isRunning()) {
                    // stop() was called while waiting for permits: hand them back instead of polling.
                    logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits", Integer.valueOf(requestBatch));
                    this.backPressureHandler.release(requestBatch);
                    continue;
                }
                // The first exceptionally() turns a failed poll into an empty batch, so the
                // permits of a failed poll are still released by releaseUnusedPermits. The last
                // one only logs failures raised from that point on.
                managePollingFuture(doPollForMessages(requestBatch))
                        .thenApply(this::resetBackOffContext)
                        .exceptionally(this::handlePollingException)
                        .thenApply(this::convertMessages)
                        .thenApply(collection -> releaseUnusedPermits(requestBatch, collection))
                        .thenCompose(this::emitMessagesToPipeline)
                        .exceptionally(this::handleSinkException);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("MessageSource thread interrupted for endpoint " + this.pollingEndpointName, e);
            } catch (Exception e2) {
                logger.error("Error in MessageSource for queue {}. Resuming", this.pollingEndpointName, e2);
            }
        }
        logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName);
    }

    /** Applies the poll back off policy if a back off context is currently set. */
    private void handlePollBackOff() {
        BackOffContext backOffContext = this.pollBackOffContext.get();
        if (backOffContext == null) {
            return;
        }
        logger.trace("Back off context found, backing off");
        this.pollBackOffPolicy.backOff(backOffContext);
        logger.trace("Resuming from back off");
    }

    /**
     * Polls the endpoint for messages.
     *
     * @param i the number of permits acquired for this poll
     * @return a future completing with the polled messages
     */
    protected abstract CompletableFuture<Collection<S>> doPollForMessages(int i);

    /**
     * Returns to the back pressure handler the permits that were acquired for a poll but are not
     * needed for the messages it produced. An empty result for a full batch releases the whole
     * batch through {@code releaseBatch()}; otherwise the difference between the permits and
     * the number of messages is released.
     *
     * @param i the number of permits that were acquired for the poll
     * @param collection the converted messages the poll produced
     * @return {@code collection}, unchanged
     */
    public Collection<Message<T>> releaseUnusedPermits(int i, Collection<Message<T>> collection) {
        if (collection.isEmpty() && i == this.backPressureHandler.getBatchSize()) {
            this.backPressureHandler.releaseBatch();
            logger.trace("Released batch of unused permits for queue {}", this.pollingEndpointName);
        } else {
            int size = i - collection.size();
            this.backPressureHandler.release(size);
            logger.trace("Released {} unused permits for queue {}", Integer.valueOf(size), this.pollingEndpointName);
        }
        return collection;
    }

    /**
     * Emits the messages to the sink, skipping the sink entirely for an empty collection.
     *
     * @param collection the messages to emit
     * @return the future returned by the sink, or an already completed future when there is
     * nothing to emit
     */
    private CompletableFuture<Void> emitMessagesToPipeline(Collection<Message<T>> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(null) : this.messageSink.emit(collection, createContext());
    }

    /**
     * Creates the context passed to the sink along with each batch of messages, carrying the
     * callback that releases a single permit and the acknowledgement callback.
     *
     * @return a new processing context
     */
    protected MessageProcessingContext<T> createContext() {
        return MessageProcessingContext.<T>create().setBackPressureReleaseCallback(this::releaseBackPressure).setAcknowledgmentCallback(getAcknowledgementCallback());
    }

    /**
     * Returns the acknowledgement callback supplied by the acknowledgement processor.
     *
     * @return the acknowledgement callback
     */
    protected AcknowledgementCallback<T> getAcknowledgementCallback() {
        return this.acknowledgmentProcessor.getAcknowledgementCallback();
    }

    /** Releases a single permit; registered as the back pressure release callback. */
    private void releaseBackPressure() {
        logger.debug("Releasing permit for queue {}", this.pollingEndpointName);
        this.backPressureHandler.release(1);
    }

    /**
     * Logs a failure raised after the polling stage, unwrapping a {@link CompletionException}
     * so that its cause is what gets logged.
     *
     * @param th the failure
     * @return always {@code null}
     */
    private Void handleSinkException(Throwable th) {
        logger.error("Error processing message", th instanceof CompletionException ? th.getCause() : th);
        return null;
    }

    /**
     * Logs a failed poll and makes sure a back off context is set so that the polling loop backs
     * off before the next poll.
     *
     * @param th the polling failure
     * @return an empty collection standing in for the failed poll's result
     */
    private Collection<S> handlePollingException(Throwable th) {
        logger.error("Error polling for messages in queue {}.", this.pollingEndpointName, th);
        if (this.pollBackOffContext.get() == null) {
            logger.trace("Setting back off policy in queue {}", this.pollingEndpointName);
            this.pollBackOffContext.set(createBackOffContext());
        }
        return Collections.emptyList();
    }

    /**
     * Starts a back off sequence on the configured policy.
     *
     * @return the context returned by the policy, or a {@link NoOpsBackOffContext} when the
     * policy returns {@code null}
     */
    private BackOffContext createBackOffContext() {
        BackOffContext start = this.pollBackOffPolicy.start((RetryContext) null);
        return start != null ? start : new NoOpsBackOffContext();
    }

    /**
     * Clears a pending back off context after a poll completed normally.
     *
     * @param collection the polled messages
     * @return {@code collection}, unchanged
     */
    private Collection<S> resetBackOffContext(Collection<S> collection) {
        if (this.pollBackOffContext.get() != null) {
            logger.trace("Polling successful, resetting back off context.");
            this.pollBackOffContext.set(null);
        }
        return collection;
    }

    /**
     * Tracks a poll future until it completes so that {@link #stop()} can cancel it.
     *
     * @param completableFuture the future returned by a poll
     * @param <F> the result type of the future
     * @return {@code completableFuture}, unchanged
     */
    private <F> CompletableFuture<F> managePollingFuture(CompletableFuture<F> completableFuture) {
        this.pollingFutures.add(completableFuture);
        // whenComplete, unlike thenRun, also fires for failed and cancelled futures; otherwise
        // those would never be removed from the tracking collection.
        completableFuture.whenComplete((result, throwable) -> this.pollingFutures.remove(completableFuture));
        return completableFuture;
    }

    /**
     * Returns the name of the endpoint being polled.
     *
     * @return the endpoint name, or {@code null} if none has been set
     */
    public String getPollingEndpointName() {
        return this.pollingEndpointName;
    }

    /**
     * Returns the acknowledgement processor used by this source.
     *
     * @return the processor, or {@code null} if none has been set
     */
    public AcknowledgementProcessor<T> getAcknowledgmentProcessor() {
        return this.acknowledgmentProcessor;
    }

    /**
     * Stops the polling loop, waits up to the shutdown timeout for the back pressure handler to
     * drain, cancels the polls that are still outstanding if it did not, and stops the
     * acknowledgement processor. Does nothing if the source is not running.
     */
    public void stop() {
        if (!isRunning()) {
            logger.debug("{} for queue {} not running", getClass().getSimpleName(), this.pollingEndpointName);
            return;
        }
        synchronized (this.lifecycleMonitor) {
            // Check again while holding the lock: a concurrent stop() may already have run.
            if (!isRunning()) {
                logger.debug("{} for queue {} not running", getClass().getSimpleName(), this.pollingEndpointName);
                return;
            }
            logger.debug("Stopping {} for queue {}", getClass().getSimpleName(), this.pollingEndpointName);
            this.running = false;
            if (!waitExistingTasksToFinish()) {
                logger.warn("Tasks did not finish in {} seconds for queue {}, proceeding with shutdown", Long.valueOf(this.shutdownTimeout.getSeconds()), this.pollingEndpointName);
                // Cancel from a snapshot: cancelling completes the future, and its completion
                // callback removes it from pollingFutures, which must not happen while that
                // collection is being iterated.
                Collection<CompletableFuture<?>> pendingPolls = new ArrayList<>(this.pollingFutures);
                pendingPolls.forEach(completableFuture -> completableFuture.cancel(true));
            }
            doStop();
            this.acknowledgmentProcessor.stop();
            logger.debug("{} for queue {} stopped", getClass().getSimpleName(), this.pollingEndpointName);
        }
    }

    /**
     * Hook invoked during {@link #stop()} before the acknowledgement processor is stopped. The
     * default implementation does nothing.
     */
    protected void doStop() {
    }

    /**
     * Waits for the back pressure handler to drain within the shutdown timeout.
     *
     * @return the result of draining the back pressure handler, or {@code false} without
     * waiting when the shutdown timeout is zero
     */
    private boolean waitExistingTasksToFinish() {
        if (!this.shutdownTimeout.isZero()) {
            return this.backPressureHandler.drain(this.shutdownTimeout);
        }
        logger.debug("Shutdown timeout set to zero for queue {} - not waiting for tasks to finish", this.pollingEndpointName);
        return false;
    }
}
