package org.voltdb.importclient.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Record;
import java.math.BigInteger;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons_voltpatches.cli.HelpFormatter;
import org.voltcore.logging.Level;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.importer.AbstractImporter;
import org.voltdb.importer.Invocation;
import org.voltdb.importer.formatter.FormatException;
import org.voltdb.importer.formatter.Formatter;

/* loaded from: input_file:org/voltdb/importclient/kinesis/KinesisStreamImporter.class */
public class KinesisStreamImporter extends AbstractImporter {
    private KinesisStreamImporterConfig m_config;
    private Worker m_worker;
    private AtomicLong m_submitCount = new AtomicLong(0);
    private AtomicLong m_cbcnt = new AtomicLong(0);
    private String m_workerId = UUID.randomUUID().toString();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/voltdb/importclient/kinesis/KinesisStreamImporter$Gap.class */
    /**
     * Tracks which record offsets of the current batch have been submitted and which have been
     * committed, and exposes the sequence number stored at the commit cursor.
     *
     * <p>Offsets index directly into {@code checkpoints}; committed offsets are additionally
     * recorded in the {@code lag} ring, whose slot for an offset is {@code offset % lagLen}.
     * {@code resetTo}, {@code submit}, {@code commit} and {@code getSafeCommitPoint} are all
     * {@code synchronized} on this instance.
     */
    public final class Gap {
        /** Ring of committed offsets, addressed through {@link #idx(long)}. */
        long[] lag;
        /** Size of the {@code lag} ring; fixed at construction. */
        final int lagLen;
        /** Sequence number recorded for each submitted offset; {@code null} means "not submitted". */
        BigInteger[] checkpoints;
        /** Commit cursor: advanced by {@link #commit} while consecutive ring slots are contiguous. */
        long c = 0;
        /** Highest offset submitted since the last reset, or -1 when nothing was submitted yet. */
        long s = -1;
        /** Offset of a submitter that is waiting for the commit cursor to catch up, or -1. */
        long offer = -1;
        /** Upper bound, in milliseconds, on how long {@link #submit} waits for room in the ring. */
        private final long gapTrackerCheckMaxTimeMs = 2000;

        /**
         * @param i size of the lag ring; must be strictly positive
         * @throws IllegalArgumentException if {@code i} is zero or negative
         */
        Gap(int i) {
            if (i <= 0) {
                throw new IllegalArgumentException("leeways is zero or negative");
            }
            this.lagLen = i;
            // Allocate the ring up front so submit() is safe even before the first resetTo().
            this.lag = new long[i];
            this.checkpoints = new BigInteger[(int) KinesisStreamImporter.this.m_config.getMaxReadBatchSize()];
        }

        /** Clears all recorded checkpoints and returns the cursors and the ring to their initial state. */
        synchronized void resetTo() {
            Arrays.fill(this.checkpoints, null);
            this.c = 0L;
            this.s = -1L;
            this.lag = new long[this.lagLen];
        }

        /**
         * Records that the record at offset {@code j}, carrying sequence number
         * {@code bigInteger}, has been handed off for processing.
         *
         * <p>The call is ignored when the offset is outside {@code checkpoints}, when the sequence
         * number is {@code null}, or when the offset was already submitted. If the offset is a full
         * ring ahead of the commit cursor, the caller waits (at most
         * {@code gapTrackerCheckMaxTimeMs}) for {@link #commit} to notify, and then proceeds
         * regardless of whether room was made.
         *
         * @param j offset of the record within the current batch
         * @param bigInteger sequence number of the record
         */
        synchronized void submit(long j, BigInteger bigInteger) {
            if (!validateOffset((int) j) || bigInteger == null || this.checkpoints[(int) j] != null) {
                return;
            }
            if (this.s == -1 && j >= 0) {
                // First submission since the last reset: seed both cursors and the ring slot.
                this.s = j;
                this.c = j;
                this.lag[idx(j)] = j;
            }
            if (j - this.c >= this.lag.length) {
                this.offer = j;
                try {
                    wait(this.gapTrackerCheckMaxTimeMs);
                } catch (InterruptedException e) {
                    // The resource ID is passed as an argument so that a '%' in it is never
                    // interpreted as a format specifier.
                    KinesisStreamImporter.this.rateLimitedLog(Level.WARN, e, "Gap tracker wait was interrupted.%s", KinesisStreamImporter.this.m_config.getResourceID().toString());
                }
            }
            if (j > this.s) {
                this.s = j;
            }
            this.checkpoints[(int) j] = bigInteger;
        }

