package org.apache.iotdb.db.sync.pipedata.queue;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.class */
/**
 * A {@link PipeDataQueue} that buffers pipe data in memory and mirrors it on disk.
 *
 * <p>Every offered {@link PipeData} is appended to a pipe log file inside {@code pipeLogDir}; a
 * new pipe log is started once the current one grows beyond
 * {@link SyncConstant#DEFAULT_PIPE_LOG_SIZE_IN_BYTE}. The serial number of the last committed item
 * is appended to a commit log in the same directory. When an instance is created over an existing
 * directory, the pipe log start numbers, the last serial number, the committed serial number and
 * the not-yet-committed items of the oldest pipe log are read back from those files.
 *
 * <p>NOTE(review): only some methods are {@code synchronized}; callers should not assume the whole
 * class is safe for arbitrary concurrent use without confirming the intended threading model.
 */
public class BufferedPipeDataQueue implements PipeDataQueue {
    private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueue.class);

    /** Size in bytes of one record in the commit log (a single {@code long}). */
    private static final long COMMIT_RECORD_SIZE = 8L;

    /** Directory that holds the pipe log files and the commit log. */
    private final String pipeLogDir;

    /** Items written to the current (newest) pipe log; {@code null} until a pipe log is opened. */
    private BlockingDeque<PipeData> inputDeque;

    /** Writer of the current pipe log; {@code null} until the first successful offer. */
    private DataOutputStream outputStream;

    /** Number of bytes serialized into the current pipe log. */
    private long currentPipeLogSize;

    /** Writer of the commit log; opened lazily and reopened after it reaches the size limit. */
    private DataOutputStream commitLogWriter;

    /** Number of bytes written through the current {@link #commitLogWriter}. */
    private long currentCommitLogSize;

    /** Monitor used by {@link #take()} to wait for data announced by {@link #offer(PipeData)}. */
    private final Object waitLock = new Object();

    /** Serial number of the last item found in the newest pipe log during recovery. */
    private long lastMaxSerialNumber = 0;

    /** Start serial numbers of the existing pipe logs, oldest first. */
    private BlockingDeque<Long> pipeLogStartNumber = new LinkedBlockingDeque<>();

    /** Items that are ready to be handed out; may be the same object as {@link #inputDeque}. */
    private BlockingDeque<PipeData> outputDeque = new LinkedBlockingDeque<>();

    /** Serial number of the last item handed out; {@code Long.MIN_VALUE} when nothing was pulled. */
    private long pullSerialNumber = Long.MIN_VALUE;

    /** Serial number of the last committed item; {@code Long.MIN_VALUE} when unknown. */
    private long commitSerialNumber = Long.MIN_VALUE;

    /**
     * Creates a queue backed by the given directory and recovers its state if the directory
     * already exists.
     *
     * @param str path of the pipe log directory
     */
    public BufferedPipeDataQueue(String str) {
        this.pipeLogDir = str;
        recover();
    }

    /** Restores the in-memory state from the files in {@link #pipeLogDir}, if it exists. */
    private void recover() {
        if (new File(this.pipeLogDir).exists()) {
            recoverPipeLogStartNumber();
            recoverLastMaxSerialNumber();
            recoverCommitSerialNumber();
            recoverOutputDeque();
        }
    }

    /**
     * Collects the start serial numbers of all non-empty pipe log files, in ascending order, into
     * {@link #pipeLogStartNumber}.
     */
    private void recoverPipeLogStartNumber() {
        File[] candidates = new File(this.pipeLogDir).listFiles();
        if (candidates == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            return;
        }
        List<Long> startNumbers = new ArrayList<>();
        for (File candidate : candidates) {
            String name = candidate.getName();
            if (name.endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX) && candidate.length() > 0) {
                startNumbers.add(SyncPathUtil.getSerialNumberFromPipeLogName(name));
            }
        }
        Collections.sort(startNumbers);
        for (Long startNumber : startNumbers) {
            this.pipeLogStartNumber.offer(startNumber);
        }
    }

    /**
     * Sets {@link #lastMaxSerialNumber} from the newest pipe log: the serial number of its last
     * item, or the log's start number minus one when the log contains no item.
     */
    private void recoverLastMaxSerialNumber() {
        if (this.pipeLogStartNumber.isEmpty()) {
            return;
        }
        long newestStart = this.pipeLogStartNumber.peekLast().longValue();
        File newestLog = new File(this.pipeLogDir, SyncPathUtil.getPipeLogName(newestStart));
        try {
            List<PipeData> recovered = parsePipeLog(newestLog);
            if (recovered.isEmpty()) {
                this.lastMaxSerialNumber = newestStart - 1;
            } else {
                this.lastMaxSerialNumber = recovered.get(recovered.size() - 1).getSerialNumber();
            }
        } catch (IOException e) {
            logger.error(String.format("Can not recover inputQueue from %s, because %s.", newestLog.getPath(), e));
        }
    }

    /**
     * Restores {@link #commitSerialNumber} from the last record of the commit log. When no commit
     * log exists, the value defaults to the start number of the oldest pipe log minus one (or is
     * left untouched when there is no pipe log either).
     */
    private void recoverCommitSerialNumber() {
        File commitLogFile = new File(this.pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
        if (!commitLogFile.exists()) {
            if (!this.pipeLogStartNumber.isEmpty()) {
                this.commitSerialNumber = this.pipeLogStartNumber.peek().longValue() - 1;
            }
            return;
        }
        try (RandomAccessFile commitLog = new RandomAccessFile(commitLogFile, "r")) {
            // The commit log is a plain sequence of 8-byte longs, so the last complete record
            // starts at the largest multiple of 8 that still leaves 8 bytes to read. Aligning the
            // offset ignores a partially written trailing record.
            long completeRecords = commitLog.length() / COMMIT_RECORD_SIZE;
            if (completeRecords >= 1) {
                commitLog.seek((completeRecords - 1) * COMMIT_RECORD_SIZE);
                this.commitSerialNumber = commitLog.readLong();
            }
        } catch (IOException e) {
            logger.error(String.format("deserialize remove serial number error, remove serial number has been set to %d, because %s", Long.valueOf(this.commitSerialNumber), e));
        }
    }

    /**
     * Refills {@link #outputDeque} with the items of the oldest pipe log whose serial number is
     * greater than {@link #commitSerialNumber}, preserving their order.
     */
    private void recoverOutputDeque() {
        if (this.pipeLogStartNumber.isEmpty()) {
            return;
        }
        File oldestLog = new File(this.pipeLogDir, SyncPathUtil.getPipeLogName(this.pipeLogStartNumber.peek().longValue()));
        try {
            List<PipeData> recovered = parsePipeLog(oldestLog);
            // Walk backwards so the scan can stop at the first already-committed item.
            for (int i = recovered.size() - 1; i >= 0; i--) {
                PipeData candidate = recovered.get(i);
                if (candidate.getSerialNumber() <= this.commitSerialNumber) {
                    break;
                }
                this.outputDeque.addFirst(candidate);
            }
        } catch (IOException e) {
            logger.error(String.format("Recover output deque from pipe log %s error, because %s.", oldestLog.getPath(), e));
        }
    }

    /** @return the last serial number recovered from the newest pipe log (0 if none was recovered) */
    public long getLastMaxSerialNumber() {
        return this.lastMaxSerialNumber;
    }

    /** @return the serial number of the last committed item, or {@code Long.MIN_VALUE} if unknown */
    public long getCommitSerialNumber() {
        return this.commitSerialNumber;
    }

    /**
     * Adds an item to the queue and appends it to the current pipe log, switching to a new pipe
     * log first when none is open or the current one exceeds the size limit.
     *
     * @param pipeData the item to enqueue
     * @return {@code true} if the item was enqueued and written to disk; {@code false} if the pipe
     *     log could not be opened, the in-memory deque rejected the item, or writing failed
     */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public boolean offer(PipeData pipeData) {
        if (this.outputStream == null || this.currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE.longValue()) {
            try {
                moveToNextPipeLog(pipeData.getSerialNumber());
            } catch (IOException e) {
                logger.error(String.format("Move to next pipe log %s error, because %s.", pipeData, e));
                // Without a usable pipe log there is no input deque / output stream to accept the
                // item; report the failure instead of hitting a null deque or a closed stream.
                return false;
            }
        }
        if (!this.inputDeque.offer(pipeData)) {
            return false;
        }
        synchronized (this.waitLock) {
            this.waitLock.notifyAll();
        }
        try {
            writeToDisk(pipeData);
            return true;
        } catch (IOException e) {
            logger.error(String.format("Record pipe data %s error, because %s.", pipeData, e));
            return false;
        }
    }

    /**
     * Closes the current pipe log (if any) and opens a new one named after the given serial
     * number, resetting the size counter and the input deque.
     *
     * @param startSerialNumber serial number of the first item of the new pipe log
     * @throws IOException if the old log cannot be closed or the new one cannot be created
     */
    private synchronized void moveToNextPipeLog(long startSerialNumber) throws IOException {
        if (this.outputStream != null) {
            this.outputStream.close();
            // Drop the reference so a failure below does not leave a closed stream behind.
            this.outputStream = null;
        }
        File nextLog = new File(this.pipeLogDir, SyncPathUtil.getPipeLogName(startSerialNumber));
        SyncPathUtil.createFile(nextLog);
        this.outputStream = new DataOutputStream(new FileOutputStream(nextLog));
        this.pipeLogStartNumber.offer(Long.valueOf(startSerialNumber));
        this.currentPipeLogSize = 0L;
        this.inputDeque = new LinkedBlockingDeque<>();
        if (this.commitSerialNumber == Long.MIN_VALUE) {
            // First pipe log ever: everything before its first item counts as committed.
            this.commitSerialNumber = startSerialNumber - 1;
        }
    }

    /** Serializes the item into the current pipe log and flushes the stream. */
    private void writeToDisk(PipeData pipeData) throws IOException {
        this.currentPipeLogSize += pipeData.serialize(this.outputStream);
        this.outputStream.flush();
    }

    /**
     * Removes and returns the next available item.
     *
     * <p>If {@link #outputDeque} is non-empty its head is returned. Otherwise the output deque is
     * replaced, for serial number {@code previousSerialNumber + 1}, either by the content of the
     * pipe log named after that serial number or, when that serial number starts the newest pipe
     * log and an input deque exists, by the input deque itself.
     *
     * @param previousSerialNumber serial number preceding the one to load
     * @return the next item, or {@code null} if nothing is available
     * @throws IOException if the pipe log cannot be parsed
     */
    private synchronized PipeData pullOnePipeData(long previousSerialNumber) throws IOException {
        long wantedSerialNumber = previousSerialNumber + 1;
        if (!this.outputDeque.isEmpty()) {
            return this.outputDeque.poll();
        }
        if (this.outputDeque == this.inputDeque
                || this.pipeLogStartNumber.isEmpty()
                || previousSerialNumber == Long.MIN_VALUE
                || wantedSerialNumber > this.pipeLogStartNumber.peekLast().longValue()) {
            return null;
        }
        boolean startsNewestLog = wantedSerialNumber == this.pipeLogStartNumber.peekLast().longValue();
        if (startsNewestLog && this.inputDeque != null) {
            // The newest log is still being written; share its in-memory deque instead of re-reading it.
            this.outputDeque = this.inputDeque;
        } else {
            List<PipeData> parsed = parsePipeLog(new File(this.pipeLogDir, SyncPathUtil.getPipeLogName(wantedSerialNumber)));
            BlockingDeque<PipeData> refilled = new LinkedBlockingDeque<>();
            for (PipeData item : parsed) {
                refilled.offer(item);
            }
            this.outputDeque = refilled;
        }
        return this.outputDeque.poll();
    }

    /**
     * Returns the uncommitted items following the committed serial number, stopping once an item
     * with a serial number of at least {@code j} has been collected or no further item is
     * available. The returned items are put back at the front of the output deque, so they stay
     * in the queue until committed.
     *
     * @param j serial number up to which items are requested
     * @return the collected items in order; empty if none is available
     */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public List<PipeData> pull(long j) {
        List<PipeData> pulled = new ArrayList<>();
        this.pullSerialNumber = this.commitSerialNumber;
        while (this.pullSerialNumber < j) {
            try {
                PipeData next = pullOnePipeData(this.pullSerialNumber);
                if (next == null) {
                    break;
                }
                pulled.add(next);
                this.pullSerialNumber = next.getSerialNumber();
            } catch (IOException e) {
                logger.error(String.format("Pull pipe data serial number %s error, because %s.", Long.valueOf(this.pullSerialNumber + 1), e));
                // pullSerialNumber did not advance, so retrying would repeat the same failing
                // read forever; return what has been collected so far.
                break;
            }
        }
        for (int i = pulled.size() - 1; i >= 0; i--) {
            this.outputDeque.addFirst(pulled.get(i));
        }
        return pulled;
    }

    /**
     * Waits until an item following the committed serial number is available and returns it. The
     * item is put back at the front of the output deque, so it stays in the queue until committed.
     *
     * @return the next item, or {@code null} if reading the pipe log failed (the error is logged)
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public PipeData take() throws InterruptedException {
        PipeData taken = null;
        try {
            synchronized (this.waitLock) {
                while ((taken = pullOnePipeData(this.commitSerialNumber)) == null) {
                    this.waitLock.wait();
                    // Pass the wake-up on to any other thread waiting on the same lock.
                    this.waitLock.notifyAll();
                }
            }
        } catch (IOException e) {
            logger.error(String.format("Blocking pull pipe data number %s error, because %s", Long.valueOf(this.commitSerialNumber + 1), e));
        }
        if (taken == null) {
            // Only reachable after an IOException; the deque rejects null elements.
            return null;
        }
        this.outputDeque.addFirst(taken);
        this.pullSerialNumber = taken.getSerialNumber();
        return taken;
    }

    /** Commits everything handed out so far; does nothing if no item was pulled yet. */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public void commit() {
        if (this.pullSerialNumber == Long.MIN_VALUE) {
            return;
        }
        commit(this.pullSerialNumber);
    }

    /**
     * Commits all items up to and including the given serial number: removes them from the
     * queue, deletes pipe logs that are no longer needed and records the new committed serial
     * number in the commit log.
     *
     * @param j serial number up to which items are committed
     */
    public void commit(long j) {
        deletePipeData(j);
        deletePipeLog();
        serializeCommitSerialNumber();
    }

    /**
     * Advances {@link #commitSerialNumber} one step at a time up to {@code targetSerialNumber},
     * pulling one item per step and deleting the files of every pulled TsFile item.
     */
    private void deletePipeData(long targetSerialNumber) {
        while (this.commitSerialNumber < targetSerialNumber) {
            this.commitSerialNumber++;
            try {
                PipeData committed = pullOnePipeData(this.commitSerialNumber);
                if (committed == null) {
                    continue;
                }
                if (PipeData.PipeDataType.TSFILE.equals(committed.getType())) {
                    for (File tsFile : ((TsFilePipeData) committed).getTsFiles(false)) {
                        Files.deleteIfExists(tsFile.toPath());
                    }
                }
            } catch (IOException e) {
                logger.error(String.format("Commit pipe data serial number %s error, because %s.", Long.valueOf(this.commitSerialNumber), e));
            }
        }
    }

    /**
     * Deletes every pipe log whose successor starts at or before {@link #commitSerialNumber}. At
     * least one start number always remains in {@link #pipeLogStartNumber}.
     */
    private void deletePipeLog() {
        if (this.pipeLogStartNumber.size() < 2) {
            return;
        }
        long oldestStart = this.pipeLogStartNumber.poll().longValue();
        while (!this.pipeLogStartNumber.isEmpty() && this.pipeLogStartNumber.peek().longValue() <= this.commitSerialNumber) {
            try {
                Files.deleteIfExists(new File(this.pipeLogDir, SyncPathUtil.getPipeLogName(oldestStart)).toPath());
            } catch (IOException e) {
                logger.warn(String.format("Delete %s-pipe.log error, because %s.", Long.valueOf(oldestStart), e));
            }
            oldestStart = this.pipeLogStartNumber.poll().longValue();
        }
        // The last polled log is still needed; put its start number back.
        this.pipeLogStartNumber.addFirst(Long.valueOf(oldestStart));
    }

    /**
     * Appends {@link #commitSerialNumber} to the commit log. The writer is opened on demand
     * (which truncates the file) and closed once the bytes written through it reach the size
     * limit, so the next call starts a fresh commit log.
     */
    private void serializeCommitSerialNumber() {
        try {
            if (this.commitLogWriter == null) {
                this.commitLogWriter = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, SyncConstant.COMMIT_LOG_NAME)));
                this.currentCommitLogSize = 0L;
            }
            this.commitLogWriter.writeLong(this.commitSerialNumber);
            this.commitLogWriter.flush();
            this.currentCommitLogSize += COMMIT_RECORD_SIZE;
            if (this.currentCommitLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE.longValue()) {
                this.commitLogWriter.close();
                this.commitLogWriter = null;
            }
        } catch (IOException e) {
            logger.error(String.format("Serialize commit serial number %s error, because %s.", Long.valueOf(this.commitSerialNumber), e));
        }
    }

    /**
     * @return {@code true} if the queue was closed, has no pipe log, or has exactly one pipe log
     *     with both the output deque and the input deque (if any) empty
     */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public synchronized boolean isEmpty() {
        if (this.outputDeque == null || this.pipeLogStartNumber.isEmpty()) {
            return true;
        }
        boolean inputDrained = this.inputDeque == null || this.inputDeque.isEmpty();
        return this.pipeLogStartNumber.size() == 1 && this.outputDeque.isEmpty() && inputDrained;
    }

    /**
     * Closes both log writers and drops the in-memory state. Each writer is closed independently,
     * so a failure on one does not prevent the other from being closed.
     */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public void close() {
        closeStream(this.outputStream);
        this.outputStream = null;
        closeStream(this.commitLogWriter);
        this.commitLogWriter = null;
        this.inputDeque = null;
        this.pipeLogStartNumber = null;
        this.outputDeque = null;
    }

    /** Closes the given stream if it is non-null, logging (not propagating) any failure. */
    private void closeStream(DataOutputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            logger.warn(String.format("Close pipe log dir %s error.", this.pipeLogDir), e);
        }
    }

    /** Closes the queue and deletes the pipe log directory with everything in it. */
    @Override // org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue
    public void clear() {
        close();
        File logDir = new File(this.pipeLogDir);
        if (logDir.exists()) {
            FileUtils.deleteDirectory(logDir);
        }
    }

    /**
     * Reads all pipe data records stored in a pipe log file.
     *
     * <p>Records are deserialized one after another from a single stream until the end of the
     * file is reached; the stream is always closed before returning.
     *
     * @param file the pipe log file to read
     * @return the records in file order; empty if the file holds no complete record
     * @throws IOException if the file cannot be read or a record contains an illegal path
     */
    public static List<PipeData> parsePipeLog(File file) throws IOException {
        List<PipeData> records = new ArrayList<>();
        try (DataInputStream input = new DataInputStream(new FileInputStream(file))) {
            while (true) {
                records.add(PipeData.deserialize(input));
            }
        } catch (EOFException e) {
            // End of file is the normal way out of the read loop.
            return records;
        } catch (IllegalPathException e) {
            logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
            throw new IOException(e);
        }
    }
}
