package org.apache.iotdb.db.trigger.executor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.class */
public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
    private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
    private static final int FIRE_RETRY_NUM = IoTDBDescriptor.getInstance().getConfig().getRetryNumToFindStatefulTrigger();

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult process(PlanNode planNode, TriggerEvent triggerEvent) {
        return TriggerManagementService.getInstance().isTriggerTableEmpty() ? TriggerFireResult.SUCCESS : (TriggerFireResult) planNode.accept(this, triggerEvent);
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitPlan(PlanNode planNode, TriggerEvent triggerEvent) {
        return TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitInsertRow(InsertRowNode insertRowNode, TriggerEvent triggerEvent) {
        Map<String, List<String>> constructTriggerNameToMeasurementListMap = constructTriggerNameToMeasurementListMap(insertRowNode, triggerEvent);
        if (constructTriggerNameToMeasurementListMap.isEmpty()) {
            return TriggerFireResult.SUCCESS;
        }
        MeasurementSchema[] measurementSchemas = insertRowNode.getMeasurementSchemas();
        Map<String, Integer> constructMeasurementToSchemaIndexMap = constructMeasurementToSchemaIndexMap(insertRowNode.getMeasurements(), measurementSchemas);
        Object[] values = insertRowNode.getValues();
        long time = insertRowNode.getTime();
        boolean z = false;
        for (Map.Entry<String, List<String>> entry : constructTriggerNameToMeasurementListMap.entrySet()) {
            Tablet tablet = new Tablet(insertRowNode.getDevicePath().getFullPath(), (List) entry.getValue().stream().map(str -> {
                return measurementSchemas[((Integer) constructMeasurementToSchemaIndexMap.get(str)).intValue()];
            }).collect(Collectors.toList()), 1);
            tablet.rowSize++;
            tablet.addTimestamp(0, time);
            for (String str2 : entry.getValue()) {
                tablet.addValue(str2, 0, values[constructMeasurementToSchemaIndexMap.get(str2).intValue()]);
            }
            TriggerFireResult fire = fire(entry.getKey(), tablet, triggerEvent);
            if (fire.equals(TriggerFireResult.TERMINATION)) {
                return fire;
            }
            if (fire.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
                z = true;
            }
        }
        return z ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitInsertTablet(InsertTabletNode insertTabletNode, TriggerEvent triggerEvent) {
        Tablet tablet;
        Map<String, List<String>> constructTriggerNameToMeasurementListMap = constructTriggerNameToMeasurementListMap(insertTabletNode, triggerEvent);
        if (constructTriggerNameToMeasurementListMap.isEmpty()) {
            return TriggerFireResult.SUCCESS;
        }
        MeasurementSchema[] measurementSchemas = insertTabletNode.getMeasurementSchemas();
        Map<String, Integer> constructMeasurementToSchemaIndexMap = constructMeasurementToSchemaIndexMap(insertTabletNode.getMeasurements(), measurementSchemas);
        Object[] columns = insertTabletNode.getColumns();
        BitMap[] bitMaps = insertTabletNode.getBitMaps();
        long[] times = insertTabletNode.getTimes();
        int rowCount = insertTabletNode.getRowCount();
        boolean z = false;
        for (Map.Entry<String, List<String>> entry : constructTriggerNameToMeasurementListMap.entrySet()) {
            if (entry.getValue().size() == measurementSchemas.length) {
                tablet = new Tablet(insertTabletNode.getDevicePath().getFullPath(), Arrays.asList(measurementSchemas), times, columns, bitMaps, rowCount);
            } else {
                List list = (List) entry.getValue().stream().map(str -> {
                    return measurementSchemas[((Integer) constructMeasurementToSchemaIndexMap.get(str)).intValue()];
                }).collect(Collectors.toList());
                Object[] array = entry.getValue().stream().map(str2 -> {
                    return columns[((Integer) constructMeasurementToSchemaIndexMap.get(str2)).intValue()];
                }).toArray();
                BitMap[] bitMapArr = new BitMap[entry.getValue().size()];
                if (bitMaps != null) {
                    for (int i = 0; i < entry.getValue().size(); i++) {
                        bitMapArr[i] = bitMaps[constructMeasurementToSchemaIndexMap.get(entry.getValue().get(i)).intValue()];
                    }
                }
                tablet = new Tablet(insertTabletNode.getDevicePath().getFullPath(), list, times, array, bitMapArr, rowCount);
            }
            TriggerFireResult fire = fire(entry.getKey(), tablet, triggerEvent);
            if (fire.equals(TriggerFireResult.TERMINATION)) {
                return fire;
            }
            if (fire.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
                z = true;
            }
        }
        return z ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitInsertRows(InsertRowsNode insertRowsNode, TriggerEvent triggerEvent) {
        boolean z = false;
        Iterator<InsertRowNode> it = insertRowsNode.getInsertRowNodeList().iterator();
        while (it.hasNext()) {
            TriggerFireResult visitInsertRow = visitInsertRow(it.next(), triggerEvent);
            if (visitInsertRow.equals(TriggerFireResult.TERMINATION)) {
                return visitInsertRow;
            }
            if (visitInsertRow.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
                z = true;
            }
        }
        return z ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitInsertMultiTablets(InsertMultiTabletsNode insertMultiTabletsNode, TriggerEvent triggerEvent) {
        boolean z = false;
        Iterator<InsertTabletNode> it = insertMultiTabletsNode.getInsertTabletNodeList().iterator();
        while (it.hasNext()) {
            TriggerFireResult visitInsertTablet = visitInsertTablet(it.next(), triggerEvent);
            if (visitInsertTablet.equals(TriggerFireResult.TERMINATION)) {
                return visitInsertTablet;
            }
            if (visitInsertTablet.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
                z = true;
            }
        }
        return z ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitInsertRowsOfOneDevice(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode, TriggerEvent triggerEvent) {
        boolean z = false;
        Iterator<InsertRowNode> it = insertRowsOfOneDeviceNode.getInsertRowNodeList().iterator();
        while (it.hasNext()) {
            TriggerFireResult visitInsertRow = visitInsertRow(it.next(), triggerEvent);
            if (visitInsertRow.equals(TriggerFireResult.TERMINATION)) {
                return visitInsertRow;
            }
            if (visitInsertRow.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
                z = true;
            }
        }
        return z ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.SUCCESS;
    }

    @Override // org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor
    public TriggerFireResult visitPipeEnrichedInsert(PipeEnrichedInsertNode pipeEnrichedInsertNode, TriggerEvent triggerEvent) {
        return (TriggerFireResult) pipeEnrichedInsertNode.getInsertNode().accept(this, triggerEvent);
    }

    private Map<String, Integer> constructMeasurementToSchemaIndexMap(String[] strArr, MeasurementSchema[] measurementSchemaArr) {
        HashMap hashMap = new HashMap();
        int length = strArr.length;
        for (int i = 0; i < length; i++) {
            if (strArr[i] != null) {
                if (measurementSchemaArr[i] == null || !measurementSchemaArr[i].getMeasurementId().equals(strArr[i])) {
                    int i2 = 0;
                    int length2 = measurementSchemaArr.length;
                    while (true) {
                        if (i2 >= length2) {
                            break;
                        }
                        if (measurementSchemaArr[i2] != null && measurementSchemaArr[i2].getMeasurementId().equals(strArr[i])) {
                            hashMap.put(strArr[i], Integer.valueOf(i2));
                            break;
                        }
                        i2++;
                    }
                } else {
                    hashMap.put(strArr[i], Integer.valueOf(i));
                }
            }
        }
        return hashMap;
    }

    private Map<String, List<String>> constructTriggerNameToMeasurementListMap(InsertNode insertNode, TriggerEvent triggerEvent) {
        PartialPath devicePath = insertNode.getDevicePath();
        ArrayList arrayList = new ArrayList();
        for (String str : insertNode.getMeasurements()) {
            if (str != null) {
                arrayList.add(str);
            }
        }
        List<List<String>> matchedTriggerListForPath = TriggerManagementService.getInstance().getMatchedTriggerListForPath(devicePath, arrayList);
        boolean z = true;
        Iterator<List<String>> it = matchedTriggerListForPath.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!it.next().isEmpty()) {
                z = false;
                break;
            }
        }
        if (z) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        TriggerTable triggerTable = TriggerManagementService.getInstance().getTriggerTable();
        int size = arrayList.size();
        for (int i = 0; i < size; i++) {
            for (String str2 : matchedTriggerListForPath.get(i)) {
                TriggerInformation triggerInformation = triggerTable.getTriggerInformation(str2);
                if (triggerInformation.getEvent().equals(triggerEvent) && triggerInformation.getTriggerState().equals(TTriggerState.ACTIVE)) {
                    ((List) hashMap.computeIfAbsent(str2, str3 -> {
                        return new ArrayList();
                    })).add((String) arrayList.get(i));
                }
            }
        }
        return hashMap;
    }

    private TriggerFireResult fire(String str, Tablet tablet, TriggerEvent triggerEvent) {
        SyncDataNodeInternalServiceClient syncDataNodeInternalServiceClient;
        TFireTriggerResp fireTrigger;
        TriggerFireResult triggerFireResult = TriggerFireResult.SUCCESS;
        for (int i = 0; i < FIRE_RETRY_NUM; i++) {
            if (!TriggerManagementService.getInstance().needToFireOnAnotherDataNode(str)) {
                TriggerExecutor executor = TriggerManagementService.getInstance().getExecutor(str);
                if (executor == null) {
                    return TriggerManagementService.getInstance().getTriggerInformation(str).getFailureStrategy().equals(FailureStrategy.PESSIMISTIC) ? TriggerFireResult.TERMINATION : TriggerFireResult.FAILED_NO_TERMINATION;
                }
                try {
                    if (!executor.fire(tablet, triggerEvent)) {
                        triggerFireResult = executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC) ? TriggerFireResult.TERMINATION : TriggerFireResult.FAILED_NO_TERMINATION;
                    }
                } catch (TriggerExecutionException e) {
                    triggerFireResult = executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC) ? TriggerFireResult.TERMINATION : TriggerFireResult.FAILED_NO_TERMINATION;
                }
                return triggerFireResult;
            }
            TDataNodeLocation dataNodeLocationOfStatefulTrigger = TriggerManagementService.getInstance().getDataNodeLocationOfStatefulTrigger(str);
            try {
                try {
                    syncDataNodeInternalServiceClient = (SyncDataNodeInternalServiceClient) Coordinator.getInstance().getInternalServiceClientManager().borrowClient(dataNodeLocationOfStatefulTrigger.getInternalEndPoint());
                    try {
                        fireTrigger = syncDataNodeInternalServiceClient.fireTrigger(new TFireTriggerReq(str, tablet.serialize(), triggerEvent.getId()));
                    } catch (Throwable th) {
                        if (syncDataNodeInternalServiceClient != null) {
                            try {
                                syncDataNodeInternalServiceClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (ClientManagerException | TException e2) {
                    LOGGER.warn("Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}", new Object[]{str, dataNodeLocationOfStatefulTrigger.getInternalEndPoint(), e2});
                    updateLocationOfStatefulTrigger(str, dataNodeLocationOfStatefulTrigger.getDataNodeId());
                }
            } catch (InterruptedException e3) {
                LOGGER.warn("{} interrupted when sleep", str);
                Thread.currentThread().interrupt();
            } catch (Exception e4) {
                LOGGER.warn("Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}", new Object[]{str, dataNodeLocationOfStatefulTrigger.getInternalEndPoint(), e4});
                return TriggerManagementService.getInstance().getTriggerInformation(str).getFailureStrategy().equals(FailureStrategy.OPTIMISTIC) ? TriggerFireResult.FAILED_NO_TERMINATION : TriggerFireResult.TERMINATION;
            }
            if (fireTrigger.foundExecutor) {
                TriggerFireResult construct = TriggerFireResult.construct(fireTrigger.getFireResult());
                if (syncDataNodeInternalServiceClient != null) {
                    syncDataNodeInternalServiceClient.close();
                }
                return construct;
            }
            if (!updateLocationOfStatefulTrigger(str, dataNodeLocationOfStatefulTrigger.getDataNodeId())) {
                Thread.sleep(4000L);
            }
            if (syncDataNodeInternalServiceClient != null) {
                syncDataNodeInternalServiceClient.close();
            }
        }
        return triggerFireResult;
    }

    private boolean updateLocationOfStatefulTrigger(String str, int i) {
        try {
            ConfigNodeClient configNodeClient = (ConfigNodeClient) CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
            try {
                TDataNodeLocation dataNodeLocation = configNodeClient.getLocationOfStatefulTrigger(str).getDataNodeLocation();
                if (dataNodeLocation == null || i == dataNodeLocation.getDataNodeId()) {
                    if (configNodeClient != null) {
                        configNodeClient.close();
                    }
                    return false;
                }
                TriggerManagementService.getInstance().updateLocationOfStatefulTrigger(str, dataNodeLocation);
                if (configNodeClient != null) {
                    configNodeClient.close();
                }
                return true;
            } catch (Throwable th) {
                if (configNodeClient != null) {
                    try {
                        configNodeClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (ClientManagerException | TException | IOException e) {
            LOGGER.error("Failed to update location of stateful trigger({}) through config node. The cause is {}.", str, e);
            return false;
        }
    }
}