        /** Maps an offset onto its slot in the {@code lag} ring. */
        private final int idx(long j) {
            return (int) (j % this.lagLen);
        }

        /**
         * Records that the record at offset {@code j} has completed and advances the commit cursor
         * across every contiguous run of completed offsets.
         *
         * <p>The call is ignored unless the offset was previously submitted with an equal sequence
         * number and lies in {@code (c, s]}. A submitter parked in {@link #submit} is notified once
         * its offset is less than a full ring ahead of the cursor.
         *
         * @param j offset of the completed record within the current batch
         * @param bigInteger sequence number that was passed to {@link #submit} for this offset
         */
        synchronized void commit(long j, BigInteger bigInteger) {
            if (!validateOffset((int) j) || bigInteger == null) {
                return;
            }
            BigInteger submitted = this.checkpoints[(int) j];
            if (submitted == null || j > this.s || j <= this.c || !bigInteger.equals(submitted)) {
                return;
            }
            int distance = (int) Math.min(this.lagLen, j - this.c);
            if (distance == this.lagLen) {
                // The commit is a full ring (or more) ahead: drag the cursor forward so that the
                // committed offset fits inside the ring again.
                this.c = (j - this.lagLen) + 1;
                this.lag[idx(this.c)] = this.c;
            }
            this.lag[idx(j)] = j;
            while (distance > 0 && this.lag[idx(this.c)] + 1 == this.lag[idx(this.c + 1)]) {
                this.c++;
            }
            if (this.offer < 0 || this.offer - this.c >= this.lag.length) {
                return;
            }
            this.offer = -1L;
            notify();
        }

        /**
         * @return the sequence number recorded at the commit cursor, or {@code null} when the
         *         cursor is outside {@code checkpoints} or nothing was recorded at that offset
         */
        synchronized BigInteger getSafeCommitPoint() {
            if (this.checkpoints == null || !validateOffset((int) this.c)) {
                return null;
            }
            return this.checkpoints[(int) this.c];
        }

