package org.graylog.integrations.aws.transports;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.NoOpWorkerStateChangeListener;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.lifecycle.NoOpTaskExecutionListener;
import software.amazon.kinesis.lifecycle.TaskOutcome;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/* loaded from: input_file:org/graylog/integrations/aws/transports/KinesisConsumer.class */
public class KinesisConsumer implements Runnable {
    private static final int GRACEFUL_SHUTDOWN_TIMEOUT = 20;
    private final String kinesisStreamName;
    private final NodeId nodeId;
    private final KinesisTransport transport;
    private final Integer recordBatchSize;
    private final ObjectMapper objectMapper;
    private final AWSMessageType awsMessageType;
    private final Consumer<byte[]> handleMessageCallback;
    private final AWSRequest request;
    private final AWSClientBuilderUtil awsClientBuilderUtil;
    private final InputFailureRecorder inputFailureRecorder;
    private Scheduler kinesisScheduler;
    private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
    private static final TimeUnit GRACEFUL_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.SECONDS;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisConsumer(NodeId nodeId, KinesisTransport kinesisTransport, ObjectMapper objectMapper, Consumer<byte[]> consumer, String str, AWSMessageType aWSMessageType, int i, AWSRequest aWSRequest, AWSClientBuilderUtil aWSClientBuilderUtil, InputFailureRecorder inputFailureRecorder) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "A Kinesis stream name is required.");
        Preconditions.checkNotNull(aWSMessageType, "A AWSMessageType is required.");
        this.nodeId = (NodeId) Objects.requireNonNull(nodeId, "nodeId");
        this.transport = kinesisTransport;
        this.handleMessageCallback = consumer;
        this.kinesisStreamName = (String) Objects.requireNonNull(str, "kinesisStream");
        this.objectMapper = objectMapper;
        this.awsMessageType = aWSMessageType;
        this.recordBatchSize = Integer.valueOf(i);
        this.request = aWSRequest;
        this.awsClientBuilderUtil = aWSClientBuilderUtil;
        this.inputFailureRecorder = inputFailureRecorder;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.debug("Starting the Kinesis Consumer.");
        AwsCredentialsProvider createCredentialsProvider = this.awsClientBuilderUtil.createCredentialsProvider(this.request);
        Region of = Region.of(this.request.region());
        AwsClientBuilder builder = DynamoDbAsyncClient.builder();
        this.awsClientBuilderUtil.initializeBuilder(builder, this.request.dynamodbEndpoint(), of, createCredentialsProvider);
        DynamoDbAsyncClient dynamoDbAsyncClient = (DynamoDbAsyncClient) builder.build();
        AwsClientBuilder builder2 = CloudWatchAsyncClient.builder();
        this.awsClientBuilderUtil.initializeBuilder(builder2, this.request.cloudwatchEndpoint(), of, createCredentialsProvider);
        CloudWatchAsyncClient cloudWatchAsyncClient = (CloudWatchAsyncClient) builder2.build();
        AwsClientBuilder builder3 = KinesisAsyncClient.builder();
        this.awsClientBuilderUtil.initializeBuilder(builder3, this.request.kinesisEndpoint(), of, createCredentialsProvider);
        KinesisAsyncClient createKinesisAsyncClient = KinesisClientUtil.createKinesisAsyncClient(builder3);
        String format = String.format(Locale.ENGLISH, "graylog-node-%s", this.nodeId.anonymize());
        LOG.debug("Using workerId [{}].", format);
        String format2 = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", this.kinesisStreamName);
        LOG.debug("Using Kinesis applicationName [{}].", format2);
        ConfigsBuilder configsBuilder = new ConfigsBuilder(this.kinesisStreamName, format2, createKinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, format, new KinesisShardProcessorFactory(this.objectMapper, this.transport, this.handleMessageCallback, this.kinesisStreamName, this.awsMessageType));
        PollingConfig pollingConfig = new PollingConfig(this.kinesisStreamName, createKinesisAsyncClient);
        if (this.recordBatchSize != null) {
            LOG.debug("Using explicit batch size [{}]", this.recordBatchSize);
            pollingConfig.maxRecords(this.recordBatchSize.intValue());
        }
        this.kinesisScheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig().workerStateChangeListener(new NoOpWorkerStateChangeListener() { // from class: org.graylog.integrations.aws.transports.KinesisConsumer.1
            public void onAllInitializationAttemptsFailed(Throwable th) {
                KinesisConsumer.this.inputFailureRecorder.setFailing(KinesisConsumer.class, String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", KinesisConsumer.this.kinesisStreamName), th);
            }
        }), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig().taskExecutionListener(new NoOpTaskExecutionListener() { // from class: org.graylog.integrations.aws.transports.KinesisConsumer.2
            public void afterTaskExecution(TaskExecutionListenerInput taskExecutionListenerInput) {
                if (TaskOutcome.FAILURE.equals(taskExecutionListenerInput.taskOutcome())) {
                    KinesisConsumer.this.inputFailureRecorder.setFailing(KinesisConsumer.class, String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", KinesisConsumer.this.kinesisStreamName));
                } else if (TaskOutcome.SUCCESSFUL.equals(taskExecutionListenerInput.taskOutcome()) && TaskType.PROCESS.equals(taskExecutionListenerInput.taskType())) {
                    KinesisConsumer.this.inputFailureRecorder.setRunning();
                }
            }
        }), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig));
        LOG.debug("Starting Kinesis scheduler.");
        this.kinesisScheduler.run();
        LOG.debug("After Kinesis scheduler stopped.");
    }

    public void stop() {
        if (this.kinesisScheduler != null) {
            CompletableFuture startGracefulShutdown = this.kinesisScheduler.startGracefulShutdown();
            LOG.info("Waiting up to 20 seconds for Kinesis Consumer shutdown to complete.");
            try {
                startGracefulShutdown.get(20L, GRACEFUL_SHUTDOWN_TIMEOUT_UNIT);
            } catch (InterruptedException e) {
                LOG.info("Interrupted while waiting for graceful shutdown. Continuing.");
            } catch (ExecutionException e2) {
                LOG.error("Exception while executing graceful shutdown.", e2);
            } catch (TimeoutException e3) {
                LOG.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
                this.kinesisScheduler.shutdown();
            }
        }
    }
}
