package io.smallrye.reactive.messaging.aws.sqs;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsDeleteAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsNothingAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.PausablePollingStream;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.class */
/**
 * Inbound side of the SQS connector for a single channel.
 *
 * <p>The class builds {@code ReceiveMessage} requests from the channel configuration, polls the
 * queue through a {@link PausablePollingStream} driven by a dedicated single-thread executor, and
 * exposes every received SQS message as an {@link SqsMessage} on {@link #getStream()}.
 *
 * <p>The most recent failures are retained (guarded by this instance's monitor) so that
 * {@link #isAlive(HealthReport.HealthReportBuilder)} can report them.
 */
public class SqsInboundChannel {
    /** Maximum number of recent failures retained for health reporting. */
    private static final int MAX_RETAINED_FAILURES = 10;

    private final String channel;
    private final SqsAsyncClient client;
    private final Context context;
    private final Uni<String> queueUrlUni;
    private final Flow.Publisher<? extends Message<?>> stream;
    private final int waitTimeSeconds;
    private final int maxNumberOfMessages;
    private final SqsReceiveMessageRequestCustomizer customizer;
    private final long retries;
    private final boolean healthEnabled;
    private final List<String> messageAttributeNames;
    private final Integer visibilityTimeout;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Sliding window of the latest failures; every access is synchronized on {@code this}. */
    private final List<Throwable> failures = new ArrayList<>();
    // The factory is only invoked when the executor creates its worker thread. "this.channel" must
    // stay qualified: a simple-name read of a blank final field is rejected in a field initializer.
    private final ScheduledExecutorService requestExecutor = Executors.newSingleThreadScheduledExecutor(
            runnable -> new Thread(runnable, "smallrye-aws-sqs-request-thread-" + this.channel));

    /**
     * Creates the inbound channel and assembles (without subscribing to) its message stream.
     *
     * @param sqsConnectorIncomingConfiguration the incoming channel configuration
     * @param vertx the Vert.x instance used to create the context on which items are emitted
     * @param sqsManager provides the SQS client and the queue URL for this configuration
     * @param sqsReceiveMessageRequestCustomizer optional hook applied to every receive request
     *        builder; may be {@code null}
     * @param jsonMapping passed to each created {@link SqsMessage}
     */
    public SqsInboundChannel(SqsConnectorIncomingConfiguration sqsConnectorIncomingConfiguration, Vertx vertx,
            SqsManager sqsManager, SqsReceiveMessageRequestCustomizer sqsReceiveMessageRequestCustomizer,
            JsonMapping jsonMapping) {
        this.channel = sqsConnectorIncomingConfiguration.getChannel();
        this.healthEnabled = sqsConnectorIncomingConfiguration.getHealthEnabled();
        this.retries = sqsConnectorIncomingConfiguration.getReceiveRequestRetries();
        this.client = sqsManager.getClient(sqsConnectorIncomingConfiguration);
        // Resolve the queue URL once and reuse the result for every subsequent request.
        this.queueUrlUni = sqsManager.getQueueUrl(sqsConnectorIncomingConfiguration).memoize().indefinitely();
        this.context = Context.newInstance(vertx.getDelegate().createEventLoopContext());
        this.waitTimeSeconds = sqsConnectorIncomingConfiguration.getWaitTimeSeconds();
        this.visibilityTimeout = sqsConnectorIncomingConfiguration.getVisibilityTimeout().orElse(null);
        this.maxNumberOfMessages = sqsConnectorIncomingConfiguration.getMaxNumberOfMessages();
        this.messageAttributeNames = getMessageAttributeNames(sqsConnectorIncomingConfiguration);
        this.customizer = sqsReceiveMessageRequestCustomizer;

        SqsAckHandler ackHandler = sqsConnectorIncomingConfiguration.getAckDelete()
                ? new SqsDeleteAckHandler(this.client, this.queueUrlUni)
                : new SqsNothingAckHandler();

        // NOTE(review): the fifth argument (twice the batch size) is presumably the polling
        // stream's buffer bound - confirm against PausablePollingStream.
        PausablePollingStream<List<software.amazon.awssdk.services.sqs.model.Message>, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
                this.channel, request(null, 0), (messages, processor) -> {
                    // request(...) emits null when a poll returned no message.
                    if (messages != null) {
                        for (software.amazon.awssdk.services.sqs.model.Message message : messages) {
                            processor.onNext(message);
                        }
                    }
                }, this.requestExecutor, this.maxNumberOfMessages * 2,
                sqsConnectorIncomingConfiguration.getReceiveRequestPauseResume());

        this.stream = Multi.createFrom()
                .deferred(() -> this.queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
                .emitOn(runnable -> this.context.runOnContext(runnable))
                .onItem().transform(message -> new SqsMessage<>(message, jsonMapping, ackHandler))
                .onFailure().invoke(failure -> {
                    AwsSqsLogging.log.errorReceivingMessage(this.channel, failure);
                    reportFailure(failure, false);
                });
    }

