package org.apache.iotdb.db.sync.sender.pipe;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueue;
import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.sender.manager.LocalSyncManager;
import org.apache.iotdb.db.sync.sender.recovery.TsFilePipeLogger;
import org.apache.iotdb.db.sync.transport.client.SenderManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.class */
/**
 * {@link Pipe} implementation that queues {@link TsFilePipeData} and {@link DeletionPipeData}
 * entries per data region and hands them to a {@link SenderManager}.
 *
 * <p>Two queues are kept for every data region id: a "history" queue filled once by
 * {@link #start()} and a "real time" queue filled by the {@code collectRealTime*} methods.
 * Consumers always drain the history queue of a region before its real-time queue.
 *
 * <p>Locking: {@link #start()}, {@link #stop()} and {@link #drop()} are {@code synchronized};
 * the real-time collection methods serialise on {@code collectRealTimeDataLock}; the
 * {@code isCollectFinished} flag is guarded by {@code isCollectFinishedReadWriteLock}.
 */
public class TsFilePipe implements Pipe {
    private static final Logger logger = LoggerFactory.getLogger(TsFilePipe.class);

    private final TsFilePipeInfo pipeInfo;
    private final PipeSink pipeSink;
    private final SenderManager senderManager;
    private final TsFilePipeLogger pipeLog;

    /** Sync managers keyed by data region id. */
    private final Map<String, ISyncManager> syncManagerMap = new ConcurrentHashMap<>();
    /** History queues keyed by data region id. */
    private final Map<String, PipeDataQueue> historyQueueMap = new ConcurrentHashMap<>();
    /** Real-time queues keyed by data region id. */
    private final Map<String, PipeDataQueue> realTimeQueueMap = new ConcurrentHashMap<>();

    /** Guards {@link #isCollectFinished}. */
    private final ReentrantReadWriteLock isCollectFinishedReadWriteLock = new ReentrantReadWriteLock(false);
    /** Serialises hardlink creation and real-time queue insertion. */
    private final ReentrantLock collectRealTimeDataLock = new ReentrantLock();
    /** Highest serial number handed out to a real-time pipe data entry so far. */
    private final AtomicLong maxSerialNumber = new AtomicLong(0);

    private boolean isCollectFinished;

    /**
     * Creates a stopped pipe and restores any queues found on disk.
     *
     * @param j forwarded to {@link TsFilePipeInfo}; presumably the pipe creation time (confirm
     *     against the {@code TsFilePipeInfo} constructor)
     * @param str the pipe name
     * @param pipeSink the sink this pipe sends to; must not be null
     * @param j2 forwarded to {@link TsFilePipeInfo}; presumably the data start timestamp (confirm)
     * @param z forwarded to {@link TsFilePipeInfo}; presumably whether deletions are synced
     *     (confirm)
     */
    public TsFilePipe(long j, String str, PipeSink pipeSink, long j2, boolean z) {
        this.pipeInfo = new TsFilePipeInfo(str, pipeSink.getPipeSinkName(), PipeStatus.STOP, j, j2, z);
        this.pipeSink = pipeSink;
        // The logger and the sender manager both receive `this`, so they are created only after
        // pipeInfo and pipeSink are assigned; otherwise a callback into this pipe would see nulls.
        this.pipeLog = new TsFilePipeLogger(this);
        this.isCollectFinished = this.pipeLog.isCollectFinished();
        this.senderManager = new SenderManager(this, pipeSink);
        recover();
    }

    /**
     * Rebuilds the history and real-time queues from the per-region entries found under the
     * pipe's log directories, and raises {@link #maxSerialNumber} to the largest serial number
     * reported by any recovered real-time queue.
     */
    private void recover() {
        String pipeName = this.pipeInfo.getPipeName();
        long createTime = this.pipeInfo.getCreateTime();

        // listFiles() returns null when the path is missing, is not a directory, or cannot be
        // read, so the null check also covers the "directory does not exist" case.
        File[] historyEntries = new File(SyncPathUtil.getSenderHistoryPipeLogDir(pipeName, createTime)).listFiles();
        if (historyEntries != null) {
            for (File entry : historyEntries) {
                String dataRegionId = entry.getName();
                this.historyQueueMap.put(
                        dataRegionId,
                        new BufferedPipeDataQueue(
                                SyncPathUtil.getSenderDataRegionHistoryPipeLogDir(pipeName, createTime, dataRegionId)));
            }
        }

        File[] realTimeEntries = new File(SyncPathUtil.getSenderRealTimePipeLogDir(pipeName, createTime)).listFiles();
        if (realTimeEntries != null) {
            for (File entry : realTimeEntries) {
                String dataRegionId = entry.getName();
                BufferedPipeDataQueue queue =
                        new BufferedPipeDataQueue(
                                SyncPathUtil.getSenderDataRegionRealTimePipeLogDir(pipeName, createTime, dataRegionId));
                this.realTimeQueueMap.put(dataRegionId, queue);
                this.maxSerialNumber.set(Math.max(this.maxSerialNumber.get(), queue.getLastMaxSerialNumber()));
            }
        }
    }

