package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.CollectionUtils;
import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.class */
/**
 * Base class for acknowledgement processors that forward messages to an {@link AcknowledgementExecutor}
 * according to the configured {@link AcknowledgementOrdering}.
 *
 * <p>
 * With {@link AcknowledgementOrdering#PARALLEL} every partition of at most {@code maxAcknowledgementsPerBatch}
 * messages is sent to the executor right away. With any other ordering, messages are grouped using the
 * message grouping function and each group's execution is chained after the last future registered for that
 * same group, so acknowledgements within a group are executed one after the other.
 *
 * <p>
 * Subclasses decide when acknowledgements are actually dispatched by implementing the {@code doOnAcknowledge}
 * methods, typically delegating to {@link #sendToExecutor(Collection)}.
 *
 * @param <T> the message payload type.
 */
public abstract class AbstractOrderingAcknowledgementProcessor<T> implements ExecutingAcknowledgementProcessor<T>, AcknowledgementCallback<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractOrderingAcknowledgementProcessor.class);

    /** Group key used for every message when no grouping function has been configured. */
    private static final String DEFAULT_MESSAGE_GROUP = "default";

    private int maxAcknowledgementsPerBatch;
    private AcknowledgementExecutor<T> acknowledgementExecutor;
    private AcknowledgementOrdering acknowledgementOrdering;

    // Written under lifecycleMonitor but read without it (isRunning / onAcknowledge), hence volatile.
    private volatile boolean running;
    private String id;
    private Function<Message<T>, String> messageGroupingFunction;
    private final Object lifecycleMonitor = new Object();

    // Fair lock: threads submitting ordered acknowledgements are served in arrival order.
    private final Lock orderedExecutionLock = new ReentrantLock(true);

    /** Last acknowledgement future registered per message group; completed entries are pruned on each submission. */
    private final Map<String, CompletableFuture<Void>> lastAcknowledgementFutureMap = new ConcurrentHashMap<>();

    // Default callback instance; relies on the interface's own method implementations.
    private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback = new AsyncAcknowledgementResultCallback<T>() {
    };

    /**
     * Return the callback used to acknowledge messages, which is this processor itself.
     * @return this instance.
     */
    @Override
    public AcknowledgementCallback<T> getAcknowledgementCallback() {
        return this;
    }

    /**
     * Read the acknowledgement ordering from the container options and then let subclasses
     * configure themselves through {@link #doConfigure(ContainerOptions)}.
     * @param containerOptions the options to configure this processor with.
     */
    @Override
    public void configure(ContainerOptions<?, ?> containerOptions) {
        this.acknowledgementOrdering = containerOptions.getAcknowledgementOrdering();
        doConfigure(containerOptions);
    }

    /**
     * Hook for subclass-specific configuration. Does nothing by default.
     * @param containerOptions the options being applied.
     */
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
    }

    /**
     * Set the executor that performs the actual acknowledgement.
     * @param acknowledgementExecutor the executor, never {@code null}.
     */
    @Override
    public void setAcknowledgementExecutor(AcknowledgementExecutor<T> acknowledgementExecutor) {
        Assert.notNull(acknowledgementExecutor, "acknowledgementExecutor cannot be null");
        this.acknowledgementExecutor = acknowledgementExecutor;
    }

    /**
     * Set the callback invoked after each acknowledgement attempt succeeds or fails.
     * @param asyncAcknowledgementResultCallback the callback, never {@code null}.
     */
    @Override
    public void setAcknowledgementResultCallback(AsyncAcknowledgementResultCallback<T> asyncAcknowledgementResultCallback) {
        Assert.notNull(asyncAcknowledgementResultCallback, "acknowledgementResultCallback cannot be null");
        this.acknowledgementResultCallback = asyncAcknowledgementResultCallback;
    }

    /**
     * Set the maximum number of messages sent to the executor in a single call.
     * @param i the batch size, must be greater than zero.
     */
    public void setMaxAcknowledgementsPerBatch(int i) {
        Assert.isTrue(i > 0, "maxAcknowledgementsPerBatch must be greater than zero");
        this.maxAcknowledgementsPerBatch = i;
    }

    /**
     * Set the function that maps a message to the group it is ordered within.
     * Must be set if, and only if, the ordering is {@link AcknowledgementOrdering#ORDERED_BY_GROUP};
     * this is verified on {@link #start()}.
     * @param function the grouping function, never {@code null}.
     */
    public void setMessageGroupingFunction(Function<Message<T>, String> function) {
        Assert.notNull(function, "messageGroupingFunction cannot be null");
        this.messageGroupingFunction = function;
    }

    /**
     * Set this component's id, used in log messages.
     * @param str the id, never {@code null}.
     */
    @Override
    public void setId(String str) {
        Assert.notNull(str, "id cannot be null");
        this.id = str;
    }

    /**
     * Return this component's id.
     * @return the id, or {@code null} if it has not been set.
     */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Validate the configuration, mark the processor as running and invoke {@link #doStart()}.
     * @throws IllegalArgumentException if the executor, ordering or id is missing, or if the
     * grouping function does not match the configured ordering.
     */
    public void start() {
        synchronized (this.lifecycleMonitor) {
            Assert.notNull(this.acknowledgementExecutor, "acknowledgementExecutor not set");
            Assert.notNull(this.acknowledgementOrdering, "acknowledgementOrdering not set");
            Assert.notNull(this.id, "id not set");
            logger.debug("Starting {} with ordering {} and batch size {}", this.id, this.acknowledgementOrdering,
                    this.maxAcknowledgementsPerBatch);
            // Validate before flipping the flag: a failed validation must not leave the processor
            // reporting itself as running, and the grouping function must be in place before
            // other threads can observe running == true.
            validateAndInitializeMessageGrouping();
            // The flag is set before doStart() so work started by subclasses already sees a running processor.
            this.running = true;
            doStart();
        }
    }

    /**
     * Check that a grouping function is present exactly when ordering by group, and install the
     * single-group default function when none was provided.
     */
    private void validateAndInitializeMessageGrouping() {
        Assert.isTrue(isValidOrderedByGroup() || isValidNotOrderedByGroup(), "Invalid configuration for acknowledgement ordering.");
        if (this.messageGroupingFunction == null) {
            this.messageGroupingFunction = message -> DEFAULT_MESSAGE_GROUP;
        }
    }

    private boolean isValidOrderedByGroup() {
        return AcknowledgementOrdering.ORDERED_BY_GROUP.equals(this.acknowledgementOrdering)
                && this.messageGroupingFunction != null;
    }

    private boolean isValidNotOrderedByGroup() {
        return !AcknowledgementOrdering.ORDERED_BY_GROUP.equals(this.acknowledgementOrdering)
                && this.messageGroupingFunction == null;
    }

    /**
     * Hook invoked at the end of {@link #start()}, while the lifecycle monitor is held.
     * Does nothing by default.
     */
    protected void doStart() {
    }

    /**
     * Acknowledge a single message if this processor is running.
     * @param message the message to acknowledge.
     * @return the future returned by {@link #doOnAcknowledge(Message)}, or an already completed
     * future if the processor is not running.
     */
    @Override
    public CompletableFuture<Void> onAcknowledge(Message<T> message) {
        if (!isRunning()) {
            logger.debug("{} not running, returning for message {}", this.id, MessageHeaderUtils.getId(message));
            return CompletableFuture.completedFuture(null);
        }
        logger.trace("Received message {} to acknowledge.", MessageHeaderUtils.getId(message));
        return doOnAcknowledge(message);
    }

    /**
     * Acknowledge a batch of messages if this processor is running.
     * @param collection the messages to acknowledge.
     * @return the future returned by {@link #doOnAcknowledge(Collection)}, or an already completed
     * future if the processor is not running.
     */
    @Override
    public CompletableFuture<Void> onAcknowledge(Collection<Message<T>> collection) {
        if (!isRunning()) {
            logger.debug("{} not running, returning for messages {}", this.id, MessageHeaderUtils.getId(collection));
            return CompletableFuture.completedFuture(null);
        }
        logger.trace("Received messages {} to acknowledge.", MessageHeaderUtils.getId(collection));
        return doOnAcknowledge(collection);
    }

    /**
     * Mark the processor as stopped and invoke {@link #doStop()}. Calling this on a processor
     * that is not running only logs a message.
     */
    public void stop() {
        if (!isRunning()) {
            logger.debug("{} already stopped", this.id);
            return;
        }
        synchronized (this.lifecycleMonitor) {
            // Re-check under the monitor so concurrent callers cannot both run doStop().
            if (!this.running) {
                logger.debug("{} already stopped", this.id);
                return;
            }
            logger.debug("Stopping {}", this.id);
            this.running = false;
            doStop();
        }
    }

    /**
     * Hook invoked at the end of {@link #stop()}, while the lifecycle monitor is held.
     * Does nothing by default.
     */
    protected void doStop() {
    }

    /**
     * Whether this processor has been started and not yet stopped.
     * @return {@code true} if running.
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Return the function used to assign messages to groups.
     * @return the grouping function; {@code null} until one is set or {@link #start()} installs the default.
     */
    public Function<Message<T>, String> getMessageGroupingFunction() {
        return this.messageGroupingFunction;
    }

    /**
     * Send the messages to the executor, in parallel or ordered depending on the configured ordering.
     * Failures are logged and then propagated through the returned future; the elapsed time is
     * logged at trace level in either case.
     * @param collection the messages to acknowledge.
     * @return a future that completes when all partitions have been processed.
     */
    public CompletableFuture<Void> sendToExecutor(Collection<Message<T>> collection) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        return CompletableFutures
                .exceptionallyCompose(sendToExecutorParallelOrOrdered(collection),
                        failure -> logAcknowledgementError(collection, failure))
                .whenComplete(logExecutionTime(collection, stopWatch));
    }

    private BiConsumer<Void, Throwable> logExecutionTime(Collection<Message<T>> collection, StopWatch stopWatch) {
        return (ignoredResult, ignoredFailure) -> {
            stopWatch.stop();
            logger.trace("Took {}ms to acknowledge messages {}", stopWatch.getTotalTimeMillis(),
                    MessageHeaderUtils.getId(collection));
        };
    }

    private CompletableFuture<Void> sendToExecutorParallelOrOrdered(Collection<Message<T>> collection) {
        return AcknowledgementOrdering.PARALLEL.equals(this.acknowledgementOrdering)
                ? sendToExecutorParallel(collection)
                : sendToExecutorOrdered(collection);
    }

    /** Dispatch every partition immediately and combine the resulting futures. */
    private CompletableFuture<Void> sendToExecutorParallel(Collection<Message<T>> collection) {
        return CompletableFuture.allOf(partitionMessages(collection).stream()
                .map(this::doSendToExecutor)
                .toArray(CompletableFuture[]::new));
    }

    /**
     * Chain every partition onto its groups' previous futures and combine the results.
     * The lock only covers building the chains (the stream is fully consumed inside the
     * try block), not the asynchronous execution itself.
     */
    private CompletableFuture<Void> sendToExecutorOrdered(Collection<Message<T>> collection) {
        this.orderedExecutionLock.lock();
        try {
            return CompletableFuture.allOf(partitionMessages(collection).stream()
                    .map(this::doSendToExecutorOrdered)
                    .flatMap(Collection::stream)
                    .toArray(CompletableFuture[]::new));
        }
        finally {
            this.orderedExecutionLock.unlock();
        }
    }

    /** Split one partition by message group and submit each non-empty group in order. */
    private Collection<CompletableFuture<Void>> doSendToExecutorOrdered(Collection<Message<T>> collection) {
        return collection.stream()
                .collect(Collectors.groupingBy(this.messageGroupingFunction))
                .entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .map(entry -> sendGroupToExecutor(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * Chain the given messages after the last future registered for the group and register the
     * new future as the group's last one.
     */
    private CompletableFuture<Void> sendGroupToExecutor(String str, List<Message<T>> list) {
        CompletableFuture<Void> groupFuture = this.lastAcknowledgementFutureMap
                .computeIfAbsent(str, newGroup -> CompletableFuture.completedFuture(null))
                // A failure of the previous acknowledgement must not prevent this one from executing.
                .exceptionally(previousFailure -> null)
                .thenCompose(ignored -> doSendToExecutor(list));
        this.lastAcknowledgementFutureMap.put(str, groupFuture);
        removeCompletedFutures();
        return groupFuture;
    }

    /** Drop the map entries whose futures are already done so the map does not grow with every group seen. */
    private void removeCompletedFutures() {
        List<String> completedGroups = this.lastAcknowledgementFutureMap.entrySet().stream()
                .filter(entry -> entry.getValue().isDone())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        logger.trace("Removing completed futures from groups {}", completedGroups);
        completedGroups.forEach(this.lastAcknowledgementFutureMap::remove);
    }

    /**
     * Execute the acknowledgement and then the result callback. When the acknowledgement failed,
     * the returned future still fails with the original error once the callback has completed.
     */
    private CompletableFuture<Void> doSendToExecutor(Collection<Message<T>> collection) {
        return CompletableFutures.handleCompose(this.acknowledgementExecutor.execute(collection),
                (ignoredResult, failure) -> failure == null
                        ? executeResultCallback(collection, null)
                        : executeResultCallback(collection, failure)
                                .thenCompose(ignored -> CompletableFutures.failedFuture(failure)));
    }

    /** Run the result callback, wrapping any callback failure in an {@link AcknowledgementResultCallbackException}. */
    private CompletableFuture<Void> executeResultCallback(Collection<Message<T>> collection, Throwable th) {
        return CompletableFutures.exceptionallyCompose(doExecuteResultCallback(collection, th),
                callbackFailure -> CompletableFutures.failedFuture(new AcknowledgementResultCallbackException(
                        "Error executing acknowledgement result callback", callbackFailure)));
    }

    private CompletableFuture<Void> doExecuteResultCallback(Collection<Message<T>> collection, Throwable th) {
        logger.trace("Executing result callback for {} in {}", MessageHeaderUtils.getId(collection), this.id);
        return th == null
                ? this.acknowledgementResultCallback.onSuccess(collection)
                : this.acknowledgementResultCallback.onFailure(collection, th);
    }

    /** Log the failure and hand it back as a failed future so callers still observe the error. */
    private CompletableFuture<Void> logAcknowledgementError(Collection<Message<T>> collection, Throwable th) {
        logger.error("Acknowledgement processing has thrown an error for messages {} in {}",
                MessageHeaderUtils.getId(collection), this.id, th);
        return CompletableFutures.failedFuture(th);
    }

    /** Split the messages into partitions of at most {@code maxAcknowledgementsPerBatch} elements. */
    private Collection<Collection<Message<T>>> partitionMessages(Collection<Message<T>> collection) {
        logger.trace("Partitioning {} messages in {}", collection.size(), this.id);
        return CollectionUtils.partition(collection, this.maxAcknowledgementsPerBatch);
    }

    /**
     * Handle the acknowledgement of a single message. Only invoked while the processor is running.
     * @param message the message to acknowledge.
     * @return a future for the acknowledgement.
     */
    protected abstract CompletableFuture<Void> doOnAcknowledge(Message<T> message);

    /**
     * Handle the acknowledgement of a batch of messages. Only invoked while the processor is running.
     * @param collection the messages to acknowledge.
     * @return a future for the acknowledgement.
     */
    protected abstract CompletableFuture<Void> doOnAcknowledge(Collection<Message<T>> collection);
}
