package org.voltdb.dr2.snapshot;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.voltcore.utils.DBBPool;
import org.voltdb.DRLogSegmentId;
import org.voltdb.dr2.DRPartitionStreamReader;
import org.voltdb.dr2.DRProducer;
import org.voltdb.dr2.InvocationBuffer;

/* loaded from: input_file:org/voltdb/dr2/snapshot/StreamingDRPartitionStreamReader.class */
public abstract class StreamingDRPartitionStreamReader implements DRPartitionStreamReader {
    private static final int MEMORY_LIMIT = (Integer.getInteger("DR_MEM_LIMIT", 10).intValue() * 1024) * 1024;
    static final int MAX_INFLIGHT_BUFFERS = Integer.getInteger("DR_MAX_INFLIGHT_SNAPSHOT_BUFFERS", 3).intValue();
    final DRPartitionStreamReader.SubscribeId m_subscribeId;
    private final long m_consumerCreationId;
    private final ExecutorService m_compressionExecutor;
    protected final LinkedBlockingQueue<InvocationBuffer> m_bufferQueue = new LinkedBlockingQueue<>();
    final Queue<Long> m_inFlightBuffers = new ArrayDeque();
    private final AtomicLong m_bytesInMemory = new AtomicLong(0);
    private final Semaphore m_bytesInMemoryLimiter = new Semaphore(MEMORY_LIMIT);
    protected volatile long m_lastQueuedRow = 0;
    private volatile long m_lastSentRow = 0;
    private volatile long m_lastAckedRow = 0;
    private DRPartitionStreamReader.Mode m_mode = DRPartitionStreamReader.Mode.PAUSED;
    private boolean m_done = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/voltdb/dr2/snapshot/StreamingDRPartitionStreamReader$SyncSnapshotBuffer.class */
    public class SyncSnapshotBuffer extends InvocationBuffer {
        SyncSnapshotBuffer(long j, long j2, DBBPool.BBContainer bBContainer) {
            super(StreamingDRPartitionStreamReader.this.m_subscribeId.partitionId, j, j2, 0L, 0L, InvocationBuffer.Type.SNAPSHOT, null, new InvocationBuffer.InvocationCompressionSerialization(bBContainer));
        }

        private SyncSnapshotBuffer(DRLogSegmentId dRLogSegmentId, DBBPool.BBContainer bBContainer) {
            super(StreamingDRPartitionStreamReader.this.m_subscribeId.partitionId, dRLogSegmentId.drId, 0L, dRLogSegmentId.spUniqueId, dRLogSegmentId.mpUniqueId, InvocationBuffer.Type.END_OF_SNAPSHOT, null, bBContainer == null ? null : new InvocationBuffer.InvocationCompressionSerialization(bBContainer));
        }