    /**
     * Starts the pipe. Does nothing if it is already {@link PipeStatus#RUNNING}.
     *
     * <p>Checks the connection, makes sure every data region has a sync manager, collects
     * history data if that has not been done yet, then marks the pipe running and starts the
     * sender manager.
     *
     * @throws PipeException if an {@link IOException} occurs while preparing the pipe log, or if
     *     a called component throws it
     */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void start() throws PipeException {
        if (this.pipeInfo.getStatus() == PipeStatus.RUNNING) {
            return;
        }
        this.senderManager.checkConnection();
        for (DataRegion dataRegion : StorageEngineV2.getInstance().getAllDataRegions()) {
            logger.info(logFormat("init syncManager for %s-%s", dataRegion.getDatabaseName(), dataRegion.getDataRegionId()));
            getOrCreateSyncManager(dataRegion.getDataRegionId());
        }

        // Acquire outside the try so the finally never unlocks a lock that was not taken.
        this.isCollectFinishedReadWriteLock.writeLock().lock();
        try {
            if (!this.isCollectFinished) {
                this.pipeLog.clear();
                collectHistoryData();
                this.pipeLog.finishCollect();
                this.isCollectFinished = true;
            }
            this.pipeInfo.setStatus(PipeStatus.RUNNING);
            this.senderManager.start();
        } catch (IOException e) {
            // The cause is logged here; the PipeException itself carries only the message.
            logger.error(logFormat("Clear pipe dir %s error.", SyncPathUtil.getSenderPipeDir(this.pipeInfo.getPipeName(), this.pipeInfo.getCreateTime())), e);
            throw new PipeException("Start error, can not clear pipe log.");
        } finally {
            this.isCollectFinishedReadWriteLock.writeLock().unlock();
        }
    }

    /**
     * Asks every sync manager for its history TsFiles and offers them to the matching history
     * queue. Serial numbers run from {@code 1 - size} up to {@code 0}, so history entries never
     * use a positive serial number.
     */
    private void collectHistoryData() {
        for (Map.Entry<String, ISyncManager> entry : this.syncManagerMap.entrySet()) {
            List<File> historyTsFiles = entry.getValue().syncHistoryTsFile(this.pipeInfo.getDataStartTimestamp());
            PipeDataQueue historyQueue = this.historyQueueMap.get(entry.getKey());
            int size = historyTsFiles.size();
            for (int i = 0; i < size; i++) {
                long serialNumber = (1 - size) + i;
                File tsFile = historyTsFiles.get(i);
                historyQueue.offer(new TsFilePipeData(tsFile.getParent(), tsFile.getName(), serialNumber));
            }
        }
    }

    /**
     * Creates a hardlink for a history TsFile (and its mods file) unless one already exists.
     *
     * @param file the TsFile to link
     * @param j forwarded unchanged to the pipe logger
     * @return the created hardlink, or {@code null} if a hardlink already exists or creating it
     *     failed with an {@link IOException}
     */
    public File createHistoryTsFileHardlink(File file, long j) {
        this.collectRealTimeDataLock.lock();
        try {
            if (this.pipeLog.isHardlinkExist(file)) {
                return null;
            }
            return this.pipeLog.createTsFileAndModsHardlink(file, j);
        } catch (IOException e) {
            logger.error(logFormat("Create hardlink for history tsfile %s error.", file.getPath()), e);
            return null;
        } finally {
            // Single release point: unlocking a ReentrantLock twice throws
            // IllegalMonitorStateException.
            this.collectRealTimeDataLock.unlock();
        }
    }

    /**
     * Splits a deletion by device and offers one {@link DeletionPipeData} per resulting path to
     * the real-time queue of the given data region. Does nothing when the pipe is not configured
     * to sync deletions.
     *
     * @param deletion the deletion to forward
     * @param str forwarded as the first argument of {@link DeletionPipeData}
     * @param str2 the data region id whose real-time queue receives the entries
     */
    public void collectRealTimeDeletion(Deletion deletion, String str, String str2) {
        this.collectRealTimeDataLock.lock();
        try {
            if (!this.pipeInfo.isSyncDelOp()) {
                return;
            }
            for (PartialPath devicePath : LocalSyncManager.splitPathPatternByDevice(deletion.getPath())) {
                Deletion perDevice = new Deletion(devicePath, deletion.getFileOffset(), deletion.getStartTime(), deletion.getEndTime());
                this.realTimeQueueMap.get(str2).offer(new DeletionPipeData(str, perDevice, this.maxSerialNumber.incrementAndGet()));
            }
        } catch (MetadataException e) {
            logger.warn(logFormat("Collect deletion %s error.", deletion), e);
        } finally {
            this.collectRealTimeDataLock.unlock();
        }
    }

