package org.graylog.integrations.aws.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import javax.inject.Inject;
import javax.ws.rs.BadRequestException;
import org.apache.commons.collections.CollectionUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSLogMessage;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.cloudwatch.CloudWatchLogEvent;
import org.graylog.integrations.aws.cloudwatch.CloudWatchLogSubscriptionData;
import org.graylog.integrations.aws.cloudwatch.KinesisLogEntry;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog.integrations.aws.resources.requests.CreateRolePermissionRequest;
import org.graylog.integrations.aws.resources.requests.KinesisHealthCheckRequest;
import org.graylog.integrations.aws.resources.requests.KinesisNewStreamRequest;
import org.graylog.integrations.aws.resources.responses.CreateRolePermissionResponse;
import org.graylog.integrations.aws.resources.responses.KinesisHealthCheckResponse;
import org.graylog.integrations.aws.resources.responses.KinesisNewStreamResponse;
import org.graylog.integrations.aws.resources.responses.StreamsResponse;
import org.graylog.integrations.aws.transports.KinesisPayloadDecoder;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.iam.IamClient;
import software.amazon.awssdk.services.iam.IamClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

/* loaded from: input_file:org/graylog/integrations/aws/service/KinesisService.class */
public class KinesisService {
    private static final Logger LOG = LoggerFactory.getLogger(AWSService.class);
    private static final int EIGHT_BITS = 8;
    private static final int KINESIS_LIST_STREAMS_MAX_ATTEMPTS = 1000;
    private static final int KINESIS_LIST_STREAMS_LIMIT = 400;
    private static final int RECORDS_SAMPLE_SIZE = 10;
    private static final int SHARD_COUNT = 1;
    private static final String ROLE_NAME_FORMAT = "graylog-cloudwatch-role-%s";
    private static final String ROLE_POLICY_NAME_FORMAT = "graylog-cloudwatch-role-policy-%s";
    private static final String UNIQUE_ROLE_DATE_FORMAT = "yyyy-MM-dd-HH-mm-ss";
    private static final String CONTROL_MESSAGE_TOKEN = "CWL CONTROL MESSAGE";
    private final IamClientBuilder iamClientBuilder;
    private final KinesisClientBuilder kinesisClientBuilder;
    private final ObjectMapper objectMapper;
    private final Map<String, Codec.Factory<? extends Codec>> availableCodecs;
    private final AWSClientBuilderUtil awsClientBuilderUtil;

    @Inject
    public KinesisService(IamClientBuilder iamClientBuilder, KinesisClientBuilder kinesisClientBuilder, ObjectMapper objectMapper, Map<String, Codec.Factory<? extends Codec>> map, AWSClientBuilderUtil aWSClientBuilderUtil) {
        this.iamClientBuilder = iamClientBuilder;
        this.kinesisClientBuilder = kinesisClientBuilder;
        this.objectMapper = objectMapper;
        this.availableCodecs = map;
        this.awsClientBuilderUtil = aWSClientBuilderUtil;
    }