        @Override // org.voltdb.dr2.InvocationBuffer
        public void release() {
            super.release();
            StreamingDRPartitionStreamReader.this.m_bytesInMemory.addAndGet(-size());
            StreamingDRPartitionStreamReader.this.m_bytesInMemoryLimiter.release(size());
            discard();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamingDRPartitionStreamReader(byte b, long j, int i) {
        this.m_subscribeId = new DRPartitionStreamReader.SubscribeId(b, j, i);
        this.m_consumerCreationId = j;
        this.m_compressionExecutor = DRProducer.getCompressionExecutor(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enqueue(final long j, final DBBPool.BBContainer bBContainer) throws InterruptedException {
        final int metadataSize = InvocationBuffer.metadataSize() + (2 * bBContainer.b().remaining());
        final long j2 = this.m_lastQueuedRow;
        this.m_lastQueuedRow = j;
        this.m_compressionExecutor.submit(new Runnable() { // from class: org.voltdb.dr2.snapshot.StreamingDRPartitionStreamReader.1
            static final /* synthetic */ boolean $assertionsDisabled;

            @Override // java.lang.Runnable
            public void run() {
                SyncSnapshotBuffer syncSnapshotBuffer = new SyncSnapshotBuffer(j2, j, bBContainer);
                int size = metadataSize - syncSnapshotBuffer.size();
                if (!$assertionsDisabled && size < 0) {
                    throw new AssertionError();
                }
                StreamingDRPartitionStreamReader.this.m_bytesInMemory.addAndGet(-size);
                StreamingDRPartitionStreamReader.this.m_bytesInMemoryLimiter.release(size);
                if (StreamingDRPartitionStreamReader.this.shouldTerminate()) {
                    syncSnapshotBuffer.discard();
                } else {
                    StreamingDRPartitionStreamReader.this.m_bufferQueue.offer(syncSnapshotBuffer);
                }
            }

            static {
                $assertionsDisabled = !StreamingDRPartitionStreamReader.class.desiredAssertionStatus();
            }
        });
        this.m_bytesInMemory.addAndGet(metadataSize);
        while (!shouldTerminate() && !this.m_bytesInMemoryLimiter.tryAcquire(metadataSize, 1L, TimeUnit.SECONDS)) {
        }
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public DRPartitionStreamReader.SubscribeId getSubscribeId() {
        return this.m_subscribeId;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void resume() {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startStreaming() {
        this.m_mode = DRPartitionStreamReader.Mode.NORMAL;
    }

    protected InvocationBuffer createEndOfSnapshotBuffer() {
        byte[] snapshotTrackers = getSnapshotTrackers();
        DBBPool.BBContainer allocate = InvocationBuffer.getAllocator().allocate(snapshotTrackers.length);
        ByteBuffer b = allocate.b();
        b.put(snapshotTrackers);
        b.flip();
        return new SyncSnapshotBuffer(getProducerSnapshotInfo(), allocate);
    }

    protected abstract boolean doneStreaming();

    protected abstract DRLogSegmentId getProducerSnapshotInfo();

    protected abstract byte[] getSnapshotTrackers();

    protected abstract boolean shouldTerminate();

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public InvocationBuffer getToSend() {
        if (this.m_done || this.m_inFlightBuffers.size() >= MAX_INFLIGHT_BUFFERS) {
            return null;
        }
        InvocationBuffer poll = this.m_bufferQueue.poll();
        if (poll != null) {
            this.m_lastSentRow = poll.endDRId();
            this.m_inFlightBuffers.offer(Long.valueOf(this.m_lastSentRow));
        } else if (doneStreaming() && this.m_lastAckedRow >= this.m_lastQueuedRow) {
            poll = createEndOfSnapshotBuffer();
            this.m_done = true;
        }
        return poll;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void ack(long j) {
        this.m_lastAckedRow = j;
        while (true) {
            Long peek = this.m_inFlightBuffers.peek();
            if (peek == null || peek.longValue() > j) {
                return;
            } else {
                this.m_inFlightBuffers.poll();
            }
        }
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getAcked() {
        return this.m_lastAckedRow;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public boolean isSynced() {
        return true;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public DRPartitionStreamReader.Mode getMode() {
        return this.m_mode;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getTotalBytesInMem() {
        return this.m_bytesInMemory.get();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getTotalBytes() {
        return this.m_bytesInMemory.get();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getTotalBuffers() {
        return this.m_bufferQueue.size();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getLastAckedDRId() {
        return this.m_lastAckedRow;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public boolean isEmpty() {
        return this.m_bufferQueue.isEmpty();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void clear() {
        delete();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void discard() {
        delete();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void delete() {
        Iterator<InvocationBuffer> it = this.m_bufferQueue.iterator();
        while (it.hasNext()) {
            it.next().discard();
        }
        this.m_bufferQueue.clear();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getNextDrId() {
        InvocationBuffer peek = this.m_bufferQueue.peek();
        if (peek == null) {
            return Long.MIN_VALUE;
        }
        return peek.startDRId();
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getDelayNanos() {
        return -1L;
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long getTimestampFromLastAckedUniqueId() {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void pause() {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void notifyOfUpdatedLastCommandLoggedUniqueIds() {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public void resetToDRId(long j) {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    @Override // org.voltdb.dr2.DRPartitionStreamReader
    public long debugGetOldestAvailable() {
        throw new UnsupportedOperationException("Sync snapshot doesn't do this");
    }

    public long getLastQueuedRow() {
        return this.m_lastQueuedRow;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getRowsConverted() {
        return 0L;
    }
}