    /**
     * Hardlinks a newly produced TsFile and offers it to the real-time queue of the given data
     * region. Does nothing if a hardlink for the file already exists.
     *
     * @param file the TsFile to collect
     * @param str the data region id whose real-time queue receives the entry
     */
    public void collectRealTimeTsFile(File file, String str) {
        this.collectRealTimeDataLock.lock();
        try {
            if (this.pipeLog.isHardlinkExist(file)) {
                return;
            }
            File hardlink = this.pipeLog.createTsFileHardlink(file);
            this.realTimeQueueMap.get(str).offer(new TsFilePipeData(hardlink.getParent(), hardlink.getName(), this.maxSerialNumber.incrementAndGet()));
        } catch (IOException e) {
            // %d needs a Long; passing the AtomicLong itself makes String.format throw.
            logger.warn(logFormat("Create Hardlink tsfile %s on disk error, serial number is %d.", file.getPath(), this.maxSerialNumber.get()), e);
        } finally {
            // Single release point: unlocking a ReentrantLock twice throws
            // IllegalMonitorStateException.
            this.collectRealTimeDataLock.unlock();
        }
    }

    /**
     * Hardlinks the resource file of a TsFile. An {@link IOException} is logged and otherwise
     * ignored.
     *
     * @param file the file handed to the pipe logger
     */
    public void collectRealTimeResource(File file) {
        try {
            this.pipeLog.createTsFileResourceHardlink(file);
        } catch (IOException e) {
            logger.warn(logFormat("Record tsfile resource %s on disk error.", file.getPath()), e);
        }
    }

    /**
     * Takes the next entry for a data region: from the history queue while it is non-empty,
     * otherwise from the real-time queue.
     *
     * @param str the data region id
     * @return the entry returned by the selected queue's {@code take()}
     * @throws InterruptedException if the underlying {@code take()} is interrupted
     */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeData take(String str) throws InterruptedException {
        PipeDataQueue historyQueue = this.historyQueueMap.get(str);
        if (!historyQueue.isEmpty()) {
            return historyQueue.take();
        }
        return this.realTimeQueueMap.get(str).take();
    }

    /**
     * Pulls entries from every non-empty history queue and, when {@code j} is positive, from
     * every real-time queue.
     *
     * @param j the serial number passed to each queue's {@code pull}
     * @return all pulled entries, history first
     */
    public List<PipeData> pull(long j) {
        List<PipeData> pulled = new ArrayList<>();
        for (PipeDataQueue historyQueue : this.historyQueueMap.values()) {
            if (!historyQueue.isEmpty()) {
                pulled.addAll(historyQueue.pull(j));
            }
        }
        // History serial numbers are never positive, so real-time queues are only consulted
        // for positive serial numbers.
        if (j > 0) {
            for (PipeDataQueue realTimeQueue : this.realTimeQueueMap.values()) {
                pulled.addAll(realTimeQueue.pull(j));
            }
        }
        return pulled;
    }

    /**
     * Commits on the history queue of a data region if it is non-empty, then always commits on
     * its real-time queue.
     *
     * @param str the data region id
     */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public void commit(String str) {
        PipeDataQueue historyQueue = this.historyQueueMap.get(str);
        if (!historyQueue.isEmpty()) {
            historyQueue.commit();
        }
        this.realTimeQueueMap.get(str).commit();
    }

