package software.amazon.kinesis.retrieval.polling;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.class */
public class PrefetchRecordsPublisher implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger(PrefetchRecordsPublisher.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
    private int maxPendingProcessRecordsInput;
    private int maxByteSize;
    private int maxRecordsCount;
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    private final ExecutorService executorService;
    private final MetricsFactory metricsFactory;
    private final long idleMillisBetweenCalls;
    private Instant lastSuccessfulCall;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    private PrefetchCounters prefetchCounters;
    private final String operation;
    private final KinesisDataFetcher dataFetcher;
    private final String shardId;
    private Subscriber<? super ProcessRecordsInput> subscriber;
    private boolean started = false;
    private final AtomicLong requestedResponses = new AtomicLong(0);

    /* loaded from: input_file:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.class */
    private class DefaultGetRecordsCacheDaemon implements Runnable {
        volatile boolean isShutdown;

        private DefaultGetRecordsCacheDaemon() {
            this.isShutdown = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (this.isShutdown) {
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    PrefetchRecordsPublisher.log.warn("Prefetch thread was interrupted.");
                    break;
                }
                MetricsScope createMetricsWithOperation = MetricsUtil.createMetricsWithOperation(PrefetchRecordsPublisher.this.metricsFactory, PrefetchRecordsPublisher.this.operation);
                if (PrefetchRecordsPublisher.this.prefetchCounters.shouldGetNewRecords()) {
                    try {
                        try {
                            try {
                                sleepBeforeNextCall();
                                GetRecordsResponse records = PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getRecords(PrefetchRecordsPublisher.this.maxRecordsPerCall);
                                PrefetchRecordsPublisher.this.lastSuccessfulCall = Instant.now();
                                PrefetchRecordsPublisher.this.addArrivedRecordsInput(ProcessRecordsInput.builder().records((List) records.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList())).millisBehindLatest(records.millisBehindLatest()).cacheEntryTime(PrefetchRecordsPublisher.this.lastSuccessfulCall).isAtShardEnd(PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()).build());
                                PrefetchRecordsPublisher.this.drainQueueForRequests();
                                MetricsUtil.endScope(createMetricsWithOperation);
                            } catch (ExpiredIteratorException e) {
                                PrefetchRecordsPublisher.log.info("ShardId {}: records threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", PrefetchRecordsPublisher.this.shardId, e);
                                createMetricsWithOperation.addData(PrefetchRecordsPublisher.EXPIRED_ITERATOR_METRIC, 1.0d, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                                PrefetchRecordsPublisher.this.dataFetcher.restartIterator();
                                MetricsUtil.endScope(createMetricsWithOperation);
                            } catch (InterruptedException e2) {
                                PrefetchRecordsPublisher.log.info("Thread was interrupted, indicating shutdown was called on the cache.");
                                MetricsUtil.endScope(createMetricsWithOperation);
                            }
                        } catch (SdkClientException e3) {
                            PrefetchRecordsPublisher.log.error("Exception thrown while fetching records from Kinesis", e3);
                            MetricsUtil.endScope(createMetricsWithOperation);
                        } catch (Throwable th) {
                            PrefetchRecordsPublisher.log.error("Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", th);
                            MetricsUtil.endScope(createMetricsWithOperation);
                        }
                    } catch (Throwable th2) {
                        MetricsUtil.endScope(createMetricsWithOperation);
                        throw th2;
                    }
                } else {
                    try {
                        PrefetchRecordsPublisher.this.prefetchCounters.waitForConsumer();
                    } catch (InterruptedException e4) {
                        PrefetchRecordsPublisher.log.info("Thread was interrupted while waiting for the consumer.  Shutdown has probably been started");
                    }
                }
            }
            callShutdownOnStrategy();
        }

        private void callShutdownOnStrategy() {
            if (PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.isShutdown()) {
                return;
            }
            PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.shutdown();
        }

        private void sleepBeforeNextCall() throws InterruptedException {
            if (PrefetchRecordsPublisher.this.lastSuccessfulCall == null) {
                return;
            }
            long millis = Duration.between(PrefetchRecordsPublisher.this.lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (millis < PrefetchRecordsPublisher.this.idleMillisBetweenCalls) {
                Thread.sleep(PrefetchRecordsPublisher.this.idleMillisBetweenCalls - millis);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$PrefetchCounters.class */
    public class PrefetchCounters {
        private long size;
        private long byteSize;

        private PrefetchCounters() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        public synchronized void added(ProcessRecordsInput processRecordsInput) {
            this.size += getSize(processRecordsInput);
            this.byteSize += getByteSize(processRecordsInput);
        }

        public synchronized void removed(ProcessRecordsInput processRecordsInput) {
            this.size -= getSize(processRecordsInput);
            this.byteSize -= getByteSize(processRecordsInput);
            notifyAll();
        }

        private long getSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.records().size();
        }

        private long getByteSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.records().stream().mapToLong(kinesisClientRecord -> {
                return kinesisClientRecord.data().limit();
            }).sum();
        }

        public synchronized void waitForConsumer() throws InterruptedException {
            if (shouldGetNewRecords()) {
                return;
            }
            PrefetchRecordsPublisher.log.debug("Queue is full waiting for consumer for {} ms", Long.valueOf(PrefetchRecordsPublisher.this.idleMillisBetweenCalls));
            wait(PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
        }

        public synchronized boolean shouldGetNewRecords() {
            if (PrefetchRecordsPublisher.log.isDebugEnabled()) {
                PrefetchRecordsPublisher.log.debug("Current Prefetch Counter States: {}", toString());
            }
            return this.size < ((long) PrefetchRecordsPublisher.this.maxRecordsCount) && this.byteSize < ((long) PrefetchRecordsPublisher.this.maxByteSize);
        }

        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", Integer.valueOf(PrefetchRecordsPublisher.this.getRecordsResultQueue.size()), Long.valueOf(this.size), Long.valueOf(this.byteSize));
        }
    }

    public PrefetchRecordsPublisher(int i, int i2, int i3, int i4, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long j, @NonNull MetricsFactory metricsFactory, @NonNull String str, @NonNull String str2) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (str == null) {
            throw new NullPointerException("operation");
        }
        if (str2 == null) {
            throw new NullPointerException("shardId");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = i4;
        this.maxPendingProcessRecordsInput = i;
        this.maxByteSize = i2;
        this.maxRecordsCount = i3;
        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
        this.prefetchCounters = new PrefetchCounters();
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
        this.idleMillisBetweenCalls = j;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty(str, "Operation cannot be empty", new Object[0]);
        this.operation = str;
        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
        this.shardId = str2;
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }
        this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
        if (!this.started) {
            log.info("Starting prefetching thread.");
            this.executorService.execute(this.defaultGetRecordsCacheDaemon);
        }
        this.started = true;
    }

    ProcessRecordsInput getNextResult() {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!this.started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
        ProcessRecordsInput processRecordsInput = null;
        try {
            processRecordsInput = this.getRecordsResultQueue.take().toBuilder().cacheExitTime(Instant.now()).build();
            this.prefetchCounters.removed(processRecordsInput);
            this.requestedResponses.decrementAndGet();
        } catch (InterruptedException e) {
            log.error("Interrupted while getting records from the cache", e);
        }
        return processRecordsInput;
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public void shutdown() {
        this.defaultGetRecordsCacheDaemon.isShutdown = true;
        this.executorService.shutdownNow();
        this.started = false;
    }

    public void subscribe(Subscriber<? super ProcessRecordsInput> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new Subscription() { // from class: software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.1
            public void request(long j) {
                PrefetchRecordsPublisher.this.requestedResponses.addAndGet(j);
                PrefetchRecordsPublisher.this.drainQueueForRequests();
            }

            public void cancel() {
                PrefetchRecordsPublisher.this.requestedResponses.set(0L);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
        this.getRecordsResultQueue.put(processRecordsInput);
        this.prefetchCounters.added(processRecordsInput);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void drainQueueForRequests() {
        while (this.requestedResponses.get() > 0 && !this.getRecordsResultQueue.isEmpty()) {
            this.subscriber.onNext(getNextResult());
        }
    }
}
