/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.integrations.aws.transports;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.cloudwatch.KinesisLogEntry;
import org.graylog.integrations.aws.transports.KinesisPayloadDecoder;
import org.graylog.integrations.aws.transports.KinesisTransport;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisShardProcessorFactory
implements ShardRecordProcessorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardProcessor.class);
    private final String kinesisStreamName;
    private final ObjectMapper objectMapper;
    private final KinesisTransport transport;
    private final Consumer<byte[]> handleMessageCallback;
    private final KinesisPayloadDecoder kinesisPayloadDecoder;

    KinesisShardProcessorFactory(ObjectMapper objectMapper, KinesisTransport transport, Consumer<byte[]> handleMessageCallback, String kinesisStreamName, AWSMessageType awsMessageType) {
        this.objectMapper = objectMapper;
        this.transport = transport;
        this.handleMessageCallback = Objects.requireNonNull(handleMessageCallback, "dataHandler");
        this.kinesisStreamName = kinesisStreamName;
        this.kinesisPayloadDecoder = new KinesisPayloadDecoder(objectMapper, awsMessageType, kinesisStreamName);
    }

    public ShardRecordProcessor shardRecordProcessor() {
        return new KinesisShardProcessor();
    }

    public class KinesisShardProcessor
    implements ShardRecordProcessor {
        private DateTime lastCheckpoint = DateTime.now((DateTimeZone)DateTimeZone.UTC);

        public void initialize(InitializationInput initializationInput) {
            LOG.debug("Initializing Kinesis worker for stream [{}].", (Object)KinesisShardProcessorFactory.this.kinesisStreamName);
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            LOG.debug("Received [{}] Kinesis events.", (Object)processRecordsInput.records().size());
            if (KinesisShardProcessorFactory.this.transport.isThrottled()) {
                LOG.info("[throttled] The Kinesis consumer will pause message processing until the throttle state clears.");
                KinesisShardProcessorFactory.this.transport.blockUntilUnthrottled();
                LOG.info("[unthrottled] Kinesis consumer will now resume processing records.");
            }
            for (KinesisClientRecord record : processRecordsInput.records()) {
                try {
                    ByteBuffer dataBuffer = record.data().asReadOnlyBuffer();
                    byte[] dataBytes = new byte[dataBuffer.remaining()];
                    dataBuffer.get(dataBytes);
                    List<KinesisLogEntry> kinesisLogEntries = KinesisShardProcessorFactory.this.kinesisPayloadDecoder.processMessages(dataBytes, record.approximateArrivalTimestamp());
                    for (KinesisLogEntry kinesisLogEntry : kinesisLogEntries) {
                        KinesisShardProcessorFactory.this.handleMessageCallback.accept(KinesisShardProcessorFactory.this.objectMapper.writeValueAsBytes((Object)kinesisLogEntry));
                    }
                }
                catch (Exception e) {
                    LOG.error("Could not read Kinesis record from stream [{}]", (Object)KinesisShardProcessorFactory.this.kinesisStreamName, (Object)e);
                }
            }
            if (this.lastCheckpoint.plusMinutes(1).isBeforeNow()) {
                this.lastCheckpoint = DateTime.now((DateTimeZone)DateTimeZone.UTC);
                this.checkpoint(processRecordsInput.checkpointer(), null);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            LOG.debug("Lease lost.");
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            this.checkpoint(shardEndedInput.checkpointer(), null);
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            LOG.debug("Beginning shutdown Kinesis worker for stream [{}].", (Object)KinesisShardProcessorFactory.this.kinesisStreamName);
            this.checkpoint(shutdownRequestedInput.checkpointer(), null);
        }

        private void checkpoint(RecordProcessorCheckpointer checkpointer, String lastSequence) {
            LOG.debug("Checkpointing stream [{}]", (Object)KinesisShardProcessorFactory.this.kinesisStreamName);
            Retryer retryer = RetryerBuilder.newBuilder().retryIfExceptionOfType(ThrottlingException.class).withWaitStrategy(WaitStrategies.fixedWait((long)1L, (TimeUnit)TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterDelay((long)10L, (TimeUnit)TimeUnit.MINUTES)).withRetryListener(new RetryListener(){

                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        LOG.warn("Checkpointing stream [{}] failed, retrying. (attempt {})", (Object)KinesisShardProcessorFactory.this.kinesisStreamName, (Object)attempt.getAttemptNumber());
                    }
                }
            }).build();
            try {
                retryer.call(() -> {
                    try {
                        if (lastSequence != null) {
                            checkpointer.checkpoint(lastSequence);
                        } else {
                            checkpointer.checkpoint();
                        }
                    }
                    catch (InvalidStateException e) {
                        LOG.error("Couldn't save checkpoint to DynamoDB table used by the Kinesis client library - check database table", (Throwable)e);
                    }
                    catch (ShutdownException e) {
                        LOG.debug("Processor is shutting down, skipping checkpoint");
                    }
                    return null;
                });
            }
            catch (ExecutionException e) {
                LOG.error("Couldn't checkpoint stream [{}]", (Object)KinesisShardProcessorFactory.this.kinesisStreamName, (Object)e);
            }
            catch (RetryException e) {
                LOG.error("Checkpoint retry for stream [{}] finally failed", (Object)KinesisShardProcessorFactory.this.kinesisStreamName, (Object)e);
            }
        }
    }
}

