package io.siddhi.extension.io.sqs.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.sqs.api.SQSBuilder;
import io.siddhi.extension.io.sqs.util.SQSConstants;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

@Extension(name = "sqs", namespace = "source", description = "SQS source allows users to connect and consume messages from a AWS SQS Queue. It has the ability to receive Text messages", parameters = {@Parameter(name = SQSConstants.QUEUE_URL_NAME, description = "Queue name which SQS Source should subscribe to", type = {DataType.STRING}), @Parameter(name = SQSConstants.ACCESS_KEY_NAME, description = "Access Key for the Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself)", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = SQSConstants.SECRET_KEY_NAME, description = "Secret Key of the Amazon User. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself)", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "region", description = "Amazon Web Service Region", type = {DataType.STRING}), @Parameter(name = SQSConstants.POLLING_INTERVAL_NAME, description = "Interval (in milliseconds) between two message retrieval operations", type = {DataType.INT}), @Parameter(name = SQSConstants.WAIT_TIME_NAME, description = "Maximum amount (in seconds) that a polling call will wait for a message to become available in the queue", type = {DataType.INT}, optional = true, defaultValue = "-1"), @Parameter(name = SQSConstants.MAX_NUMBER_OF_MESSAGES_NAME, description = "Maximum number of messages retrieved from the queue per polling call (Actual maybe smaller than this even if there's more messages in the queue)", type = {DataType.INT}, defaultValue = "1"), @Parameter(name = SQSConstants.VISIBILITY_TIMEOUT_NAME, description = "The length of time (in seconds) for which a message received from a queue will be invisible to other consumers(only applicable if consumer doesn't purge the received messages from the queue).", type = {DataType.INT}, optional = true, defaultValue = "-1"), 
@Parameter(name = SQSConstants.DELETE_MESSAGES_NAME, description = "Should the message be deleted from the queue after consuming it.", type = {DataType.BOOL}, optional = true, defaultValue = SQSConstants.DELETE_MESSAGES_NAME), @Parameter(name = SQSConstants.DELETE_RETRY_INTERVAL_NAME, description = "Time interval (in milliseconds) consumer should retry to delete a message in the case of failure during a message delete operation.", type = {DataType.INT}, optional = true, defaultValue = "5000"), @Parameter(name = SQSConstants.MAX_NUMBER_OF_DELETE_RETRY_ATTEMPTS_NAME, description = "Maximum number retry attempts to be performed in case of a failure.", type = {DataType.INT}, optional = true, defaultValue = "10"), @Parameter(name = SQSConstants.PARALLEL_CONSUMERS_NAME, description = "Size of the thread pool that should be used for polling.", type = {DataType.INT}, defaultValue = "1")}, examples = {@Example(syntax = "@source(type='sqs',queue='http://aws.sqs.queue.url',access.key='aws.access.key',secret.key='aws.secret.key',region='us-east-2',polling.interval='5000',max.number.of.messages='10',number.of.parallel.consumers='1',purge.messages='true',wait.time='2',visibility.timeout='30',delete.retry.interval='1000',max.number.of.delete.retry.attempts='10',@map(type='xml',enclosing.element=\"//events\",@attributes(symbol='symbol', message_id='trp:MESSAGE_ID') ))define stream inStream (symbol string, message_id string);", description = "Above example demonstrate how an SQS source is getting configured in order to consume messages from an SQS queue.\nSQS source will establish the connection to a queue using given configurations and start consuming xml messages from the queue.\nOnce a message is received by the source from the given queue, 'xml' mapper will generate a siddhi event from that message and pass it to the inStream.")})
/* loaded from: input_file:io/siddhi/extension/io/sqs/source/SQSSource.class */
/*
 * Siddhi source that polls an SQS queue on the app's scheduled executor.
 *
 * init() builds the source configuration and resolves the AWS credentials;
 * connect()/resume() schedule the polling tasks and
 * pause()/disconnect()/destroy() cancel them.
 */
public class SQSSource extends Source {
    private static final Logger logger = Logger.getLogger(SQSSource.class);