    /**
     * Returns the sync manager of a data region, creating it and registering the region's
     * queues on first use.
     *
     * @param str the data region id; parsed as an integer when the manager is created
     * @return the existing or newly created sync manager
     */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public ISyncManager getOrCreateSyncManager(String str) {
        return this.syncManagerMap.computeIfAbsent(str, dataRegionId -> {
            registerDataRegion(dataRegionId);
            DataRegion dataRegion = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(Integer.parseInt(dataRegionId)));
            return new LocalSyncManager(dataRegion, this);
        });
    }

    /** Creates both queues for a data region and registers the region with the sender manager. */
    private void registerDataRegion(String str) {
        String pipeName = this.pipeInfo.getPipeName();
        long createTime = this.pipeInfo.getCreateTime();
        this.historyQueueMap.put(str, new BufferedPipeDataQueue(SyncPathUtil.getSenderDataRegionHistoryPipeLogDir(pipeName, createTime, str)));
        this.realTimeQueueMap.put(str, new BufferedPipeDataQueue(SyncPathUtil.getSenderDataRegionRealTimePipeLogDir(pipeName, createTime, str)));
        this.senderManager.registerDataRegion(str);
    }

    /**
     * Removes a data region from this pipe: deletes its sync manager, unregisters it from the
     * sender manager, and removes and clears both of its queues. Does nothing if the region has
     * no sync manager.
     *
     * @param str the data region id
     */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public void unregisterDataRegion(String str) {
        ISyncManager removed = this.syncManagerMap.remove(str);
        if (removed == null) {
            return;
        }
        removed.delete();
        this.senderManager.unregisterDataRegion(str);
        this.realTimeQueueMap.remove(str).clear();
        this.historyQueueMap.remove(str).clear();
    }

    /** Returns whether history collection has completed, read under the read lock. */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public boolean isHistoryCollectFinished() {
        this.isCollectFinishedReadWriteLock.readLock().lock();
        try {
            return this.isCollectFinished;
        } finally {
            this.isCollectFinishedReadWriteLock.readLock().unlock();
        }
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeInfo getPipeInfo() {
        return this.pipeInfo;
    }

    /**
     * Commits the given serial number on every non-empty history queue and, when {@code j} is
     * positive, on every real-time queue.
     *
     * @param j the serial number passed to each queue's {@code commit}
     */
    public void commit(long j) {
        for (PipeDataQueue historyQueue : this.historyQueueMap.values()) {
            if (!historyQueue.isEmpty()) {
                historyQueue.commit(j);
            }
        }
        if (j > 0) {
            for (PipeDataQueue realTimeQueue : this.realTimeQueueMap.values()) {
                realTimeQueue.commit(j);
            }
        }
    }

    /** Stops the sender manager and marks the pipe {@link PipeStatus#STOP}. */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void stop() throws PipeException {
        this.senderManager.stop();
        this.pipeInfo.setStatus(PipeStatus.STOP);
    }

    /** Closes the pipe and then clears its queues and pipe log. */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void drop() throws PipeException {
        close();
        clear();
    }

    /** Clears every queue and the pipe log; an {@link IOException} is logged and ignored. */
    private void clear() {
        try {
            this.historyQueueMap.values().forEach(PipeDataQueue::clear);
            this.realTimeQueueMap.values().forEach(PipeDataQueue::clear);
            this.pipeLog.clear();
        } catch (IOException e) {
            logger.warn(logFormat("Clear pipe %s %d error.", this.pipeInfo.getPipeName(), Long.valueOf(this.pipeInfo.getCreateTime())), e);
        }
    }

    /**
     * Formats a log message and prefixes it with {@code [pipeName-createTime] }.
     *
     * <p>The prefix is concatenated after formatting rather than merged into the pattern, so a
     * {@code %} in the pipe name cannot be misread as a format specifier.
     *
     * @param str a {@link String#format} pattern
     * @param objArr the arguments for the pattern
     * @return the prefixed, formatted message
     */
    private String logFormat(String str, Object... objArr) {
        String prefix = "[" + this.pipeInfo.getPipeName() + "-" + this.pipeInfo.getCreateTime() + "] ";
        return prefix + String.format(str, objArr);
    }

    /** Closes every history and real-time queue, then the sender manager. */
    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public void close() throws PipeException {
        this.historyQueueMap.values().forEach(PipeDataQueue::close);
        this.realTimeQueueMap.values().forEach(PipeDataQueue::close);
        this.senderManager.close();
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public String getName() {
        return this.pipeInfo.getPipeName();
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeSink getPipeSink() {
        return this.pipeSink;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public long getCreateTime() {
        return this.pipeInfo.getCreateTime();
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeStatus getStatus() {
        return this.pipeInfo.getStatus();
    }

    @Override
    public String toString() {
        return "TsFilePipe{, pipeInfo=" + this.pipeInfo + ", pipeSink=" + this.pipeSink + ", pipeLog=" + this.pipeLog + ", collectRealTimeDataLock=" + this.collectRealTimeDataLock + ", maxSerialNumber=" + this.maxSerialNumber + '}';
    }

    /** Two pipes are equal when their pipe info and pipe sink are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TsFilePipe other = (TsFilePipe) obj;
        return Objects.equals(this.pipeInfo, other.pipeInfo) && Objects.equals(this.pipeSink, other.pipeSink);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.pipeInfo, this.pipeSink);
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public SenderManager getSenderManager() {
        return this.senderManager;
    }
}
