package org.wso2.carbon.event.input.adapter.sqs.internal;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/sqs/internal/SQSTask.class */
public class SQSTask implements Runnable {
    private static final Log LOG = LogFactory.getLog(SQSTask.class);
    private ReceiveMessageRequest receiveMessageRequest;
    private AmazonSQS sqs;
    private InputEventAdapterListener eventAdapterListener;
    private int tenantId;
    private SQSConfig configs;

    public SQSTask(AmazonSQS amazonSQS, SQSConfig sQSConfig, InputEventAdapterListener inputEventAdapterListener, int i) {
        this.tenantId = i;
        this.sqs = amazonSQS;
        this.eventAdapterListener = inputEventAdapterListener;
        this.configs = sQSConfig;
        this.receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(sQSConfig.getQueueURL()).withMaxNumberOfMessages(Integer.valueOf(sQSConfig.getMaxNumberOfMessages()));
        if (sQSConfig.getWaitTime() != null) {
            this.receiveMessageRequest = this.receiveMessageRequest.withWaitTimeSeconds(sQSConfig.getWaitTime());
        }
        if (sQSConfig.getVisibilityTimeout() != null) {
            this.receiveMessageRequest = this.receiveMessageRequest.withVisibilityTimeout(sQSConfig.getVisibilityTimeout());
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            PrivilegedCarbonContext.startTenantFlow();
            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(this.tenantId);
            List<Message> messages = this.sqs.receiveMessage(this.receiveMessageRequest).getMessages();
            if (messages != null) {
                for (Message message : messages) {
                    sendEventToListener(message.getBody());
                    if (this.configs.shouldDeleteAfterConsuming()) {
                        delete(message);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error occured while trying to receive messages from the queue. Hence waiting for the next polling cycle.", e);
        } finally {
            PrivilegedCarbonContext.endTenantFlow();
        }
    }

    private void sendEventToListener(String str) {
        try {
            this.eventAdapterListener.onEvent(str);
        } catch (Exception e) {
            LOG.error(String.format("Error while transforming the event : %s", str), e);
        }
    }

    private void delete(Message message) {
        int i = 0;
        while (!deleteMessageFromQueue(message) && i < this.configs.getRetryCountLimit()) {
            i++;
            try {
                Thread.sleep(this.configs.getRetryInterval());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private boolean deleteMessageFromQueue(Message message) {
        try {
            return this.sqs.deleteMessage(new DeleteMessageRequest(this.configs.getQueueURL(), message.getReceiptHandle())).getSdkHttpMetadata().getHttpStatusCode() == 200;
        } catch (AmazonServiceException e) {
            LOG.error(String.format("Failed to delete message '%s' from the queue '%s'. Hence retrying.", message.getBody(), this.configs.getQueueURL()), e);
            return false;
        }
    }
}
