package org.apache.iotdb.db.wal.buffer;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.wal.io.WALMetaData;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/wal/buffer/WALBuffer.class */
public class WALBuffer extends AbstractWALBuffer {
    /** Logger used by this buffer and by its nested view/task classes. */
    private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
    /** IoTDB configuration instance, fetched once when the class is initialized. */
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    /** Capacity of each direct buffer created by allocateBuffers(): half of the configured WAL buffer size. */
    private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
    /** Capacity of the entry queue; also the upper bound on entries a SerializeTask batches together. */
    private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
    /** Set to true by close(); write() rejects entries and SerializeTask stops resubmitting itself once set. */
    private volatile boolean isClosed;
    /** Entries (and signal entries) queued by write()/close() and consumed by SerializeTask. */
    private final BlockingQueue<WALEntry> walEntries;
    /** Lock taken around hand-offs between the working, idle and syncing buffers. */
    private final Lock buffersLock;
    /** Condition of buffersLock, signalled when the syncing buffer has been returned as the idle buffer. */
    private final Condition idleBufferReadyCondition;
    /** Buffer that ByteBufferView currently writes serialized entries into. */
    private volatile ByteBuffer workingBuffer;
    /** Spare buffer that becomes the next working buffer; null while it is in use elsewhere. */
    private volatile ByteBuffer idleBuffer;
    /** Buffer being written to the WAL file by SyncBufferTask; null when it has been returned to idle. */
    private volatile ByteBuffer syncingBuffer;
    /**
     * File status passed to the next sync task. Switched to CONTAINS_SEARCH_INDEX when an entry
     * carrying a search index is serialized, and reset to CONTAINS_NONE_SEARCH_INDEX after each
     * buffer hand-off.
     */
    protected volatile WALFileStatus currentFileStatus;
    /** Executor that runs SerializeTask; created via IoTDBThreadPoolFactory.newSingleThreadExecutor. */
    private final ExecutorService serializeThread;
    /** Executor that runs SyncBufferTask; created via IoTDBThreadPoolFactory.newSingleThreadExecutor. */
    private final ExecutorService syncBufferThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/wal/buffer/WALBuffer$ByteBufferView.class */
    /**
     * Write-only view over the outer buffer's current working buffer.
     *
     * <p>Whenever the working buffer cannot hold the next value, the view hands it over for
     * syncing (see {@code rollBuffer()}) and continues in the new working buffer, so a single
     * serialized entry may span several buffers.
     */
    public class ByteBufferView implements IWALByteBufferView {
        /** Total number of bytes written into working buffers that have already been rolled. */
        private int flushedBytesNum = 0;

        private ByteBufferView() {
        }

        /** Rolls the working buffer unless it still has room for {@code i} more bytes. */
        private void ensureEnoughSpace(int i) {
            if (WALBuffer.this.workingBuffer.remaining() >= i) {
                return;
            }
            rollBuffer();
        }

        /** Records the bytes of the current working buffer as flushed, then submits it for syncing. */
        private void rollBuffer() {
            final int writtenInCurrentBuffer = WALBuffer.this.workingBuffer.position();
            this.flushedBytesNum += writtenInCurrentBuffer;
            WALBuffer.this.syncWorkingBuffer(
                    WALBuffer.this.currentSearchIndex, WALBuffer.this.currentFileStatus);
        }

        @Override
        public void put(byte b) {
            ensureEnoughSpace(1);
            WALBuffer.this.workingBuffer.put(b);
        }

        /**
         * Writes the whole array, splitting it across working buffers: each time the current
         * buffer is too small for what is left, it is filled completely and rolled.
         */
        @Override
        public void put(byte[] bArr) {
            int written = 0;
            int free = WALBuffer.this.workingBuffer.remaining();
            while (bArr.length - written > free) {
                WALBuffer.this.workingBuffer.put(bArr, written, free);
                written += free;
                rollBuffer();
                // The working buffer may have been replaced by the roll, so re-read it.
                free = WALBuffer.this.workingBuffer.remaining();
            }
            WALBuffer.this.workingBuffer.put(bArr, written, bArr.length - written);
        }

