package io.siddhi.extension.io.sqs.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.sqs.api.SQSBuilder;
import io.siddhi.extension.io.sqs.util.SQSConstants;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "sqs", namespace = "sink", description = "SQS sink allows users to connect and publish messages to an AWS SQS Queue. It has the ability to only publish Text messages", parameters = {@Parameter(name = SQSConstants.QUEUE_URL_NAME, description = "Queue url which SQS Sink should connect to", type = {DataType.STRING}), @Parameter(name = SQSConstants.ACCESS_KEY_NAME, description = "Access Key for the Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself)", type = {DataType.STRING}, optional = true, defaultValue = "none"), @Parameter(name = SQSConstants.SECRET_KEY_NAME, description = "Secret Key of the Amazon User. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself)", type = {DataType.STRING}, optional = true, defaultValue = "none"), @Parameter(name = "region", description = "Amazon Web Service Region", type = {DataType.STRING}), @Parameter(name = SQSConstants.MESSAGE_GROUP_ID_NAME, description = "ID of the group that the message belong to(only applicable for FIFO Queues)", type = {DataType.STRING}, optional = true, dynamic = true, defaultValue = "null"), @Parameter(name = SQSConstants.DEDUPLICATION_ID_NAME, description = "ID by which a FIFO queue identifies the duplication in the queue(only applicable for FIFO queues)", type = {DataType.STRING}, optional = true, dynamic = true, defaultValue = "null"), @Parameter(name = SQSConstants.DELAY_INTERVAL_NAME, description = "Time in seconds for how long the message remain in the queue until it is available for the consumers to consume.", type = {DataType.INT}, optional = true, defaultValue = "-1")}, examples = {@Example(syntax = "@sink(type='sqs',queue='https://amazon.sqs.queue.url',access.key='aws.access.key',secret.key='aws.secret.key',region='us-east-1',delay.interval='5',message.group.id='group-1',@map(type='xml') )define stream outStream(symbol string, deduplicationID string);", description = "Above example demonstrate how an SQS sink is getting configured in order to publish messages to a SQS queue.\nOnce an event is received by outStream, an xml message will be generated by 'xml' mapper from the attribute values of the event. Then SQS sink will connect to the queue using provided configurations and send the message to the queue.\n"), @Example(syntax = "@sink(type='sqs',queue='https://amazon.sqs.queue.fifo',access.key='aws.access.key',secret.key='aws.secret.key',region='us-east-1',delay.interval='5',deduplication.id='{{deduplicationID}}',message.group.id='group-1',@map(type='xml') )define stream outStream(symbol string, deduplicationID string);", description = "Above example demonstrate how an SQS sink is getting configured in order to publish messages to a SQS FIFO queue.\nOnce an event is received by outStream, an xml message will be generated by 'xml' mapper from the attribute values of the event. SQS sink will connect to the queue using provided configurations and send the messages to the queue.\nFor each message deduplciation id will be selected from the attriibute 'deduplicationID' in the outStream.\n")})
/* loaded from: input_file:io/siddhi/extension/io/sqs/sink/SQSSink.class */
public class SQSSink extends Sink {
    private SQSSinkConfig sinkConfig;
    private SQSMessagePublisher sqsMessagePublisher;
    private OptionHolder optionHolder;

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{SQSConstants.MESSAGE_GROUP_ID_NAME, SQSConstants.DEDUPLICATION_ID_NAME};
    }

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sinkConfig = new SQSSinkConfig(optionHolder);
        this.optionHolder = optionHolder;
        this.sqsMessagePublisher = null;
        if (this.sinkConfig.getAccessKey() == null || this.sinkConfig.getAccessKey().isEmpty()) {
            this.sinkConfig.setAccessKey(configReader.readConfig(SQSConstants.ACCESS_KEY_NAME, (String) null));
        }
        if (this.sinkConfig.getSecretKey() == null || this.sinkConfig.getSecretKey().isEmpty()) {
            this.sinkConfig.setSecretKey(configReader.readConfig(SQSConstants.SECRET_KEY_NAME, (String) null));
        }
        if (this.sinkConfig.getAccessKey() == null || this.sinkConfig.getSecretKey() == null || this.sinkConfig.getAccessKey().isEmpty() || this.sinkConfig.getSecretKey().isEmpty()) {
            throw new SiddhiAppValidationException("Access key and Secret key are mandatory parameters for the SQS client");
        }
        return null;
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        this.sqsMessagePublisher.sendMessageRequest(obj, dynamicOptions);
    }

    public void connect() throws ConnectionUnavailableException {
        this.sqsMessagePublisher = new SQSBuilder(this.sinkConfig).buildSinkPublisher(this.optionHolder, checkFIFO(this.sinkConfig.getQueueUrl()));
    }

    public void disconnect() {
    }

    public void destroy() {
    }

    private boolean checkFIFO(String str) {
        return str.endsWith(".fifo") || str.substring(0, str.length() - 1).endsWith(".fifo");
    }
}