        /** @return {@code true} when {@code i} is a valid index into {@code checkpoints} */
        private boolean validateOffset(int i) {
            return i >= 0 && i < this.checkpoints.length;
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kinesis/KinesisStreamImporter$RecordProcessorFactory.class */
    private class RecordProcessorFactory implements IRecordProcessorFactory {
        private RecordProcessorFactory() {
        }

        public IRecordProcessor createProcessor() {
            return new StreamConsumer();
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kinesis/KinesisStreamImporter$StreamConsumer.class */
    private class StreamConsumer implements IRecordProcessor {
        private Formatter m_formatter;
        Gap m_gapTracker;
        private String m_shardId = new String("unknown");
        private BigInteger m_lastFetchCommittedSequenceNumber = BigInteger.ZERO;

        public StreamConsumer() {
            this.m_gapTracker = new Gap(Integer.getInteger("KINESIS_IMPORT_GAP_LEAD", 32768).intValue());
        }

        public void initialize(InitializationInput initializationInput) {
            this.m_shardId = initializationInput.getShardId();
            this.m_formatter = KinesisStreamImporter.this.m_config.getFormatterBuilder().create();
            String sequenceNumber = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
            if (NumberUtils.isDigits(sequenceNumber)) {
                this.m_lastFetchCommittedSequenceNumber = new BigInteger(sequenceNumber);
            }
            KinesisStreamImporter.this.info(null, "Initializing Kinesis stream processing for shard %s, last committed on: %s", this.m_shardId, sequenceNumber);
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            if (processRecordsInput.getRecords().isEmpty()) {
                return;
            }
            BigInteger bigInteger = BigInteger.ZERO;
            this.m_gapTracker.resetTo();
            int i = 0;
            for (Record record : processRecordsInput.getRecords()) {
                KinesisStreamImporter.this.m_submitCount.incrementAndGet();
                BigInteger bigInteger2 = new BigInteger(record.getSequenceNumber());
                if (bigInteger2.compareTo(this.m_lastFetchCommittedSequenceNumber) >= 0) {
                    if (KinesisStreamImporter.this.isDebugEnabled()) {
                        KinesisStreamImporter.this.debug(null, "last committed seq: %s, current seq:%s shard %s", this.m_lastFetchCommittedSequenceNumber.toString(), record.getSequenceNumber(), this.m_shardId);
                        if (bigInteger2.compareTo(bigInteger) < 0) {
                            KinesisStreamImporter.this.debug(null, "Record %d is out of sequence on shard %s", bigInteger2, this.m_shardId);
                        } else {
                            bigInteger = bigInteger2;
                        }
                    }
                    Object[] objArr = null;
                    try {
                        objArr = this.m_formatter.transform(record.getData());
                        if (!KinesisStreamImporter.this.callProcedure(new Invocation(KinesisStreamImporter.this.m_config.getProcedure(), objArr), new StreamProcedureCallback(this.m_gapTracker, i, bigInteger2, KinesisStreamImporter.this.m_cbcnt))) {
                            KinesisStreamImporter.this.rateLimitedLog(Level.ERROR, null, "Call procedure error on shard %s", this.m_shardId);
                            this.m_gapTracker.commit(i, bigInteger2);
                        }
                    } catch (FormatException e) {
                        KinesisStreamImporter.this.rateLimitedLog(Level.ERROR, e, "Data error on shard %s, data: %s", this.m_shardId, Arrays.toString(objArr));
                        this.m_gapTracker.commit(i, bigInteger2);
                    }
                    if (!KinesisStreamImporter.this.shouldRun()) {
                        break;
                    } else {
                        i++;
                    }
                }
            }
            commitCheckPoint(processRecordsInput.getCheckpointer());
        }

        public void shutdown(ShutdownInput shutdownInput) {
            if (KinesisStreamImporter.this.isDebugEnabled()) {
                KinesisStreamImporter.this.debug(null, "shard ID: " + this.m_shardId + ", shutdown reason: " + shutdownInput.getShutdownReason().name(), new Object[0]);
            }
            if (ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason())) {
                commitCheckPoint(shutdownInput.getCheckpointer());
            }
        }

        private void commitCheckPoint(IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            BigInteger safeCommitPoint;
            int i = 1;
            while (i < 4 && KinesisStreamImporter.this.shouldRun() && (safeCommitPoint = this.m_gapTracker.getSafeCommitPoint()) != null) {
                if (KinesisStreamImporter.this.isDebugEnabled()) {
                    KinesisStreamImporter.this.debug(null, "New checkpoint %s, last checkpoint %s on shard %s", safeCommitPoint.toString(), this.m_lastFetchCommittedSequenceNumber.toString(), this.m_shardId);
                }
                if (safeCommitPoint.compareTo(this.m_lastFetchCommittedSequenceNumber) > 0) {
                    if (KinesisStreamImporter.this.isDebugEnabled()) {
                        KinesisStreamImporter.this.debug(null, "Trying to checkpoint %s on shard %s", safeCommitPoint.toString(), this.m_shardId);
                    }
                    try {
                        iRecordProcessorCheckpointer.checkpoint(safeCommitPoint.toString());
                        this.m_lastFetchCommittedSequenceNumber = safeCommitPoint;
                        return;
                    } catch (Exception e) {
                        KinesisStreamImporter.this.rateLimitedLog(Level.WARN, e, "Skipping checkpoint %s on shard %s. Reason: %s", safeCommitPoint.toString(), this.m_shardId, e.getMessage());
                        return;
                    } catch (ThrottlingException e2) {
                        KinesisStreamImporter.this.rateLimitedLog(Level.INFO, null, "Checkpoint attempt  %d on shard %s", Integer.valueOf(i), this.m_shardId);
                    }
                }
                int i2 = i;
                i++;
                KinesisStreamImporter.this.backoffSleep(i2);
            }
        }
    }

    /* loaded from: input_file:org/voltdb/importclient/kinesis/KinesisStreamImporter$StreamProcedureCallback.class */
    /**
     * Procedure callback for one submitted record: registers the record's offset with the gap
     * tracker on construction and commits it when the response arrives.
     */
    private static final class StreamProcedureCallback implements ProcedureCallback {
        private final Gap m_tracker;
        private final int m_offset;
        private final BigInteger m_seq;
        private final AtomicLong m_cbcnt;

        /**
         * @param tracker gap tracker that receives the submit and, later, the commit
         * @param offset offset of the record within its batch
         * @param sequence sequence number of the record
         * @param callbackCounter counter incremented once per received callback
         */
        public StreamProcedureCallback(Gap tracker, int offset, BigInteger sequence, AtomicLong callbackCounter) {
            m_tracker = tracker;
            m_offset = offset;
            m_seq = sequence;
            m_cbcnt = callbackCounter;
            // Submission happens here, before the invocation is dispatched by the caller.
            tracker.submit(offset, sequence);
        }

        /** Commits the record's offset in the gap tracker, then bumps the callback counter. */
        @Override // org.voltdb.client.ProcedureCallback
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            m_tracker.commit(m_offset, m_seq);
            m_cbcnt.incrementAndGet();
        }
    }

    /**
     * Builds the importer and its Kinesis worker from the given configuration.
     *
     * <p>The worker reads from {@code TRIM_HORIZON} with metrics disabled; region, batch size,
     * idle time and task back-off all come from the configuration.
     *
     * @param kinesisStreamImporterConfig importer configuration
     */
    public KinesisStreamImporter(KinesisStreamImporterConfig kinesisStreamImporterConfig) {
        m_config = kinesisStreamImporterConfig;

        KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
                m_config.getAppName(),
                m_config.getStreamName(),
                credentials(),
                m_workerId);
        kclConfig
                .withRegionName(m_config.getRegion())
                .withMaxRecords((int) m_config.getMaxReadBatchSize())
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withIdleTimeBetweenReadsInMillis(m_config.getIdleTimeBetweenReads())
                .withMetricsLevel(MetricsLevel.NONE)
                .withTaskBackoffTimeMillis(m_config.getTaskBackoffTimeMillis())
                .withKinesisClientConfig(KinesisStreamImporterConfig.getClientConfigWithUserAgent(m_config.getAppName()));

        Worker.Builder workerBuilder = new Worker.Builder();
        m_worker = workerBuilder
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kclConfig)
                .build();
    }

