package org.voltdb.rejoin;

import com.google_voltpatches.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.Mailbox;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.DBBPool;
import org.voltdb.PrivateVoltTableFactory;
import org.voltdb.SiteProcedureConnection;
import org.voltdb.TheHashinator;
import org.voltdb.VoltDB;
import org.voltdb.utils.CachedByteBufferAllocator;

/* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotSink.class */
/**
 * Receives streamed snapshot messages through a {@link Mailbox} and turns them
 * into {@link RestoreWork} items that a site can apply.
 *
 * <p>{@link #initialize} starts two helper threads: one running a
 * {@link StreamSnapshotDataReceiver} and one running a
 * {@link StreamSnapshotAckSender}. {@link #take} and {@link #poll} pull decoded
 * messages from the receiver, acknowledge every one of them through the ack
 * sender, and return the work (if any) that the message produced.
 *
 * <p>Message layout as read by this class: byte 0 is the ordinal of the
 * {@link StreamSnapshotMessageType}, bytes 1-4 are the block index, bytes 5-8
 * are the table id (only read for SCHEMA and DATA messages) and the payload
 * starts at offset 9.
 *
 * <p>NOTE(review): {@code m_EOF}, {@code m_bytesReceived} and {@code m_schemas}
 * are not synchronized or volatile; this presumably assumes that
 * {@code take}/{@code poll}/{@code isEOF}/{@code bytesTransferred} are all
 * called from a single thread - confirm against callers.
 */
public class StreamSnapshotSink {
    private static final VoltLogger rejoinLog = new VoltLogger("REJOIN");

    /** Offset of the message type ordinal (one byte). */
    private static final int MSG_TYPE_OFFSET = 0;
    /** Offset of the block index (int). */
    private static final int BLOCK_INDEX_OFFSET = 1;
    /** Offset of the table id (int); only read for SCHEMA and DATA messages. */
    private static final int TABLE_ID_OFFSET = 5;
    /** Offset at which the type-specific payload starts. */
    private static final int PAYLOAD_OFFSET = 9;

    private final Mailbox m_mb;
    private StreamSnapshotDataReceiver m_in = null;
    private Thread m_inThread = null;
    private StreamSnapshotAckSender m_ack = null;
    private Thread m_ackThread = null;
    /** Number of END (or FAILURE) messages still outstanding before EOF is declared. */
    private final AtomicInteger m_expectedEOFs = new AtomicInteger();
    private boolean m_EOF = false;
    /** Schema bytes received so far, keyed by table id. */
    private final Map<Integer, byte[]> m_schemas = new HashMap<>();
    private long m_bytesReceived = 0;

    /**
     * A received message together with the header fields decoded from its buffer.
     * Decoding uses absolute reads only, so the buffer's position is left untouched.
     */
    public static class DecodedContainer {
        final long m_srcHSId;
        final long m_dataTargetId;
        final DBBPool.BBContainer m_container;
        final int m_blockIndex;
        final StreamSnapshotMessageType m_msgType;
        /** Table id for SCHEMA and DATA messages, {@code -1} for every other type. */
        final int m_tableId;

        /**
         * @param j           stored as the source HSId of the message
         * @param j2          stored as the data target id of the message
         * @param bBContainer container whose buffer holds the encoded message
         * @throws ArrayIndexOutOfBoundsException if the type byte is not a valid
         *         ordinal of {@link StreamSnapshotMessageType}
         */
        public DecodedContainer(long j, long j2, DBBPool.BBContainer bBContainer) {
            this.m_srcHSId = j;
            this.m_dataTargetId = j2;
            this.m_container = bBContainer;
            ByteBuffer header = bBContainer.b();
            byte typeOrdinal = header.get(MSG_TYPE_OFFSET);
            this.m_blockIndex = header.getInt(BLOCK_INDEX_OFFSET);
            this.m_msgType = StreamSnapshotMessageType.values()[typeOrdinal];
            boolean carriesTableId = this.m_msgType == StreamSnapshotMessageType.SCHEMA
                    || this.m_msgType == StreamSnapshotMessageType.DATA;
            this.m_tableId = carriesTableId ? header.getInt(TABLE_ID_OFFSET) : -1;
        }
    }

