package org.apache.iotdb.db.pipe.agent.task;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskBuilder;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.class */
public class PipeDataNodeTaskAgent extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
    protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

    protected boolean isShutdown() {
        return PipeAgent.runtime().isShutdown();
    }

    protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMeta) throws IllegalPathException {
        return new PipeDataNodeBuilder(pipeMeta).build();
    }

    protected void createPipeTask(int i, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) throws IllegalPathException {
        if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
            PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
            boolean z = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(i)) && DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters);
            boolean z2 = SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new SchemaRegionId(i)) && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters).isEmpty();
            if (z || z2) {
                PipeDataNodeTask build = new PipeDataNodeTaskBuilder(pipeStaticMeta, i, pipeTaskMeta).build();
                build.create();
                this.pipeTaskManager.addPipeTask(pipeStaticMeta, i, build);
            }
        }
        this.pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName()).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().put(Integer.valueOf(i), pipeTaskMeta);
    }

    public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(List<PipeMeta> list) {
        if (isShutdown()) {
            return Collections.emptyList();
        }
        List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal = super.handlePipeMetaChangesInternal(list);
        try {
            closeSchemaRegionListeningQueueIfNecessary(clearSchemaRegionListeningQueueIfNecessary(list), handlePipeMetaChangesInternal);
            return handlePipeMetaChangesInternal;
        } catch (Exception e) {
            throw new PipeException("Failed to clear/close schema region listening queue.", e);
        }
    }

    private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(List<PipeMeta> list) throws IllegalPathException {
        HashMap hashMap = new HashMap();
        for (PipeMeta pipeMeta : list) {
            if (!SchemaRegionListeningFilter.parseListeningPlanTypeSet(pipeMeta.getStaticMeta().getExtractorParameters()).isEmpty()) {
                ConcurrentMap consensusGroupId2TaskMetaMap = pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
                Iterator<SchemaRegionId> it = SchemaEngine.getInstance().getAllSchemaRegionIds().iterator();
                while (it.hasNext()) {
                    int id = it.next().getId();
                    PipeTaskMeta pipeTaskMeta = (PipeTaskMeta) consensusGroupId2TaskMetaMap.get(Integer.valueOf(id));
                    if (pipeTaskMeta != null) {
                        MetaProgressIndex progressIndex = pipeTaskMeta.getProgressIndex();
                        if (!(progressIndex instanceof MetaProgressIndex)) {
                            hashMap.put(Integer.valueOf(id), 0L);
                        } else if (progressIndex.getIndex() + 1 < ((Long) hashMap.getOrDefault(Integer.valueOf(id), Long.valueOf(WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX))).longValue()) {
                            hashMap.put(Integer.valueOf(id), Long.valueOf(progressIndex.getIndex() + 1));
                        }
                    }
                }
            }
        }
        hashMap.forEach((num, l) -> {
            PipeAgent.runtime().schemaListener(new SchemaRegionId(num.intValue())).removeBefore(l.longValue());
        });
        return hashMap.keySet();
    }

    private void closeSchemaRegionListeningQueueIfNecessary(Set<Integer> set, List<TPushPipeMetaRespExceptionMessage> list) {
        if (list.isEmpty()) {
            PipeAgent.runtime().listeningSchemaRegionIds().forEach(schemaRegionId -> {
                if (set.contains(Integer.valueOf(schemaRegionId.getId())) || !PipeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
                    return;
                }
                try {
                    SchemaRegionConsensusImpl.getInstance().write(schemaRegionId, new PipeOperateSchemaQueueNode(new PlanNodeId(SubStringFunctionColumnTransformer.EMPTY_STRING), false));
                } catch (ConsensusException e) {
                    throw new PipeException("Failed to close listening queue for SchemaRegion " + schemaRegionId, e);
                }
            });
        }
    }

    public void stopAllPipesWithCriticalException() {
        super.stopAllPipesWithCriticalException(IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
    }

    public void collectPipeMetaList(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (tryReadLockWithTimeOut(10L)) {
            try {
                collectPipeMetaListInternal(tDataNodeHeartbeatResp);
            } finally {
                releaseReadLock();
            }
        }
    }

    private void collectPipeMetaListInternal(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        try {
            Optional<Logger> schedule = PipeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                schedule.ifPresent(logger -> {
                    logger.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
                });
            }
            LOGGER.info("Reported {} pipe metas.", Integer.valueOf(arrayList.size()));
            tDataNodeHeartbeatResp.setPipeMetaList(arrayList);
        } catch (IOException e) {
            throw new TException(e);
        }
    }

    protected void collectPipeMetaListInternal(TPipeHeartbeatReq tPipeHeartbeatReq, TPipeHeartbeatResp tPipeHeartbeatResp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config node.", Long.valueOf(tPipeHeartbeatReq.heartbeatId));
        ArrayList arrayList = new ArrayList();
        try {
            Optional<Logger> schedule = PipeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                schedule.ifPresent(logger -> {
                    logger.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
                });
            }
            LOGGER.info("Reported {} pipe metas.", Integer.valueOf(arrayList.size()));
            tPipeHeartbeatResp.setPipeMetaList(arrayList);
            PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
        } catch (IOException e) {
            throw new TException(e);
        }
    }

    public void restartAllStuckPipes() {
        if (tryWriteLockWithTimeOut(5L)) {
            try {
                restartAllStuckPipesInternal();
            } finally {
                releaseWriteLock();
            }
        }
    }

    private void restartAllStuckPipesInternal() {
        Map<String, IoTDBDataRegionExtractor> extractorMap = PipeExtractorMetrics.getInstance().getExtractorMap();
        HashSet hashSet = new HashSet();
        for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            String pipeName = pipeMeta.getStaticMeta().getPipeName();
            List list = (List) extractorMap.values().stream().filter(ioTDBDataRegionExtractor -> {
                return ioTDBDataRegionExtractor.getPipeName().equals(pipeName);
            }).collect(Collectors.toList());
            if (!list.isEmpty() && ((IoTDBDataRegionExtractor) list.get(0)).isStreamMode() && !list.stream().noneMatch((v0) -> {
                return v0.hasConsumedAllHistoricalTsFiles();
            }) && (mayMemTablePinnedCountReachDangerousThreshold() || mayWalSizeReachThrottleThreshold())) {
                LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
                hashSet.add(pipeMeta);
            }
        }
        hashSet.parallelStream().forEach(this::restartStuckPipe);
    }

    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeResourceManager.wal().getPinnedWalCount() >= 10 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
    }

    private boolean mayWalSizeReachThrottleThreshold() {
        return 3 * WALManager.getInstance().getTotalDiskUsage() > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
    }

    private void restartStuckPipe(PipeMeta pipeMeta) {
        LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
        long currentTimeMillis = System.currentTimeMillis();
        changePipeStatusBeforeRestart(pipeMeta.getStaticMeta().getPipeName());
        handleSinglePipeMetaChangesInternal(pipeMeta);
        LOGGER.warn("Pipe {} was restarted because of stuck, time cost: {} ms.", pipeMeta.getStaticMeta(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private void changePipeStatusBeforeRestart(String str) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(str);
        HashSet hashSet = new HashSet(this.pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()).keySet());
        Set set = (Set) StorageEngine.getInstance().getAllDataRegionIds().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
        Stream stream = hashSet.stream();
        Objects.requireNonNull(set);
        Set set2 = (Set) stream.filter((v1) -> {
            return r1.contains(v1);
        }).map(num -> {
            return this.pipeTaskManager.removePipeTask(pipeMeta.getStaticMeta(), num.intValue());
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toSet());
        set2.parallelStream().forEach((v0) -> {
            v0.drop();
        });
        this.pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()).values().parallelStream().forEach((v0) -> {
            v0.stop();
        });
        set2.parallelStream().forEach(pipeTask -> {
            PipeDataNodeTask build = new PipeDataNodeTaskBuilder(pipeMeta.getStaticMeta(), ((PipeDataNodeTask) pipeTask).getRegionId(), (PipeTaskMeta) pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().get(Integer.valueOf(((PipeDataNodeTask) pipeTask).getRegionId()))).build();
            build.create();
            this.pipeTaskManager.addPipeTask(pipeMeta.getStaticMeta(), ((PipeDataNodeTask) pipeTask).getRegionId(), build);
        });
        pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
    }

    public Set<Integer> getPipeTaskRegionIdSet(String str, long j) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(str);
        return (pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != j) ? Collections.emptySet() : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
    }
}