    /** @return the resource ID reported by this importer's configuration */
    @Override // org.voltdb.importer.AbstractImporter
    public URI getResourceID() {
        final URI resourceId = m_config.getResourceID();
        return resourceId;
    }

    /**
     * @return the importer name, the configured application name and this importer's worker ID,
     *         joined with {@code HelpFormatter.DEFAULT_OPT_PREFIX}
     */
    @Override // org.voltdb.importer.AbstractImporter
    public String getTaskThreadName() {
        final String separator = HelpFormatter.DEFAULT_OPT_PREFIX;
        final String importerName = getName();
        final String appName = m_config.getAppName();
        return importerName + separator + appName + separator + m_workerId;
    }

    /**
     * Runs the Kinesis worker on the calling thread until it returns.
     *
     * <p>A {@link RuntimeException} escaping the worker is logged and followed by a worker
     * shutdown. The received-callback and submitted-record counters are logged on exit.
     */
    @Override // org.voltdb.importer.AbstractImporter
    public void accept() {
        // The resource ID is passed as an argument, matching the other log calls in this method,
        // so that a '%' in the URI is never interpreted as a format specifier.
        info(null, "Starting data stream fetcher for %s", this.m_config.getResourceID().toString());
        try {
            this.m_worker.run();
        } catch (RuntimeException e) {
            rateLimitedLog(Level.ERROR, e, "Error in Kinesis stream importer %s", this.m_config.getResourceID());
            this.m_worker.shutdown();
        }
        info(null, "Data stream fetcher stopped for %s. Callback Rcvd: %d. Submitted: %d", this.m_config.getResourceID().toString(), Long.valueOf(this.m_cbcnt.get()), Long.valueOf(this.m_submitCount.get()));
    }

    /** Stops the importer by asking the Kinesis worker to shut down. */
    @Override // org.voltdb.importer.AbstractImporter, org.voltdb.importer.ImporterLifecycle
    public void stop() {
        final Worker worker = m_worker;
        worker.shutdown();
    }

    /** @return the importer name, {@code KinesisStreamImporterConfig.APP_NAME} */
    @Override // org.voltdb.InternalConnectionContext
    public String getName() {
        final String importerName = KinesisStreamImporterConfig.APP_NAME;
        return importerName;
    }

    /**
     * Wraps the configured access key and secret key in a static credentials provider.
     *
     * @return a provider that always serves the configured key pair
     * @throws AmazonClientException declared for callers; not thrown directly by this method
     */
    public AWSCredentialsProvider credentials() throws AmazonClientException {
        final BasicAWSCredentials keyPair =
                new BasicAWSCredentials(m_config.getAccessKey(), m_config.getSecretKey());
        return new StaticCredentialsProvider(keyPair);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sleeps for {@code 200 * i} milliseconds; an interruption is logged and the method returns
     * early.
     *
     * @param i back-off multiplier
     */
    public void backoffSleep(int i) {
        final int delayMillis = 200 * i;
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException interrupted) {
            rateLimitedLog(Level.WARN, interrupted, "Interrupted sleep when checkpointing.", new Object[0]);
        }
    }
}