    /** Executor obtained from the Siddhi app context; polling tasks are scheduled on it. */
    private ScheduledExecutorService scheduledExecutorService;

    /** Configuration built from the source options in {@code init}. */
    private SQSSourceConfig sourceConfig;

    /** Listener handed to every polling task built by {@code SQSBuilder}. */
    private SourceEventListener sourceEventListener;

    /** Handles of the currently scheduled polling tasks, kept so they can be cancelled. */
    private final List<ScheduledFuture<?>> futures = new ArrayList<>();

    /**
     * Builds the source configuration and resolves the AWS credentials.
     *
     * <p>A credential that is missing or empty in the source options is read from the
     * {@code configReader} instead. Each credential is resolved independently.
     *
     * @param sourceEventListener listener passed to the polling tasks
     * @param optionHolder        options given in the source definition
     * @param strArr              forwarded unchanged to {@code SQSSourceConfig}
     * @param configReader        fallback provider for the access key and secret key
     * @param siddhiAppContext    supplies the scheduled executor service used for polling
     * @return always {@code null}
     * @throws SiddhiAppValidationException if either credential is still null or empty
     *                                      after consulting {@code configReader}
     */
    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceConfig = new SQSSourceConfig(optionHolder, strArr);
        if (isNullOrEmpty(this.sourceConfig.getAccessKey())) {
            this.sourceConfig.setAccessKey(configReader.readConfig(SQSConstants.ACCESS_KEY_NAME, (String) null));
        }
        // Test the secret key itself here: checking the access key instead would skip the
        // fallback for an empty secret key and dereference a null access key.
        if (isNullOrEmpty(this.sourceConfig.getSecretKey())) {
            this.sourceConfig.setSecretKey(configReader.readConfig(SQSConstants.SECRET_KEY_NAME, (String) null));
        }
        if (isNullOrEmpty(this.sourceConfig.getAccessKey()) || isNullOrEmpty(this.sourceConfig.getSecretKey())) {
            throw new SiddhiAppValidationException("Access key and Secret key are mandatory parameters for the SQS client");
        }
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.sourceEventListener = sourceEventListener;
        return null;
    }

    /**
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * @return a single-element array containing {@code String.class}
     */
    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    /**
     * Schedules the polling tasks. Neither argument is used.
     *
     * @param connectionCallback unused
     * @param state              unused
     * @throws ConnectionUnavailableException declared by the signature; not thrown by this body
     */
    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        startPolling();
    }

    /** Cancels all scheduled polling tasks. */
    public void disconnect() {
        stopAndRemoveFutures();
    }

    /** Cancels all scheduled polling tasks. */
    public void destroy() {
        stopAndRemoveFutures();
    }

    /** Cancels all scheduled polling tasks; {@link #resume()} schedules them again. */
    public void pause() {
        stopAndRemoveFutures();
    }

    /** Schedules a fresh set of polling tasks. */
    public void resume() {
        startPolling();
    }

    /**
     * Schedules one fixed-rate polling task per configured thread-pool slot, starting
     * immediately and repeating every polling interval (milliseconds), and records each
     * returned future so it can be cancelled later.
     */
    private void startPolling() {
        for (int i = 0; i < this.sourceConfig.getThreadPoolSize(); i++) {
            this.futures.add(this.scheduledExecutorService.scheduleAtFixedRate(new SQSBuilder(this.sourceConfig).buildSourceTask(this.sourceEventListener), 0L, this.sourceConfig.getPollingInterval(), TimeUnit.MILLISECONDS));
        }
        logger.info("SQS Provider connected and started polling.");
    }

    /**
     * Cancels every recorded polling task (requesting interruption) and forgets them.
     */
    private void stopAndRemoveFutures() {
        // Cancel all first, then clear: removing by index inside a forward index loop
        // shifts the remaining elements and would skip every second future.
        for (ScheduledFuture<?> future : this.futures) {
            future.cancel(true);
        }
        this.futures.clear();
    }

    /** Returns {@code true} when {@code value} is {@code null} or has zero length. */
    private static boolean isNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}
