package org.graylog.aws.inputs.cloudtrail;

import com.amazonaws.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import org.graylog.aws.auth.AWSAuthProvider;
import org.graylog.aws.inputs.cloudtrail.json.CloudTrailRecord;
import org.graylog.aws.inputs.cloudtrail.messages.TreeReader;
import org.graylog.aws.inputs.cloudtrail.notifications.CloudtrailSNSNotification;
import org.graylog.aws.inputs.cloudtrail.notifications.CloudtrailSQSClient;
import org.graylog.aws.s3.S3Reader;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/aws/inputs/cloudtrail/CloudTrailSubscriber.class */
/**
 * Worker thread that repeatedly asks a {@link CloudtrailSQSClient} for CloudTrail notifications,
 * reads the S3 object referenced by each notification, and hands every contained
 * {@link CloudTrailRecord} to the owning {@link MessageInput} as a JSON-encoded {@link RawMessage}.
 *
 * <p>Lifecycle: {@link #pause()} and {@link #unpause()} suspend and resume polling,
 * {@link #terminate()} asks the thread to exit. All three may be called from a thread other than
 * the worker itself.
 */
public class CloudTrailSubscriber extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(CloudTrailSubscriber.class);

    /** Seconds to wait between polling rounds once the queue is empty or a poll has failed. */
    public static final int SLEEP_INTERVAL_SECS = 5;

    /** Serializes pause/unpause/terminate so the flag and the latch always change together. */
    private final Object pauseLock = new Object();

    private volatile boolean stopped = false;
    private volatile boolean paused = false;
    /** Latch the worker parks on while paused; an already-open latch means "not paused". */
    private volatile CountDownLatch pausedLatch = new CountDownLatch(0);

    private final MessageInput sourceInput;
    private final Region sqsRegion;
    private final Region s3Region;
    private final String queueName;
    private final AWSAuthProvider authProvider;
    private final HttpUrl proxyUrl;
    private final ObjectMapper objectMapper;
    private final InputFailureRecorder inputFailureRecorder;

    /**
     * Creates the subscriber. No AWS client is built until the thread runs.
     *
     * @param sqsRegion            region handed to the SQS client
     * @param s3Region             region handed to the S3 reader
     * @param queueName            name of the SQS queue handed to the SQS client
     * @param sourceInput          input that receives every record via {@code processRawMessage}
     * @param authProvider         credentials provider handed to both the SQS client and the S3 reader
     * @param proxyUrl             proxy URL handed to both the SQS client and the S3 reader
     * @param objectMapper         mapper used to parse log files and to serialize records
     * @param inputFailureRecorder receives the running/failing state of this subscriber
     */
    public CloudTrailSubscriber(Region sqsRegion, Region s3Region, String queueName, MessageInput sourceInput, AWSAuthProvider authProvider, HttpUrl proxyUrl, ObjectMapper objectMapper, InputFailureRecorder inputFailureRecorder) {
        this.sqsRegion = sqsRegion;
        this.s3Region = s3Region;
        this.queueName = queueName;
        this.authProvider = authProvider;
        this.sourceInput = sourceInput;
        this.proxyUrl = proxyUrl;
        this.objectMapper = objectMapper;
        this.inputFailureRecorder = inputFailureRecorder;
    }

    /**
     * Suspends polling. The worker checks the pause flag only before fetching the next batch of
     * notifications, so a batch that is already being processed is finished first.
     * Calling this while already paused has no effect.
     */
    public void pause() {
        synchronized (this.pauseLock) {
            if (this.paused) {
                // Replacing the latch here would strand a worker already parked on the old one.
                return;
            }
            // Install the closed latch BEFORE publishing the flag: a worker that observes
            // paused == true must never find the previous, already-open latch.
            this.pausedLatch = new CountDownLatch(1);
            this.paused = true;
        }
    }

    /** Resumes polling after {@link #pause()}. Safe to call when not paused. */
    public void unpause() {
        synchronized (this.pauseLock) {
            this.paused = false;
            this.pausedLatch.countDown();
        }
    }

    /**
     * Thread body: builds the SQS client, log file parser and S3 reader, then alternates between
     * draining the queue and sleeping {@link #SLEEP_INTERVAL_SECS} seconds until
     * {@link #terminate()} is called.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LOG.debug("Starting CloudTrailSubscriber");
        final CloudtrailSQSClient sqsClient = new CloudtrailSQSClient(this.sqsRegion, this.queueName, this.authProvider, this.proxyUrl, this.objectMapper);
        final TreeReader treeReader = new TreeReader(this.objectMapper);
        final S3Reader s3Reader = new S3Reader(this.s3Region, this.proxyUrl, this.authProvider);

        while (!this.stopped) {
            drainQueue(sqsClient, treeReader, s3Reader);
            if (!this.stopped) {
                LOG.debug("Waiting {} seconds until next CloudTrail SQS check.", SLEEP_INTERVAL_SECS);
                Uninterruptibles.sleepUninterruptibly(SLEEP_INTERVAL_SECS, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Fetches and processes notification batches back to back. Returns when the client reports
     * no notifications, when fetching fails, or when the subscriber has been stopped; in the
     * first two cases the caller sleeps before the next attempt.
     */
    private void drainQueue(CloudtrailSQSClient sqsClient, TreeReader treeReader, S3Reader s3Reader) {
        while (!this.stopped) {
            if (this.paused) {
                LOG.debug("Processing paused");
                Uninterruptibles.awaitUninterruptibly(this.pausedLatch);
            }
            // terminate() also opens the latch, so re-check after waking up.
            if (this.stopped) {
                return;
            }
            try {
                LOG.debug("Checking for CloudTrail notifications in SQS.");
                final List<CloudtrailSNSNotification> notifications = sqsClient.getNotifications();
                LOG.debug("Subscriber returned [{}] notifications.", notifications.size());
                if (notifications.isEmpty()) {
                    LOG.debug("No more messages to read from SQS. Going into sleep loop.");
                    return;
                }
                LOG.debug("Proceeding to read message content from S3.");
                for (CloudtrailSNSNotification notification : notifications) {
                    processNotification(notification, sqsClient, treeReader, s3Reader);
                }
            } catch (Exception e) {
                this.inputFailureRecorder.setFailing(getClass(), "Could not read messages from SQS. This is most likely a misconfiguration of the plugin. Going into sleep loop and retrying.", e);
                // Leave the drain loop so run() sleeps before retrying; retrying immediately
                // would spin against SQS for as long as the failure persists.
                return;
            }
        }
    }

    /**
     * Reads the S3 object a single notification points to, forwards each record to the input and
     * then deletes the notification. Any failure is reported to the failure recorder and the
     * notification is left undeleted; the exception is not propagated.
     */
    private void processNotification(CloudtrailSNSNotification notification, CloudtrailSQSClient sqsClient, TreeReader treeReader, S3Reader s3Reader) {
        try {
            final List<CloudTrailRecord> records = treeReader.read(s3Reader.readCompressed(notification.getS3Bucket(), notification.getS3ObjectKey()));
            LOG.debug("[{}] records read from S3.", records.size());
            for (CloudTrailRecord trailRecord : records) {
                LOG.debug("Processing message content.");
                // Pretty-printing is only worth paying for when trace output is actually emitted.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing cloud trail record: {}", this.objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(trailRecord));
                }
                this.sourceInput.processRawMessage(new RawMessage(this.objectMapper.writeValueAsBytes(trailRecord)));
            }
            // Delete only after every record of the file has been handed to the input.
            sqsClient.deleteNotification(notification);
            this.inputFailureRecorder.setRunning();
        } catch (Exception e) {
            this.inputFailureRecorder.setFailing(getClass(), StringUtils.f("Could not read CloudTrail log file for <%s>. Skipping.", notification.getS3Bucket()), e);
        }
    }

    /**
     * Requests the worker to exit and releases it if it is currently paused. A worker that is in
     * its between-polls sleep notices the request only once that sleep has elapsed.
     */
    public void terminate() {
        // Publish the stop flag first so a worker released from the latch sees it immediately.
        this.stopped = true;
        synchronized (this.pauseLock) {
            this.paused = false;
            this.pausedLatch.countDown();
        }
    }
}