        @Override
        public void putChar(char c) {
            ensureEnoughSpace(2);
            WALBuffer.this.workingBuffer.putChar(c);
        }

        @Override
        public void putShort(short s) {
            ensureEnoughSpace(2);
            WALBuffer.this.workingBuffer.putShort(s);
        }

        @Override
        public void putInt(int i) {
            ensureEnoughSpace(4);
            WALBuffer.this.workingBuffer.putInt(i);
        }

        @Override
        public void putLong(long j) {
            ensureEnoughSpace(8);
            WALBuffer.this.workingBuffer.putLong(j);
        }

        @Override
        public void putFloat(float f) {
            ensureEnoughSpace(4);
            WALBuffer.this.workingBuffer.putFloat(f);
        }

        @Override
        public void putDouble(double d) {
            ensureEnoughSpace(8);
            WALBuffer.this.workingBuffer.putDouble(d);
        }

        /**
         * @return bytes counted for already-rolled buffers plus the position of the current
         *     working buffer
         */
        @Override
        public int position() {
            final int currentPosition = WALBuffer.this.workingBuffer.position();
            return this.flushedBytesNum + currentPosition;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/wal/buffer/WALBuffer$SerializeInfo.class */
    /**
     * Per-batch bookkeeping that travels from a serialize task to the sync task: the metadata of
     * the serialized entries and the listeners to notify once the batch has been handled.
     */
    public static class SerializeInfo {
        /** Metadata (entry size and search index) of every entry serialized in the batch. */
        final WALMetaData metaData = new WALMetaData();
        /** Listeners of the entries in the batch, notified after the force/roll attempt. */
        final List<WALFlushListener> fsyncListeners = new LinkedList<>();
        /** Listener of a roll-log-writer signal, or null when no roll was requested. */
        WALFlushListener rollWALFileWriterListener = null;

        private SerializeInfo() {
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/wal/buffer/WALBuffer$SerializeTask.class */
    /**
     * Task that takes queued entries, serializes one batch of them into the working buffer and
     * submits the buffer for fsync. Each run handles one batch and, unless the buffer has been
     * closed, schedules a fresh task for the next batch.
     */
    private class SerializeTask implements Runnable {
        /** View used to write entries into the outer working buffer. */
        private final ByteBufferView byteBufferView = new ByteBufferView();
        /** Metadata and listeners collected for the batch built by this task. */
        private final SerializeInfo info = new SerializeInfo();
        /** Number of info entries serialized successfully by this task so far. */
        private int batchSize = 0;

        private SerializeTask() {
        }

        @Override
        public void run() {
            try {
                serialize();
            } finally {
                // Keep the serialize loop alive until close() flips the flag.
                if (!WALBuffer.this.isClosed) {
                    WALBuffer.this.serializeThread.submit(new SerializeTask());
                }
            }
        }

        /**
         * Builds one batch: blocks for the first entry, optionally sleeps to let more entries
         * accumulate, drains the queue up to the queue capacity, and finally requests an fsync if
         * anything was serialized. Returns early when a handled signal ends the batch.
         */
        private void serialize() {
            try {
                WALEntry firstEntry = (WALEntry) WALBuffer.this.walEntries.take();
                if (handleWALEntry(firstEntry)) {
                    return;
                }
            } catch (InterruptedException e) {
                WALBuffer.logger.warn("Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
                Thread.currentThread().interrupt();
            }

            final long batchDelayMs = WALBuffer.config.getFsyncWalDelayInMs();
            if (batchDelayMs > 0) {
                try {
                    Thread.sleep(batchDelayMs);
                } catch (InterruptedException e2) {
                    WALBuffer.logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
                    Thread.currentThread().interrupt();
                }
            }

            while (WALBuffer.this.walEntries.peek() != null && this.batchSize < WALBuffer.QUEUE_CAPACITY) {
                WALEntry nextEntry = (WALEntry) WALBuffer.this.walEntries.poll();
                if (handleWALEntry(nextEntry)) {
                    return;
                }
            }

            if (this.batchSize > 0) {
                WALBuffer.this.fsyncWorkingBuffer(
                        WALBuffer.this.currentSearchIndex, WALBuffer.this.currentFileStatus, this.info);
            }
        }

        /**
         * Dispatches one queue element.
         *
         * @return true only when a signal entry was handled in a way that ends the current batch
         */
        private boolean handleWALEntry(WALEntry wALEntry) {
            if (wALEntry.isSignal()) {
                return handleSignalEntry((WALSignalEntry) wALEntry);
            }
            if (handleInfoEntry(wALEntry)) {
                // Only successfully serialized entries count towards the batch.
                this.batchSize++;
                this.info.fsyncListeners.add(wALEntry.getWalFlushListener());
            }
            return false;
        }

        /**
         * Serializes an info entry into the working buffer and records its size and search index
         * in the batch metadata.
         *
         * @return true if serialization succeeded; false if it threw, in which case the entry's
         *     listener has been failed with the exception
         */
        private boolean handleInfoEntry(WALEntry wALEntry) {
            final int startPosition = this.byteBufferView.position();
            try {
                wALEntry.serialize(this.byteBufferView);
                final int entrySize = this.byteBufferView.position() - startPosition;
                WALBuffer.logger.debug("wal entry size is: {}", Integer.valueOf(entrySize));

                long searchIndex = -1;
                if (wALEntry.getType().needSearch()) {
                    if (wALEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
                        searchIndex = ((DeleteDataNode) wALEntry.getValue()).getSearchIndex();
                    } else {
                        searchIndex = ((InsertNode) wALEntry.getValue()).getSearchIndex();
                    }
                    if (searchIndex != -1) {
                        WALBuffer.this.currentSearchIndex = searchIndex;
                        WALBuffer.this.currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
                    }
                }
                this.info.metaData.add(entrySize, searchIndex);
                return true;
            } catch (Exception e) {
                WALBuffer.logger.error("Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", WALBuffer.this.identifier, e);
                wALEntry.getWalFlushListener().fail(e);
                return false;
            }
        }

        /**
         * Handles a signal entry.
         *
         * @return true when the signal triggered an fsync of the working buffer (always for a
         *     roll signal, and for a close signal only if the batch is non-empty); false otherwise
         */
        private boolean handleSignalEntry(WALSignalEntry wALSignalEntry) {
            switch (wALSignalEntry.getType()) {
                case ROLL_WAL_LOG_WRITER_SIGNAL:
                    WALBuffer.logger.debug("Handle roll log writer signal for wal node-{}.", WALBuffer.this.identifier);
                    this.info.rollWALFileWriterListener = wALSignalEntry.getWalFlushListener();
                    WALBuffer.this.fsyncWorkingBuffer(
                            WALBuffer.this.currentSearchIndex, WALBuffer.this.currentFileStatus, this.info);
                    return true;
                case CLOSE_SIGNAL:
                    WALBuffer.logger.debug("Handle close signal for wal node-{}, there are {} entries left.", WALBuffer.this.identifier, Integer.valueOf(WALBuffer.this.walEntries.size()));
                    if (this.batchSize <= 0) {
                        return false;
                    }
                    WALBuffer.this.fsyncWorkingBuffer(
                            WALBuffer.this.currentSearchIndex, WALBuffer.this.currentFileStatus, this.info);
                    return true;
                default:
                    return false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/wal/buffer/WALBuffer$SyncBufferTask.class */
    /**
     * Task that writes the syncing buffer to the current WAL file and then, depending on its
     * flags, rolls the log writer or forces the file, notifying the batch's listeners of the
     * outcome.
     */
    public class SyncBufferTask implements Runnable {
        /** Search index passed to rollLogWriter when the writer is rolled. */
        private final long searchIndex;
        /** File status applied to the current writer before the buffer is written. */
        private final WALFileStatus fileStatus;
        /** Whether the file should be forced (or rolled, once large enough) after the write. */
        private final boolean forceFlag;
        /** Batch metadata and listeners; never null. */
        private final SerializeInfo info;

        /**
         * Creates a task without batch information; an empty {@link SerializeInfo} is used.
         *
         * @param wALBuffer unused; kept so existing call sites that pass the enclosing buffer
         *     keep compiling
         */
        public SyncBufferTask(WALBuffer wALBuffer, long j, WALFileStatus wALFileStatus, boolean z) {
            this(j, wALFileStatus, z, null);
        }

        /**
         * @param j search index handed to the log writer on roll
         * @param wALFileStatus status to set on the current WAL file writer
         * @param z true to force (or roll) the file after writing
         * @param serializeInfo batch info, or null to use an empty one
         */
        public SyncBufferTask(long j, WALFileStatus wALFileStatus, boolean z, SerializeInfo serializeInfo) {
            this.searchIndex = j;
            this.fileStatus = wALFileStatus;
            this.forceFlag = z;
            this.info = serializeInfo == null ? new SerializeInfo() : serializeInfo;
        }

        @Override
        public void run() {
            WALBuffer.this.currentWALFileWriter.updateFileStatus(this.fileStatus);
            try {
                WALBuffer.this.currentWALFileWriter.write(WALBuffer.this.syncingBuffer, this.info.metaData);
            } catch (Throwable th) {
                // NOTE(review): a failed write does not by itself fail the fsync listeners; they
                // are still notified according to the force/roll outcome below.
                WALBuffer.logger.error("Fail to sync wal node-{}'s buffer, change system mode to error.", WALBuffer.this.identifier, th);
                CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
            } finally {
                // Always give the buffer back, otherwise the serialize side would wait forever.
                WALBuffer.this.switchSyncingBufferToIdle();
            }

            boolean forceSuccess = false;
            final boolean rollRequested = this.info.rollWALFileWriterListener != null;
            if (rollRequested
                    || (this.forceFlag
                            && WALBuffer.this.currentWALFileWriter.size() >= WALBuffer.config.getWalFileSizeThresholdInByte())) {
                try {
                    WALBuffer.this.rollLogWriter(this.searchIndex, WALBuffer.this.currentWALFileWriter.getWalFileStatus());
                    forceSuccess = true;
                    if (this.info.rollWALFileWriterListener != null) {
                        this.info.rollWALFileWriterListener.succeed();
                    }
                } catch (IOException e) {
                    WALBuffer.logger.error("Fail to roll wal node-{}'s log writer, change system mode to error.", WALBuffer.this.identifier, e);
                    if (this.info.rollWALFileWriterListener != null) {
                        this.info.rollWALFileWriterListener.fail(e);
                    }
                    // The batch was not confirmed either; fail its listeners instead of leaving
                    // them without any notification.
                    failFsyncListeners(e);
                    CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
                }
            } else if (this.forceFlag) {
                try {
                    WALBuffer.this.currentWALFileWriter.force();
                    forceSuccess = true;
                } catch (IOException e2) {
                    WALBuffer.logger.error("Fail to fsync wal node-{}'s log writer, change system mode to error.", WALBuffer.this.identifier, e2);
                    failFsyncListeners(e2);
                    CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
                }
            }

            if (forceSuccess) {
                for (WALFlushListener listener : this.info.fsyncListeners) {
                    listener.succeed();
                }
            }
        }

        /** Notifies every fsync listener of the batch that the flush failed with {@code cause}. */
        private void failFsyncListeners(IOException cause) {
            for (WALFlushListener listener : this.info.fsyncListeners) {
                listener.fail(cause);
            }
        }
    }

    public WALBuffer(String str, String str2) throws FileNotFoundException {
        this(str, str2, 0L, 0L);
    }

    public WALBuffer(String str, String str2, long j, long j2) throws FileNotFoundException {
        super(str, str2, j, j2);
        this.isClosed = false;
        this.walEntries = new ArrayBlockingQueue(QUEUE_CAPACITY);
        this.buffersLock = new ReentrantLock();
        this.idleBufferReadyCondition = this.buffersLock.newCondition();
        this.currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
        allocateBuffers();
        this.serializeThread = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.WAL_SERIALIZE.getName() + "(node-" + str + ")");
        this.syncBufferThread = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.WAL_SYNC.getName() + "(node-" + str + ")");
        this.serializeThread.submit(new SerializeTask());
    }

    /**
     * Allocates the working and idle direct buffers, each of {@code HALF_WAL_BUFFER_SIZE} bytes.
     * If the allocation runs out of memory, the failure is logged, the buffer is closed and the
     * error is rethrown.
     */
    private void allocateBuffers() {
        final int capacity = HALF_WAL_BUFFER_SIZE;
        try {
            workingBuffer = ByteBuffer.allocateDirect(capacity);
            idleBuffer = ByteBuffer.allocateDirect(capacity);
        } catch (OutOfMemoryError oom) {
            logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, oom);
            close();
            throw oom;
        }
    }

    /**
     * Queues an entry for serialization, blocking while the queue is full. If this buffer is
     * already closed the entry is not queued and its listener is failed with a
     * {@link WALNodeClosedException}.
     */
    @Override
    public void write(WALEntry wALEntry) {
        if (isClosed) {
            logger.error("Fail to write WALEntry into wal node-{} because this node is closed.", identifier);
            wALEntry.getWalFlushListener().fail(new WALNodeClosedException(identifier));
            return;
        }
        try {
            walEntries.put(wALEntry);
        } catch (InterruptedException interrupted) {
            logger.warn("Interrupted when waiting for adding WALEntry to buffer.");
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands the working buffer to the sync thread for a plain write (no force requested) and
     * resets the current file status.
     *
     * @param j search index recorded in the sync task
     * @param wALFileStatus file status recorded in the sync task
     */
    public void syncWorkingBuffer(long j, WALFileStatus wALFileStatus) {
        switchWorkingBufferToFlushing();
        final SyncBufferTask writeOnlyTask = new SyncBufferTask(this, j, wALFileStatus, false);
        syncBufferThread.submit(writeOnlyTask);
        currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands the working buffer to the sync thread with the force flag set, together with the
     * batch information whose listeners must be notified, and resets the current file status.
     *
     * @param j search index recorded in the sync task
     * @param wALFileStatus file status recorded in the sync task
     * @param serializeInfo metadata and listeners of the batch being flushed
     */
    public void fsyncWorkingBuffer(long j, WALFileStatus wALFileStatus, SerializeInfo serializeInfo) {
        switchWorkingBufferToFlushing();
        final SyncBufferTask forceTask = new SyncBufferTask(j, wALFileStatus, true, serializeInfo);
        syncBufferThread.submit(forceTask);
        currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
    }

    /**
     * Turns the current working buffer into the syncing buffer and promotes the idle buffer to
     * be the new (cleared) working buffer, waiting until an idle buffer is available.
     *
     * <p>The wait and the swap both happen while holding {@code buffersLock}, and the lock is
     * released exactly once on every path. If the wait is interrupted, no swap is performed, a
     * warning is logged and the thread's interrupt flag is restored.
     */
    private void switchWorkingBufferToFlushing() {
        this.buffersLock.lock();
        try {
            // Loop to guard against wake-ups that happen while no idle buffer is available.
            while (this.idleBuffer == null) {
                this.idleBufferReadyCondition.await();
            }
            this.syncingBuffer = this.workingBuffer;
            this.workingBuffer = this.idleBuffer;
            this.workingBuffer.clear();
            this.idleBuffer = null;
        } catch (InterruptedException e) {
            logger.warn("Interrupted When waiting for available working buffer.");
            Thread.currentThread().interrupt();
        } finally {
            this.buffersLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the syncing buffer to the idle slot and wakes every thread waiting on
     * {@code idleBufferReadyCondition}. Runs under {@code buffersLock}.
     */
    public void switchSyncingBufferToIdle() {
        final Lock lock = buffersLock;
        lock.lock();
        try {
            idleBuffer = syncingBuffer;
            syncingBuffer = null;
            idleBufferReadyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks the caller until {@code idleBufferReadyCondition} is signalled, which happens when
     * a syncing buffer is returned to idle.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void waitForFlush() throws InterruptedException {
        final Lock lock = buffersLock;
        lock.lock();
        try {
            idleBufferReadyCondition.await();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Timed variant of {@link #waitForFlush()}: waits on {@code idleBufferReadyCondition} for at
     * most the given time.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the result of {@link Condition#await(long, TimeUnit)}: false if the waiting time
     *     elapsed before the condition was signalled, true otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean waitForFlush(long j, TimeUnit timeUnit) throws InterruptedException {
        buffersLock.lock();
        try {
            return idleBufferReadyCondition.await(j, timeUnit);
        } finally {
            buffersLock.unlock();
        }
    }

    /**
     * Closes this buffer: marks it closed, queues a close signal for the serialize task, shuts
     * down both executors, closes the current WAL file writer and releases the direct buffers.
     *
     * <p>Each step is skipped when the corresponding member is still null, so the method can be
     * called from a partially constructed instance (as {@code allocateBuffers()} does on
     * allocation failure).
     */
    @Override
    public void close() {
        this.isClosed = true;
        if (this.serializeThread != null) {
            try {
                // Wake the serialize task so it can flush what it has and stop.
                this.walEntries.put(new WALSignalEntry(WALEntryType.CLOSE_SIGNAL));
            } catch (InterruptedException e) {
                logger.error("Fail to put CLOSE_SIGNAL to walEntries.", e);
            }
            shutdownThread(this.serializeThread, ThreadName.WAL_SERIALIZE);
        }
        if (this.syncBufferThread != null) {
            shutdownThread(this.syncBufferThread, ThreadName.WAL_SYNC);
        }
        if (this.currentWALFileWriter != null) {
            try {
                this.currentWALFileWriter.close();
            } catch (IOException e2) {
                logger.error("Fail to close wal node-{}'s log writer.", this.identifier, e2);
            }
        }
        // Release each direct buffer exactly once.
        if (this.workingBuffer != null) {
            MmapUtil.clean((MappedByteBuffer) this.workingBuffer);
        }
        if (this.idleBuffer != null) {
            // Previously cleaned workingBuffer a second time and never released idleBuffer.
            MmapUtil.clean((MappedByteBuffer) this.idleBuffer);
        }
        if (this.syncingBuffer != null) {
            MmapUtil.clean((MappedByteBuffer) this.syncingBuffer);
        }
    }

    /**
     * Initiates an orderly shutdown of the executor and waits up to 30 seconds for it to
     * terminate. A timeout or an interruption is only logged; on interruption the caller's
     * interrupt flag is restored.
     *
     * @param executorService executor to shut down
     * @param threadName name reported in the log messages
     */
    private void shutdownThread(ExecutorService executorService, ThreadName threadName) {
        executorService.shutdown();
        try {
            final boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS);
            if (terminated) {
                return;
            }
            logger.warn("Waiting thread {} to be terminated is timeout", threadName.getName());
        } catch (InterruptedException interrupted) {
            logger.warn("Thread {} still doesn't exit after 30s", threadName.getName());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Checks, under {@code buffersLock}, whether nothing is pending in this buffer.
     *
     * @return true if the entry queue is empty, the working buffer's position is 0 and there is
     *     no syncing buffer; false otherwise
     */
    @Override
    public boolean isAllWALEntriesConsumed() {
        buffersLock.lock();
        try {
            return walEntries.isEmpty()
                    && workingBuffer.position() == 0
                    && syncingBuffer == null;
        } finally {
            buffersLock.unlock();
        }
    }
}
