package org.apache.iotdb.db.pipe.agent.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.class */
public class PipeTaskDataNodeAgent extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskDataNodeAgent.class);
    protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

    /* renamed from: org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType = new int[TConsensusGroupType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[TConsensusGroupType.DataRegion.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[TConsensusGroupType.SchemaRegion.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    protected boolean isShutdown() {
        return PipeAgent.runtime().isShutdown();
    }

    protected Map<TConsensusGroupId, PipeTask> buildPipeTasks(PipeMeta pipeMeta) {
        return new PipeDataNodeBuilder(pipeMeta).build();
    }

    public void stopAllPipesWithCriticalException() {
        int i = 0;
        while (!tryWriteLockWithTimeOut(5L)) {
            try {
                Thread.sleep(1000L);
                i++;
                LOGGER.warn("Failed to stop all pipes with critical exception, retry count: {}.", Integer.valueOf(i));
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted when trying to stop all pipes with critical exception, exception message: {}", e.getMessage(), e);
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e2) {
                LOGGER.error("Failed to stop all pipes with critical exception, exception message: {}", e2.getMessage(), e2);
                return;
            }
        }
        try {
            stopAllPipesWithCriticalExceptionInternal();
            LOGGER.info("Stopped all pipes with critical exception.");
        } finally {
            releaseWriteLock();
        }
    }

    private void stopAllPipesWithCriticalExceptionInternal() {
        int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
        HashMap hashMap = new HashMap();
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta -> {
            PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
            pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                if (pipeTaskMeta.getLeaderDataNodeId() != dataNodeId) {
                    return;
                }
                for (PipeRuntimeConnectorCriticalException pipeRuntimeConnectorCriticalException : pipeTaskMeta.getExceptionMessages()) {
                    if (pipeRuntimeConnectorCriticalException instanceof PipeRuntimeConnectorCriticalException) {
                        hashMap.putIfAbsent(staticMeta.getConnectorParameters(), pipeRuntimeConnectorCriticalException);
                    }
                }
            });
        });
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta2 -> {
            PipeStaticMeta staticMeta = pipeMeta2.getStaticMeta();
            pipeMeta2.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                if (pipeTaskMeta.getLeaderDataNodeId() == dataNodeId && hashMap.containsKey(staticMeta.getConnectorParameters()) && !pipeTaskMeta.containsExceptionMessage((PipeRuntimeException) hashMap.get(staticMeta.getConnectorParameters()))) {
                    PipeRuntimeConnectorCriticalException pipeRuntimeConnectorCriticalException = (PipeRuntimeConnectorCriticalException) hashMap.get(staticMeta.getConnectorParameters());
                    pipeTaskMeta.trackExceptionMessage(pipeRuntimeConnectorCriticalException);
                    LOGGER.warn("Pipe {} (creation time = {}) will be stopped because of critical exception (occurred time {}) in connector {}.", new Object[]{staticMeta.getPipeName(), DateTimeUtils.convertLongToDate(staticMeta.getCreationTime(), "ms"), DateTimeUtils.convertLongToDate(pipeRuntimeConnectorCriticalException.getTimeStamp(), "ms"), staticMeta.getConnectorParameters()});
                }
            });
        });
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta3 -> {
            PipeStaticMeta staticMeta = pipeMeta3.getStaticMeta();
            PipeRuntimeMeta runtimeMeta = pipeMeta3.getRuntimeMeta();
            if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
                runtimeMeta.getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                    for (PipeRuntimeException pipeRuntimeException : pipeTaskMeta.getExceptionMessages()) {
                        if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
                            stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
                            LOGGER.warn("Pipe {} (creation time = {}) was stopped because of critical exception (occurred time {}).", new Object[]{staticMeta.getPipeName(), DateTimeUtils.convertLongToDate(staticMeta.getCreationTime(), "ms"), DateTimeUtils.convertLongToDate(pipeRuntimeException.getTimeStamp(), "ms")});
                            return;
                        }
                    }
                });
            }
        });
    }

    protected void createPipeTask(TConsensusGroupId tConsensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) {
        PipeDataNodeTask build;
        if (tConsensusGroupId.getType() != TConsensusGroupType.ConfigRegion && pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[tConsensusGroupId.getType().ordinal()]) {
                case 1:
                    build = new PipeDataNodeTaskDataRegionBuilder(pipeStaticMeta, tConsensusGroupId, pipeTaskMeta).build();
                    break;
                case 2:
                    build = new PipeDataNodeTaskSchemaRegionBuilder(pipeStaticMeta, tConsensusGroupId, pipeTaskMeta).build();
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported consensus group type: " + tConsensusGroupId.getType());
            }
            build.create();
            this.pipeTaskManager.addPipeTask(pipeStaticMeta, tConsensusGroupId, build);
        }
        this.pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName()).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().put(tConsensusGroupId, pipeTaskMeta);
    }

    public void collectPipeMetaList(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (tryReadLockWithTimeOut(10L)) {
            try {
                collectPipeMetaListInternal(tDataNodeHeartbeatResp);
            } finally {
                releaseReadLock();
            }
        }
    }

    private void collectPipeMetaListInternal(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        try {
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
            }
            tDataNodeHeartbeatResp.setPipeMetaList(arrayList);
        } catch (IOException e) {
            throw new TException(e);
        }
    }

    public void collectPipeMetaList(TPipeHeartbeatReq tPipeHeartbeatReq, TPipeHeartbeatResp tPipeHeartbeatResp) throws TException {
        acquireReadLock();
        try {
            collectPipeMetaListInternal(tPipeHeartbeatReq, tPipeHeartbeatResp);
        } finally {
            releaseReadLock();
        }
    }

    private void collectPipeMetaListInternal(TPipeHeartbeatReq tPipeHeartbeatReq, TPipeHeartbeatResp tPipeHeartbeatResp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config node.", Long.valueOf(tPipeHeartbeatReq.heartbeatId));
        ArrayList arrayList = new ArrayList();
        try {
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
            }
            tPipeHeartbeatResp.setPipeMetaList(arrayList);
            PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
        } catch (IOException e) {
            throw new TException(e);
        }
    }

    public void restartAllStuckPipes() {
        if (tryWriteLockWithTimeOut(5L)) {
            try {
                restartAllStuckPipesInternal();
            } finally {
                releaseWriteLock();
            }
        }
    }

    private void restartAllStuckPipesInternal() {
        Map<String, IoTDBDataRegionExtractor> extractorMap = PipeExtractorMetrics.getInstance().getExtractorMap();
        HashSet hashSet = new HashSet();
        for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            String pipeName = pipeMeta.getStaticMeta().getPipeName();
            List list = (List) extractorMap.values().stream().filter(ioTDBDataRegionExtractor -> {
                return ioTDBDataRegionExtractor.getPipeName().equals(pipeName);
            }).collect(Collectors.toList());
            if (!list.isEmpty() && ((IoTDBDataRegionExtractor) list.get(0)).isStreamMode() && !list.stream().noneMatch((v0) -> {
                return v0.hasConsumedAllHistoricalTsFiles();
            }) && (mayMemTablePinnedCountReachDangerousThreshold() || mayWalSizeReachThrottleThreshold())) {
                LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
                hashSet.add(pipeMeta);
            }
        }
        hashSet.parallelStream().forEach(this::restartStuckPipe);
    }

    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeResourceManager.wal().getPinnedWalCount() >= 10 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
    }

    private boolean mayWalSizeReachThrottleThreshold() {
        return 3 * WALManager.getInstance().getTotalDiskUsage() > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
    }

    private void restartStuckPipe(PipeMeta pipeMeta) {
        LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
        long currentTimeMillis = System.currentTimeMillis();
        handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
        handleSinglePipeMetaChangesInternal(pipeMeta);
        LOGGER.warn("Pipe {} was restarted because of stuck, time cost: {} ms.", pipeMeta.getStaticMeta(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }
}