    /** Restore work that installs a new hashinator configuration on a site. */
    public static class HashinatorRestoreWork implements RestoreWork {
        private final long version;
        private final byte[] hashinatorConfig;

        /**
         * @param j    hashinator version
         * @param bArr serialized hashinator configuration
         */
        public HashinatorRestoreWork(long j, byte[] bArr) {
            this.version = j;
            this.hashinatorConfig = bArr;
        }

        /**
         * Updates the configured hashinator via {@link TheHashinator} and hands the
         * second element of the result to the site connection.
         */
        @Override // org.voltdb.rejoin.StreamSnapshotSink.RestoreWork
        public void restore(SiteProcedureConnection siteProcedureConnection) {
            // Guarded for consistency with TableRestoreWork: no string building unless needed.
            if (StreamSnapshotSink.rejoinLog.isDebugEnabled()) {
                StreamSnapshotSink.rejoinLog.debug("Updating the hashinator to version " + this.version);
            }
            siteProcedureConnection.updateHashinator(
                    TheHashinator.updateConfiguredHashinator(this.version, this.hashinatorConfig).getSecond());
        }
    }

    /** A unit of work, produced from a received message, to apply to a site. */
    public interface RestoreWork {
        /**
         * Applies this work item.
         *
         * @param siteProcedureConnection connection to the site the work is applied to
         */
        void restore(SiteProcedureConnection siteProcedureConnection);
    }

    /** Restore work that loads one block of table data into a site. */
    public static class TableRestoreWork implements RestoreWork {
        private final int tableId;
        private final ByteBuffer tableBlock;

        /**
         * @param i          id of the table the block belongs to
         * @param byteBuffer buffer holding the table block (schema followed by data,
         *                   as assembled by {@link StreamSnapshotSink#getNextChunk})
         */
        public TableRestoreWork(int i, ByteBuffer byteBuffer) {
            this.tableId = i;
            this.tableBlock = byteBuffer;
        }

        /**
         * Wraps a duplicate of the block in a VoltTable and loads it through the site
         * connection. A duplicate is used so this object's buffer position is not moved.
         */
        @Override // org.voltdb.rejoin.StreamSnapshotSink.RestoreWork
        public void restore(SiteProcedureConnection siteProcedureConnection) {
            if (StreamSnapshotSink.rejoinLog.isDebugEnabled()) {
                StreamSnapshotSink.rejoinLog.debug("remaining bytes for table " + this.tableId + " is " + this.tableBlock.remaining());
            }
            // NOTE(review): the three Long.MIN_VALUE arguments and the three false flags are
            // passed through as-is; their meaning is defined by SiteProcedureConnection.loadTable.
            siteProcedureConnection.loadTable(
                    Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE,
                    this.tableId,
                    PrivateVoltTableFactory.createVoltTableFromBuffer(this.tableBlock.duplicate(), true),
                    false, false, false);
        }
    }

    /**
     * @param mailbox mailbox used by the receiver and ack sender; must not be null
     * @throws IllegalArgumentException if {@code mailbox} is null
     */
    public StreamSnapshotSink(Mailbox mailbox) {
        Preconditions.checkArgument(mailbox != null);
        this.m_mb = mailbox;
    }

