package io.smallrye.reactive.messaging.aws.sqs;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/* loaded from: input_file:io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.class */
public class SqsOutboundChannel {
    private final SqsAsyncClient client;
    private final String channel;
    private final Uni<String> queueUrlUni;
    private final JsonMapping jsonMapping;
    private final boolean healthEnabled;
    private final String groupId;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<Throwable> failures = new ArrayList();
    private final Flow.Subscriber<? extends Message<?>> subscriber = MultiUtils.via(multi -> {
        return multi.onSubscription().call(subscription -> {
            return this.queueUrlUni;
        }).call(message -> {
            return publishMessage(this.client, message);
        }).onFailure().invoke(th -> {
            AwsSqsLogging.log.unableToDispatch(this.channel, th);
            reportFailure(th);
        });
    });

    public SqsOutboundChannel(SqsConnectorOutgoingConfiguration sqsConnectorOutgoingConfiguration, SqsManager sqsManager, JsonMapping jsonMapping) {
        this.channel = sqsConnectorOutgoingConfiguration.getChannel();
        this.healthEnabled = sqsConnectorOutgoingConfiguration.getHealthEnabled().booleanValue();
        this.client = sqsManager.getClient(sqsConnectorOutgoingConfiguration);
        this.queueUrlUni = sqsManager.getQueueUrl(sqsConnectorOutgoingConfiguration).memoize().indefinitely();
        this.groupId = sqsConnectorOutgoingConfiguration.getGroupId().orElse(null);
        this.jsonMapping = jsonMapping;
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return this.subscriber;
    }

    private Uni<Void> publishMessage(SqsAsyncClient sqsAsyncClient, Message<?> message) {
        return this.closed.get() ? Uni.createFrom().voidItem() : message.getPayload() == null ? Uni.createFrom().nullItem() : this.queueUrlUni.map(str -> {
            return getSendMessageRequest(str, message);
        }).chain(sendMessageRequest -> {
            return Uni.createFrom().completionStage(() -> {
                return sqsAsyncClient.sendMessage(sendMessageRequest);
            });
        }).invoke(sendMessageResponse -> {
            AwsSqsLogging.log.messageSentToChannel(this.channel, sendMessageResponse.messageId(), sendMessageResponse.sequenceNumber());
        }).onItemOrFailure().transformToUni((sendMessageResponse2, th) -> {
            if (th != null) {
                return Uni.createFrom().completionStage(message.nack(th));
            }
            OutgoingMessageMetadata.setResultOnMessage(message, sendMessageResponse2);
            return Uni.createFrom().completionStage(message.ack());
        });
    }

    private SendMessageRequest getSendMessageRequest(String str, Message<?> message) {
        Object payload = message.getPayload();
        String str2 = str;
        if (payload instanceof SendMessageRequest) {
            return (SendMessageRequest) payload;
        }
        if (payload instanceof SendMessageRequest.Builder) {
            SendMessageRequest.Builder queueUrl = ((SendMessageRequest.Builder) payload).queueUrl(str2);
            if (this.groupId != null) {
                queueUrl.messageGroupId(this.groupId);
            }
            return (SendMessageRequest) queueUrl.build();
        }
        SendMessageRequest.Builder builder = SendMessageRequest.builder();
        HashMap hashMap = new HashMap();
        String str3 = this.groupId;
        Optional metadata = message.getMetadata(SqsOutboundMetadata.class);
        if (metadata.isPresent()) {
            SqsOutboundMetadata sqsOutboundMetadata = (SqsOutboundMetadata) metadata.get();
            if (sqsOutboundMetadata.getQueueUrl() != null) {
                str2 = sqsOutboundMetadata.getQueueUrl();
            }
            if (sqsOutboundMetadata.getDeduplicationId() != null) {
                builder.messageDeduplicationId(sqsOutboundMetadata.getDeduplicationId());
            }
            if (sqsOutboundMetadata.getGroupId() != null) {
                str3 = sqsOutboundMetadata.getGroupId();
            }
            if (sqsOutboundMetadata.getDelaySeconds() != null) {
                builder.delaySeconds(sqsOutboundMetadata.getDelaySeconds());
            }
            if (sqsOutboundMetadata.getMessageAttributes() != null) {
                hashMap.putAll(sqsOutboundMetadata.getMessageAttributes());
            }
        }
        if (!(payload instanceof software.amazon.awssdk.services.sqs.model.Message)) {
            return (SendMessageRequest) builder.queueUrl(str2).messageGroupId(str3).messageAttributes(hashMap).messageBody(outgoingPayloadClassName(payload, hashMap)).build();
        }
        software.amazon.awssdk.services.sqs.model.Message message2 = (software.amazon.awssdk.services.sqs.model.Message) payload;
        if (message2.hasAttributes()) {
            hashMap.putAll(message2.messageAttributes());
        }
        return (SendMessageRequest) builder.queueUrl(str2).messageGroupId(str3).messageAttributes(hashMap).messageBody(message2.body()).build();
    }

    private String outgoingPayloadClassName(Object obj, Map<String, MessageAttributeValue> map) {
        if ((obj instanceof String) || obj.getClass().isPrimitive() || isPrimitiveBoxed(obj.getClass())) {
            map.put(SqsConnector.CLASS_NAME_ATTRIBUTE, (MessageAttributeValue) MessageAttributeValue.builder().dataType("String").stringValue(obj.getClass().getName()).build());
            return String.valueOf(obj);
        }
        if (obj.getClass().isArray() && obj.getClass().getComponentType().equals(Byte.TYPE)) {
            return new String((byte[]) obj);
        }
        if (this.jsonMapping == null) {
            return String.valueOf(obj);
        }
        map.put(SqsConnector.CLASS_NAME_ATTRIBUTE, (MessageAttributeValue) MessageAttributeValue.builder().dataType("String").stringValue(obj.getClass().getName()).build());
        return this.jsonMapping.toJson(obj);
    }

    private boolean isPrimitiveBoxed(Class<?> cls) {
        return cls.equals(Boolean.class) || cls.equals(Integer.class) || cls.equals(Byte.class) || cls.equals(Double.class) || cls.equals(Float.class) || cls.equals(Short.class) || cls.equals(Character.class) || cls.equals(Long.class);
    }

    public void close() {
        this.closed.set(true);
    }

    private synchronized void reportFailure(Throwable th) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(th);
    }

    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        ArrayList arrayList;
        if (this.healthEnabled) {
            synchronized (this) {
                arrayList = new ArrayList(this.failures);
            }
            if (arrayList.isEmpty()) {
                healthReportBuilder.add(this.channel, true);
            } else {
                healthReportBuilder.add(this.channel, false, (String) arrayList.stream().map((v0) -> {
                    return v0.getMessage();
                }).collect(Collectors.joining()));
            }
        }
    }
}
