package io.siddhi.extension.io.sqs.source;

import com.amazonaws.AbortedException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.sqs.util.SQSConstants;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/sqs/source/SQSSourceTask.class */
public class SQSSourceTask implements Runnable {
    private static final Logger logger = LogManager.getLogger(SQSSourceTask.class);
    private ReceiveMessageRequest messageRequest;
    private AmazonSQS amazonSQS;
    private SourceEventListener sourceEventListener;
    private SQSSourceConfig sourceConfig;
    private String[] reqTransportProperties;

    public SQSSourceTask(SQSSourceConfig sQSSourceConfig, AmazonSQS amazonSQS, SourceEventListener sourceEventListener) {
        this.sourceEventListener = sourceEventListener;
        this.amazonSQS = amazonSQS;
        initializeRequest(sQSSourceConfig);
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z = false;
        while (!z) {
            try {
                List<Message> messages = this.amazonSQS.receiveMessage(this.messageRequest).getMessages();
                if (messages == null || messages.isEmpty()) {
                    z = true;
                } else {
                    messages.forEach(message -> {
                        this.sourceEventListener.onEvent(message.getBody(), getTransportProperties(message, this.reqTransportProperties));
                        if (this.sourceConfig.deleteAfterConsume()) {
                            delete(message);
                        }
                    });
                }
            } catch (AbortedException e) {
                logger.info("Executor thread aborted.");
                return;
            } catch (SdkClientException e2) {
                logger.error("Error occurred while trying to receive messages from the queue. Hence waiting for polling cycle.\n" + e2.getMessage(), e2);
                return;
            }
        }
    }

    private void initializeRequest(SQSSourceConfig sQSSourceConfig) {
        this.sourceConfig = sQSSourceConfig;
        this.reqTransportProperties = this.sourceConfig.getRequestedTransportProperties();
        this.messageRequest = new ReceiveMessageRequest().withQueueUrl(this.sourceConfig.getQueueUrl()).withMaxNumberOfMessages(Integer.valueOf(this.sourceConfig.getMaxNumberOfMessages()));
        if (this.sourceConfig.getWaitTime() != -1) {
            this.messageRequest = this.messageRequest.withWaitTimeSeconds(Integer.valueOf(this.sourceConfig.getWaitTime()));
        }
        if (this.sourceConfig.getVisibilityTimeout() != -1) {
            this.messageRequest = this.messageRequest.withVisibilityTimeout(Integer.valueOf(this.sourceConfig.getVisibilityTimeout()));
        }
    }

    private void delete(Message message) {
        int i = 0;
        while (!deleteMessageFromQueue(message) && i < this.sourceConfig.getRetryCountLimit()) {
            i++;
            try {
                Thread.sleep(this.sourceConfig.getRetryInterval());
            } catch (InterruptedException e) {
                logger.info("SQS source thread got interrupted during delete retry.");
                Thread.currentThread().interrupt();
            }
        }
    }

    private boolean deleteMessageFromQueue(Message message) {
        try {
            return this.amazonSQS.deleteMessage(new DeleteMessageRequest(this.sourceConfig.getQueueUrl(), message.getReceiptHandle())).getSdkHttpMetadata().getHttpStatusCode() == 200;
        } catch (SdkClientException e) {
            logger.error(String.format("Failed to delete message '%s' from the queue '%s'. Hence retrying.", message.getBody(), this.sourceConfig.getQueueUrl()), e);
            return false;
        }
    }

    private String[] getTransportProperties(Message message, String[] strArr) {
        String[] strArr2 = new String[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            if (strArr[i].equalsIgnoreCase(SQSConstants.MESSAGE_ID_PROPERTY_NAME)) {
                strArr2[i] = message.getMessageId();
            }
        }
        return strArr2;
    }
}