    /**
     * Creates and starts the data receiver and ack sender threads.
     *
     * @param i      number of END messages to expect before {@link #isEOF()} becomes true
     * @param queue  first queue handed to the {@link StreamSnapshotDataReceiver}
     * @param queue2 second queue handed to the {@link StreamSnapshotDataReceiver}
     * @return the HSId of the mailbox this sink was constructed with
     */
    public long initialize(int i, Queue<DBBPool.BBContainer> queue, Queue<DBBPool.BBContainer> queue2) {
        this.m_expectedEOFs.set(i);
        this.m_in = new StreamSnapshotDataReceiver(this.m_mb, queue, queue2);
        this.m_inThread = new Thread(this.m_in, "Snapshot data receiver");
        this.m_inThread.setDaemon(true);
        this.m_ack = new StreamSnapshotAckSender(this.m_mb);
        // NOTE(review): unlike the receiver thread, the ack thread is not marked daemon.
        this.m_ackThread = new Thread(this.m_ack, "Snapshot ack sender");
        this.m_inThread.start();
        this.m_ackThread.start();
        return this.m_mb.getHSId();
    }

    /** @return true once the expected number of END messages has been processed */
    public boolean isEOF() {
        return this.m_EOF;
    }

    /**
     * Stops the receiver and ack sender and waits for their threads to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait for that
     * helper thread is abandoned, shutdown continues, and the interrupt status is
     * restored before returning so the caller can still observe it.
     */
    public void close() {
        boolean interrupted = false;
        if (this.m_in != null) {
            this.m_in.close();
            // Wake the receiver thread in case it is blocked.
            this.m_inThread.interrupt();
            try {
                this.m_inThread.join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (this.m_ack != null) {
            this.m_ack.close();
            try {
                this.m_ackThread.join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        this.m_in = null;
        this.m_ack = null;
        if (interrupted) {
            // Restore only after cleanup; doing it earlier would make the second join fail at once.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds a table block by concatenating a schema with the remaining bytes of a
     * DATA message.
     *
     * <p>The first 4 bytes at the current position of {@code byteBuffer} are skipped
     * (NOTE(review): presumably a per-block prefix that the loader does not need -
     * confirm against the sender). {@code byteBuffer} is fully consumed by this call.
     *
     * @param bArr                      schema bytes placed at the start of the result
     * @param byteBuffer                message buffer positioned at the start of the payload
     * @param cachedByteBufferAllocator allocator for the returned buffer
     * @return a flipped buffer containing the schema followed by the data
     */
    public static ByteBuffer getNextChunk(byte[] bArr, ByteBuffer byteBuffer, CachedByteBufferAllocator cachedByteBufferAllocator) {
        byteBuffer.position(byteBuffer.position() + 4);
        ByteBuffer chunk = cachedByteBufferAllocator.allocate(bArr.length + byteBuffer.remaining());
        chunk.put(bArr);
        chunk.put(byteBuffer);
        chunk.flip();
        return chunk;
    }

    /**
     * Takes messages from the receiver until one produces restore work or EOF is
     * reached.
     *
     * @param cachedByteBufferAllocator allocator used for assembled table blocks
     * @return the next work item, or null if the sink is not initialized / already
     *         closed, or if EOF was reached without further work
     * @throws InterruptedException if interrupted while taking from the receiver
     */
    public RestoreWork take(CachedByteBufferAllocator cachedByteBufferAllocator) throws InterruptedException {
        if (this.m_in == null || this.m_ack == null) {
            return null;
        }
        while (!this.m_EOF) {
            RestoreWork work = processMessage(this.m_in.take(), cachedByteBufferAllocator);
            if (work != null) {
                return work;
            }
        }
        return null;
    }

    /**
     * Processes at most one message from the receiver's {@code poll()}.
     *
     * @param cachedByteBufferAllocator allocator used for assembled table blocks
     * @return the work produced by that message, or null if there was no message,
     *         the message produced no work, or the sink is not initialized / closed
     */
    public RestoreWork poll(CachedByteBufferAllocator cachedByteBufferAllocator) {
        if (this.m_in == null || this.m_ack == null) {
            return null;
        }
        return processMessage(this.m_in.poll(), cachedByteBufferAllocator);
    }

    /**
     * Handles one decoded message according to its type, then always discards its
     * container and sends an acknowledgement for it.
     *
     * @param decodedContainer          the message, or null (in which case nothing happens)
     * @param cachedByteBufferAllocator allocator used for assembled table blocks
     * @return restore work for HASHINATOR and DATA messages, otherwise null
     */
    private RestoreWork processMessage(DecodedContainer decodedContainer, CachedByteBufferAllocator cachedByteBufferAllocator) {
        if (decodedContainer == null) {
            return null;
        }
        final StreamSnapshotMessageType type = decodedContainer.m_msgType;
        try {
            if (type == StreamSnapshotMessageType.FAILURE) {
                VoltDB.crashLocalVoltDB("Rejoin source sent failure message.", false, null);
                // NOTE(review): only reached if crashLocalVoltDB returns - presumably when
                // crashes are suppressed (e.g. in tests); confirm.
                countDownExpectedEOF();
                return null;
            }
            if (type == StreamSnapshotMessageType.END) {
                traceReceived("END", decodedContainer);
                countDownExpectedEOF();
                return null;
            }
            if (type == StreamSnapshotMessageType.SCHEMA) {
                traceReceived("SCHEMA", decodedContainer);
                ByteBuffer schemaBuf = decodedContainer.m_container.b();
                schemaBuf.position(PAYLOAD_OFFSET);
                // Copy out of the container: it is discarded in the finally block below.
                byte[] schema = new byte[schemaBuf.remaining()];
                schemaBuf.get(schema);
                this.m_schemas.put(decodedContainer.m_tableId, schema);
                return null;
            }
            if (type == StreamSnapshotMessageType.HASHINATOR) {
                ByteBuffer hashinatorBuf = decodedContainer.m_container.b();
                hashinatorBuf.position(PAYLOAD_OFFSET);
                long version = hashinatorBuf.getLong();
                byte[] config = new byte[hashinatorBuf.remaining()];
                hashinatorBuf.get(config);
                return new HashinatorRestoreWork(version, config);
            }
            // Every remaining type is treated as a DATA block.
            traceReceived("DATA", decodedContainer);
            ByteBuffer dataBuf = decodedContainer.m_container.b();
            byte[] schema = this.m_schemas.get(decodedContainer.m_tableId);
            if (schema == null) {
                VoltDB.crashLocalVoltDB("No schema for table with ID " + decodedContainer.m_tableId, false, null);
            }
            dataBuf.position(PAYLOAD_OFFSET);
            ByteBuffer chunk = getNextChunk(schema, dataBuf, cachedByteBufferAllocator);
            this.m_bytesReceived += chunk.remaining();
            return new TableRestoreWork(decodedContainer.m_tableId, chunk);
        } finally {
            decodedContainer.m_container.discard();
            this.m_ack.ack(
                    decodedContainer.m_srcHSId,
                    type == StreamSnapshotMessageType.END,
                    decodedContainer.m_dataTargetId,
                    decodedContainer.m_blockIndex);
        }
    }

    /** Records one finished source and flags EOF when none remain outstanding. */
    private void countDownExpectedEOF() {
        if (this.m_expectedEOFs.decrementAndGet() == 0) {
            this.m_EOF = true;
        }
    }

    /** Logs receipt of a message at trace level; builds the text only if trace is enabled. */
    private static void traceReceived(String kind, DecodedContainer msg) {
        if (rejoinLog.isTraceEnabled()) {
            rejoinLog.trace("Got " + kind + " message " + msg.m_blockIndex
                    + " from " + CoreUtils.hsIdToString(msg.m_srcHSId)
                    + " (TargetId " + msg.m_dataTargetId + ")");
        }
    }

    /** @return total size in bytes of all table blocks assembled from DATA messages so far */
    public long bytesTransferred() {
        return this.m_bytesReceived;
    }
}