    /**
     * Builds the list of message attribute names requested with every receive call: the connector's
     * class-name attribute followed by the comma-separated names from the configuration, if any.
     * Configured names are trimmed and blank entries are skipped so that values such as
     * {@code "a, b"} or a trailing comma do not yield invalid attribute names.
     */
    private static List<String> getMessageAttributeNames(
            SqsConnectorIncomingConfiguration sqsConnectorIncomingConfiguration) {
        List<String> names = new ArrayList<>();
        names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
        sqsConnectorIncomingConfiguration.getReceiveRequestMessageAttributeNames().ifPresent(configured -> {
            for (String rawName : configured.split(",")) {
                String trimmed = rawName.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        });
        return names;
    }

    /**
     * Records a failure for health reporting, keeping only the latest
     * {@value #MAX_RETAINED_FAILURES} entries.
     *
     * @param failure the failure to record
     * @param fatal when {@code true}, the channel is closed after the failure is recorded
     */
    public synchronized void reportFailure(Throwable failure, boolean fatal) {
        if (this.failures.size() >= MAX_RETAINED_FAILURES) {
            // Drop the oldest entry to make room for the new one.
            this.failures.remove(0);
        }
        this.failures.add(failure);
        if (fatal) {
            close();
        }
    }

    /**
     * Creates a {@link Uni} that issues one {@code ReceiveMessage} call when subscribed.
     *
     * <p>When the call fails with a retryable {@link SqsException} and {@code retryCount} is still
     * below the configured number of retries, a new request is chained with the failed request's
     * id as receive-request attempt id; otherwise the failure is propagated.
     *
     * @param requestId receive-request attempt id to set on the request, or {@code null} for none
     * @param retryCount number of retries already performed for this poll
     * @return a {@link Uni} emitting the received messages, or {@code null} when the response
     *         contained no message
     */
    public Uni<List<software.amazon.awssdk.services.sqs.model.Message>> request(String requestId, int retryCount) {
        return this.queueUrlUni.map(queueUrl -> {
            ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageAttributeNames(this.messageAttributeNames)
                    .waitTimeSeconds(this.waitTimeSeconds)
                    .maxNumberOfMessages(this.maxNumberOfMessages);
            if (requestId != null) {
                requestBuilder.receiveRequestAttemptId(requestId);
            }
            if (this.visibilityTimeout != null) {
                requestBuilder.visibilityTimeout(this.visibilityTimeout);
            }
            // The customizer runs last so that it can override any of the settings above.
            if (this.customizer != null) {
                this.customizer.customize(requestBuilder);
            }
            return requestBuilder;
        }).chain(builder -> Uni.createFrom().completionStage(() -> this.client.receiveMessage(builder.build())))
                .onItem().transform(response -> {
                    List<software.amazon.awssdk.services.sqs.model.Message> messages = response.messages();
                    if (messages == null || messages.isEmpty()) {
                        AwsSqsLogging.log.receivedEmptyMessage();
                        return null;
                    }
                    if (AwsSqsLogging.log.isTraceEnabled()) {
                        messages.forEach(message -> AwsSqsLogging.log.receivedMessage(message.body()));
                    }
                    return messages;
                })
                .onFailure(failure -> failure instanceof SqsException && ((SqsException) failure).retryable())
                .recoverWithUni(failure -> {
                    if (retryCount < this.retries) {
                        return request(((SqsException) failure).requestId(), retryCount + 1);
                    }
                    return Uni.createFrom().failure(failure);
                });
    }

    /**
     * Returns the stream of messages received on this channel.
     *
     * @return the publisher assembled by the constructor
     */
    public Flow.Publisher<? extends Message<?>> getStream() {
        return this.stream;
    }

    /** Marks the channel as closed and shuts down the executor used to issue receive requests. */
    public void close() {
        this.closed.set(true);
        this.requestExecutor.shutdown();
    }

    /**
     * Contributes this channel's liveness to the given report. Does nothing when health reporting
     * is disabled for the channel. The channel is reported as alive when no failure has been
     * recorded; otherwise it is reported as not alive together with the concatenated messages of
     * the retained failures.
     *
     * @param healthReportBuilder the report builder to add this channel's status to
     */
    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.healthEnabled) {
            return;
        }
        // Copy under the lock so the report is built from a consistent snapshot.
        List<Throwable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.failures);
        }
        if (snapshot.isEmpty()) {
            healthReportBuilder.add(this.channel, true);
        } else {
            String message = snapshot.stream().map(Throwable::getMessage).collect(Collectors.joining());
            healthReportBuilder.add(this.channel, false, message);
        }
    }
}