    public KinesisHealthCheckResponse healthCheck(KinesisHealthCheckRequest kinesisHealthCheckRequest) throws ExecutionException, IOException {
        LOG.debug("Executing healthCheck");
        LOG.debug("Requesting a list of streams to find out if the indicated stream exists.");
        if (!getKinesisStreamNames(kinesisHealthCheckRequest).streams().stream().anyMatch(str -> {
            return str.equals(kinesisHealthCheckRequest.streamName());
        })) {
            throw new BadRequestException(String.format(Locale.ROOT, "The requested stream [%s] was not found.", kinesisHealthCheckRequest.streamName()));
        }
        LOG.debug("The stream [{}] exists", kinesisHealthCheckRequest.streamName());
        List<Record> retrieveRecords = retrieveRecords(kinesisHealthCheckRequest.streamName(), this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, kinesisHealthCheckRequest));
        if (retrieveRecords.size() == 0) {
            throw new BadRequestException(String.format(Locale.ROOT, "The Kinesis stream [%s] does not contain any messages.", kinesisHealthCheckRequest.streamName()));
        }
        Record selectRandomRecord = selectRandomRecord(retrieveRecords);
        byte[] asByteArray = selectRandomRecord.data().asByteArray();
        boolean isCompressed = isCompressed(asByteArray);
        return isCompressed ? handleCompressedMessages(kinesisHealthCheckRequest, asByteArray) : detectAndParseMessage(new String(asByteArray, StandardCharsets.UTF_8), new DateTime(selectRandomRecord.approximateArrivalTimestamp().toEpochMilli(), DateTimeZone.UTC), kinesisHealthCheckRequest.streamName(), "", "", isCompressed);
    }

    public StreamsResponse getKinesisStreamNames(AWSRequest aWSRequest) throws ExecutionException {
        LOG.debug("List Kinesis streams for region [{}]", aWSRequest.region());
        KinesisClient buildClient = this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, aWSRequest);
        ListStreamsResponse listStreams = buildClient.listStreams((ListStreamsRequest) ListStreamsRequest.builder().limit(Integer.valueOf(KINESIS_LIST_STREAMS_LIMIT)).build());
        ArrayList arrayList = new ArrayList(listStreams.streamNames());
        Retryer build = RetryerBuilder.newBuilder().retryIfResult(bool -> {
            return Objects.equals(bool, Boolean.TRUE);
        }).retryIfExceptionOfType(LimitExceededException.class).withStopStrategy(StopStrategies.stopAfterAttempt(1000)).build();
        if (listStreams.hasMoreStreams().booleanValue()) {
            try {
                build.call(() -> {
                    LOG.debug("Requesting streams...");
                    ListStreamsResponse listStreams2 = buildClient.listStreams((ListStreamsRequest) ListStreamsRequest.builder().exclusiveStartStreamName((String) arrayList.get(arrayList.size() - 1)).limit(Integer.valueOf(KINESIS_LIST_STREAMS_LIMIT)).build());
                    arrayList.addAll(listStreams2.streamNames());
                    return listStreams2.hasMoreStreams();
                });
            } catch (RetryException e) {
                LOG.error("Failed to get all stream names after {} attempts. Proceeding to return currently obtained streams.", 1000);
            }
        }
        LOG.debug("Kinesis streams queried: [{}]", arrayList);
        if (arrayList.isEmpty()) {
            throw new BadRequestException(String.format(Locale.ROOT, "No Kinesis streams were found in the [%s] region.", aWSRequest.region()));
        }
        return StreamsResponse.create(arrayList, arrayList.size());
    }

    private KinesisHealthCheckResponse handleCompressedMessages(KinesisHealthCheckRequest kinesisHealthCheckRequest, byte[] bArr) throws IOException {
        LOG.debug("The supplied payload is GZip compressed. Proceeding to decompress.");
        CloudWatchLogSubscriptionData decompressCloudWatchMessages = KinesisPayloadDecoder.decompressCloudWatchMessages(bArr, this.objectMapper);
        Optional<CloudWatchLogEvent> findAny = decompressCloudWatchMessages.logEvents().stream().findAny();
        if (findAny.isEmpty()) {
            throw new BadRequestException("The CloudWatch payload did not contain any messages. This should not happen. See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html");
        }
        CloudWatchLogEvent cloudWatchLogEvent = findAny.get();
        return detectAndParseMessage(cloudWatchLogEvent.message(), new DateTime(cloudWatchLogEvent.timestamp(), DateTimeZone.UTC), kinesisHealthCheckRequest.streamName(), decompressCloudWatchMessages.logGroup(), decompressCloudWatchMessages.logStream(), true);
    }

    List<Record> retrieveRecords(String str, KinesisClient kinesisClient) {
        LOG.debug("About to retrieve logs records from Kinesis.");
        ListShardsResponse listShards = kinesisClient.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(str).build());
        ArrayList arrayList = new ArrayList();
        Iterator it = listShards.shards().iterator();
        while (it.hasNext()) {
            String shardId = ((Shard) it.next()).shardId();
            String shardIterator = kinesisClient.getShardIterator((GetShardIteratorRequest) GetShardIteratorRequest.builder().shardId(shardId).streamName(str).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build()).shardIterator();
            boolean z = true;
            LOG.debug("Retrieved shard id: [{}] with shard iterator: [{}]", shardId, shardIterator);
            while (z) {
                LOG.debug("Getting more records");
                GetRecordsResponse records = kinesisClient.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(shardIterator).build());
                shardIterator = records.nextShardIterator();
                for (Record record : records.records()) {
                    if (!isControlMessage(record)) {
                        arrayList.add(record);
                        if (arrayList.size() == 10) {
                            LOG.debug("Returning the list of records now that sample size [{}] has been met.", 10);
                            return arrayList;
                        }
                    }
                }
                if (records.millisBehindLatest().longValue() == 0) {
                    LOG.debug("Found the end of the shard. No more records returned from the shard.");
                    z = false;
                }
            }
        }
        LOG.debug("Returning the list with [{}] records.", Integer.valueOf(arrayList.size()));
        return arrayList;
    }

    private boolean isControlMessage(Record record) {
        byte[] asByteArray = record.data().asByteArray();
        if (!isCompressed(asByteArray)) {
            return false;
        }
        try {
            return Tools.decompressGzip(asByteArray).contains(CONTROL_MESSAGE_TOKEN);
        } catch (IOException e) {
            throw new BadRequestException("Failed to decode message from CloudWatch and check if it's a control message.");
        }
    }

    private KinesisHealthCheckResponse detectAndParseMessage(String str, DateTime dateTime, String str2, String str3, String str4, boolean z) {
        LOG.debug("Attempting to detect the type of log message. message [{}] stream [{}] log group [{}].", new Object[]{str, str2, str3});
        AWSMessageType detectLogMessageType = new AWSLogMessage(str).detectLogMessageType(z);
        LOG.debug("The message is type [{}]", detectLogMessageType);
        String format = String.format(Locale.ROOT, "Success. The message is a %s message.", detectLogMessageType.getLabel());
        KinesisLogEntry create = KinesisLogEntry.create(str2, str3, str4, dateTime, str);
        Codec.Factory<? extends Codec> factory = this.availableCodecs.get(detectLogMessageType.getCodecName());
        if (factory == null) {
            throw new BadRequestException(String.format(Locale.ROOT, "A codec with name [%s] could not be found.", detectLogMessageType.getCodecName()));
        }
        try {
            Message decode = factory.create(Configuration.EMPTY_CONFIGURATION).decode(new RawMessage(this.objectMapper.writeValueAsBytes(create)));
            if (decode == null) {
                throw new BadRequestException(String.format(Locale.ROOT, "Message decoding failed. More information might be available by enabling Debug logging. message [%s]", str));
            }
            LOG.debug("Successfully parsed message type [{}] with codec [{}].", detectLogMessageType, detectLogMessageType.getCodecName());
            return KinesisHealthCheckResponse.create(detectLogMessageType, format, decode.getFields());
        } catch (JsonProcessingException e) {
            throw new BadRequestException("Encoding the message to bytes failed.", e);
        }
    }

    Record selectRandomRecord(List<Record> list) {
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(list), "Records list can not be empty.");
        LOG.debug("Selecting a random Record from the sample list.");
        return list.get(new Random().nextInt(list.size()));
    }

    public static boolean isCompressed(byte[] bArr) {
        if (bArr == null || bArr.length < 2) {
            return false;
        }
        return (bArr[0] == 31) && (bArr[1] == -117);
    }

    public KinesisNewStreamResponse createNewKinesisStream(KinesisNewStreamRequest kinesisNewStreamRequest) {
        StreamDescription streamDescription;
        LOG.debug("Creating Kinesis client with the provided credentials.");
        KinesisClient buildClient = this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, kinesisNewStreamRequest);
        LOG.debug("Creating new Kinesis stream request [{}].", kinesisNewStreamRequest.streamName());
        CreateStreamRequest createStreamRequest = (CreateStreamRequest) CreateStreamRequest.builder().streamName(kinesisNewStreamRequest.streamName()).shardCount(1).build();
        LOG.debug("Sending request to create new Kinesis stream [{}] with [{}] shards.", kinesisNewStreamRequest.streamName(), 1);
        try {
            buildClient.createStream(createStreamRequest);
            int i = 0;
            do {
                try {
                    Thread.sleep(1000L);
                    streamDescription = buildClient.describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(kinesisNewStreamRequest.streamName()).build()).streamDescription();
                    if (i > 300) {
                        throw new BadRequestException(String.format(Locale.ROOT, "Fail. Stream [%s] has failed to become active within 60 seconds.", kinesisNewStreamRequest.streamName()));
                    }
                    i++;
                } catch (InterruptedException e) {
                    LOG.error("Request interrupted while waiting for shard to become available.");
                    return null;
                }
            } while (streamDescription.streamStatus() != StreamStatus.ACTIVE);
            String streamARN = streamDescription.streamARN();
            return KinesisNewStreamResponse.create(createStreamRequest.streamName(), streamARN, String.format(Locale.ROOT, "Success. The new stream [%s/%s] was created with [%d] shard.", kinesisNewStreamRequest.streamName(), streamARN, 1));
        } catch (Exception e2) {
            String format = String.format(Locale.ROOT, "Attempt to create [%s] new Kinesis stream with [%d] shards failed due to the following exception: [%s]", kinesisNewStreamRequest.streamName(), 1, ExceptionUtils.formatMessageCause(e2));
            LOG.error(format, e2);
            throw new BadRequestException(format, e2);
        }
    }

    public CreateRolePermissionResponse autoKinesisPermissions(CreateRolePermissionRequest createRolePermissionRequest) {
        String format = String.format(Locale.ROOT, ROLE_NAME_FORMAT, DateTime.now(DateTimeZone.UTC).toString(UNIQUE_ROLE_DATE_FORMAT));
        try {
            IamClient buildClient = this.awsClientBuilderUtil.buildClient(this.iamClientBuilder, createRolePermissionRequest);
            LOG.debug(createRoleForKinesisAutoSetup(buildClient, createRolePermissionRequest.region(), format));
            setPermissionsForKinesisAutoSetupRole(buildClient, format, createRolePermissionRequest.streamArn());
            String rolePermissionsArn = getRolePermissionsArn(buildClient, format);
            return CreateRolePermissionResponse.create(String.format(Locale.ROOT, "Success! The role [%s/%s] has been created.", format, rolePermissionsArn), rolePermissionsArn, format);
        } catch (Exception e) {
            throw new BadRequestException(String.format(Locale.ROOT, "Unable to automatically set up Kinesis role [%s] due to the following error [%s]", format, ExceptionUtils.formatMessageCause(e)));
        }
    }

    private static void setPermissionsForKinesisAutoSetupRole(IamClient iamClient, String str, String str2) {
        String str3 = "{\n  \"Statement\": [\n    {\n      \"Effect\": \"Allow\",\n      \"Action\": \"kinesis:PutRecord\",\n      \"Resource\": \"" + str2 + "\"\n    }\n  ]\n}";
        String format = String.format(Locale.ROOT, ROLE_POLICY_NAME_FORMAT, DateTime.now(DateTimeZone.UTC).toString(UNIQUE_ROLE_DATE_FORMAT));
        LOG.debug("Attaching [{}] policy to [{}] role", format, str);
        try {
            iamClient.putRolePolicy(builder -> {
                builder.roleName(str).policyName(format).policyDocument(str3);
            });
            LOG.debug("Success! The role policy [{}] was assigned.", format);
        } catch (Exception e) {
            throw new BadRequestException(String.format(Locale.ROOT, "Unable to create role [%s] due to the following error [%s]", str, ExceptionUtils.formatMessageCause(e)));
        }
    }

    private static String createRoleForKinesisAutoSetup(IamClient iamClient, String str, String str2) {
        LOG.debug("Create Kinesis Auto Setup Role [{}] to region [{}]", str2, str);
        String str3 = "{\n  \"Statement\": [\n    {\n      \"Effect\": \"Allow\",\n      \"Principal\": { \"Service\": \"logs." + str + ".amazonaws.com\" },\n      \"Action\": \"sts:AssumeRole\"\n    }\n  ]\n}";
        LOG.debug("Role [{}] was created.", str2);
        try {
            iamClient.createRole(builder -> {
                builder.roleName(str2).assumeRolePolicyDocument(str3);
            });
            return String.format(Locale.ROOT, "Success! The role [%s] was created.", str2);
        } catch (Exception e) {
            throw new BadRequestException(String.format(Locale.ROOT, "The role [%s] was not created due to the following reason [%s]", str2, ExceptionUtils.formatMessageCause(e)));
        }
    }

    private static String getRolePermissionsArn(IamClient iamClient, String str) {
        LOG.debug("Acquiring the role ARN associated to the role [{}]", str);
        return iamClient.getRole(builder -> {
            builder.roleName(str);
        }).role().arn();
    }
}
