package org.graylog.integrations.aws.transports;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.cloudwatch.KinesisLogEntry;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
 * {@link ShardRecordProcessorFactory} that produces {@link KinesisShardProcessor} instances for a
 * single Kinesis stream.
 *
 * <p>Every processor created by this factory shares the factory's transport, payload decoder,
 * object mapper and message callback.
 */
public class KinesisShardProcessorFactory implements ShardRecordProcessorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardProcessor.class);
    private final String kinesisStreamName;
    private final ObjectMapper objectMapper;
    private final KinesisTransport transport;
    private final Consumer<byte[]> handleMessageCallback;
    private final KinesisPayloadDecoder kinesisPayloadDecoder;

    /**
     * Record processor that decodes each Kinesis record into log entries, serializes every entry
     * with the factory's {@link ObjectMapper} and hands the resulting bytes to the message callback.
     *
     * <p>It is a (non-static) inner class on purpose: it reads the enclosing factory's fields.
     */
    public class KinesisShardProcessor implements ShardRecordProcessor {
        /** Time of the most recent periodic checkpoint attempt made from {@code processRecords}. */
        private DateTime lastCheckpoint = DateTime.now(DateTimeZone.UTC);

        public KinesisShardProcessor() {
        }

        /**
         * Only logs that the worker is starting; no state is set up here.
         *
         * @param initializationInput unused
         */
        public void initialize(InitializationInput initializationInput) {
            LOG.debug("Initializing Kinesis worker for stream [{}].", kinesisStreamName);
        }

        /**
         * Decodes and forwards every record of the batch, then checkpoints if more than one minute
         * has passed since the last periodic checkpoint attempt.
         *
         * <p>If the transport reports that it is throttled, this call blocks until the throttle
         * state clears before any record of the batch is handled.
         *
         * @param processRecordsInput batch of records plus the checkpointer to use for them
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            LOG.debug("Received [{}] Kinesis events.", processRecordsInput.records().size());

            if (transport.isThrottled()) {
                LOG.info("[throttled] The Kinesis consumer will pause message processing until the throttle state clears.");
                transport.blockUntilUnthrottled();
                LOG.info("[unthrottled] Kinesis consumer will now resume processing records.");
            }

            for (KinesisClientRecord record : processRecordsInput.records()) {
                try {
                    byte[] payload = copyPayload(record);
                    for (KinesisLogEntry logEntry
                            : kinesisPayloadDecoder.processMessages(payload, record.approximateArrivalTimestamp())) {
                        handleMessageCallback.accept(objectMapper.writeValueAsBytes(logEntry));
                    }
                } catch (Exception e) {
                    // Best effort: a record that cannot be handled is logged and skipped so the
                    // remaining records of the batch are still processed.
                    LOG.error("Could not read Kinesis record from stream [{}]", kinesisStreamName, e);
                }
            }

            if (lastCheckpoint.plusMinutes(1).isBeforeNow()) {
                // The timestamp is refreshed before the attempt, so a failed checkpoint is not
                // tried again until another minute has elapsed.
                lastCheckpoint = DateTime.now(DateTimeZone.UTC);
                checkpoint(processRecordsInput.checkpointer(), null);
            }
        }

        /**
         * Only logs the event; no checkpoint is attempted.
         *
         * @param leaseLostInput unused
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            LOG.debug("Lease lost.");
        }

        /**
         * Checkpoints through the checkpointer supplied with the shard-ended event.
         *
         * @param shardEndedInput provides the checkpointer
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            checkpoint(shardEndedInput.checkpointer(), null);
        }

        /**
         * Logs the shutdown request and checkpoints through the supplied checkpointer.
         *
         * @param shutdownRequestedInput provides the checkpointer
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            LOG.debug("Beginning shutdown Kinesis worker for stream [{}].", kinesisStreamName);
            checkpoint(shutdownRequestedInput.checkpointer(), null);
        }

        /**
         * Copies the readable bytes of a record into a fresh array.
         *
         * <p>A read-only view of the buffer is drained so the position of the record's own buffer
         * is left untouched.
         */
        private byte[] copyPayload(KinesisClientRecord record) {
            ByteBuffer view = record.data().asReadOnlyBuffer();
            byte[] payload = new byte[view.remaining()];
            view.get(payload);
            return payload;
        }

        /**
         * Saves a checkpoint, retrying once per second for up to ten minutes while the attempt
         * fails with a {@link ThrottlingException}.
         *
         * <p>{@link ShutdownException} and {@link InvalidStateException} are logged and end the
         * attempt without a retry. Failures of the retry loop itself are logged as errors; nothing
         * is propagated to the caller.
         *
         * @param recordProcessorCheckpointer checkpointer to invoke
         * @param str sequence number to checkpoint at, or {@code null} to call the no-argument
         *            {@code checkpoint()}
         */
        private void checkpoint(RecordProcessorCheckpointer recordProcessorCheckpointer, String str) {
            LOG.debug("Checkpointing stream [{}]", kinesisStreamName);
            try {
                RetryerBuilder.newBuilder()
                        .retryIfExceptionOfType(ThrottlingException.class)
                        .withWaitStrategy(WaitStrategies.fixedWait(1L, TimeUnit.SECONDS))
                        .withStopStrategy(StopStrategies.stopAfterDelay(10L, TimeUnit.MINUTES))
                        .withRetryListener(new RetryListener() {
                            public <V> void onRetry(Attempt<V> attempt) {
                                if (attempt.hasException()) {
                                    LOG.warn("Checkpointing stream [{}] failed, retrying. (attempt {})",
                                            kinesisStreamName, attempt.getAttemptNumber());
                                }
                            }
                        })
                        .build()
                        .call(() -> {
                            try {
                                if (str == null) {
                                    recordProcessorCheckpointer.checkpoint();
                                } else {
                                    recordProcessorCheckpointer.checkpoint(str);
                                }
                            } catch (ShutdownException shutdown) {
                                LOG.debug("Processor is shutting down, skipping checkpoint");
                            } catch (InvalidStateException invalidState) {
                                LOG.error("Couldn't save checkpoint to DynamoDB table used by the Kinesis client library - check database table", invalidState);
                            }
                            return null;
                        });
            } catch (RetryException e) {
                LOG.error("Checkpoint retry for stream [{}] finally failed", kinesisStreamName, e);
            } catch (ExecutionException e) {
                LOG.error("Couldn't checkpoint stream [{}]", kinesisStreamName, e);
            }
        }
    }

    /**
     * Creates a factory bound to one stream.
     *
     * @param objectMapper mapper used to serialize decoded log entries; also passed to the payload decoder
     * @param kinesisTransport transport queried for its throttle state before each batch
     * @param consumer callback receiving each serialized log entry; must not be {@code null}
     * @param str name of the Kinesis stream, used in log messages and passed to the payload decoder
     * @param aWSMessageType message type passed to the payload decoder
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public KinesisShardProcessorFactory(ObjectMapper objectMapper, KinesisTransport kinesisTransport, Consumer<byte[]> consumer, String str, AWSMessageType aWSMessageType) {
        this.objectMapper = objectMapper;
        this.transport = kinesisTransport;
        // requireNonNull is generic, so no (raw) cast is needed to keep the Consumer<byte[]> type.
        this.handleMessageCallback = Objects.requireNonNull(consumer, "dataHandler");
        this.kinesisStreamName = str;
        this.kinesisPayloadDecoder = new KinesisPayloadDecoder(objectMapper, aWSMessageType, str);
    }

    /**
     * Creates a new processor backed by this factory's shared collaborators.
     *
     * @return a fresh {@link KinesisShardProcessor}
     */
    public ShardRecordProcessor shardRecordProcessor() {
        return new KinesisShardProcessor();
    }
}
