package org.apache.iotdb.db.pipe.processor.twostage.combiner;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton registry of {@link PipeCombineHandler}s.
 *
 * <p>Handlers are keyed by a pipe id of the form {@code <pipeName>-<creationTime>} (see
 * {@link #generatePipeId(String, long)}). Each id carries a reference count: {@link #register}
 * increments it and {@link #deregister} decrements it, and the handler is closed and dropped once
 * the count is no longer positive.
 *
 * <p>{@link #register}, {@link #deregister} and {@link #getExpectedDataNodeIdSet} are
 * {@code synchronized} on the manager instance; the remaining methods only rely on the
 * underlying {@link ConcurrentMap}s.
 */
public class PipeCombineHandlerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandlerManager.class);

    /** Pipe id -> handler that performs the combine work for that pipe. */
    private final ConcurrentMap<String, PipeCombineHandler> pipeId2CombineHandler;

    /** Pipe id -> number of {@link #register} calls not yet matched by {@link #deregister}. */
    private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount;

    /** Pipe id -> value most recently stored through {@link #updateLastCombinedValue}. */
    private final ConcurrentMap<String, Object> pipeId2LastCombinedValue;

    /** Holds the singleton; the instance is created when this nested class is initialized. */
    private static class CombineHandlerManagerHolder {
        private static final PipeCombineHandlerManager INSTANCE = new PipeCombineHandlerManager();

        private CombineHandlerManagerHolder() {
        }
    }

    /**
     * Registers one more user of the pipe identified by {@code str} and {@code j}.
     *
     * <p>A {@link PipeCombineHandler} is created only on the first registration of a pipe id;
     * later registrations just increase the reference count.
     *
     * @param str pipe name
     * @param j pipe creation time
     * @param function operator factory handed to the {@link PipeCombineHandler} constructor
     */
    public synchronized void register(String str, long j, Function<String, Operator> function) {
        final String pipeId = generatePipeId(str, j);
        // Construct the handler only when none is registered yet: building one per call would
        // leave the surplus instance unreferenced without ever being closed. Check-then-put is
        // safe here because register/deregister, the only writers of this map, both hold the
        // manager's monitor.
        if (!this.pipeId2CombineHandler.containsKey(pipeId)) {
            this.pipeId2CombineHandler.put(pipeId, new PipeCombineHandler(str, j, function));
        }
        this.pipeId2ReferenceCount.computeIfAbsent(pipeId, ignored -> new AtomicInteger(0)).incrementAndGet();
    }

    /**
     * Releases one registration of the pipe identified by {@code str} and {@code j}.
     *
     * <p>Does nothing if the pipe id has no reference count. When the count drops to zero or
     * below, the last combined value, the reference count and the handler are removed, and the
     * handler is closed. A failure while closing is logged and not propagated.
     *
     * @param str pipe name
     * @param j pipe creation time
     */
    public synchronized void deregister(String str, long j) {
        final String pipeId = generatePipeId(str, j);
        final AtomicInteger referenceCount = this.pipeId2ReferenceCount.get(pipeId);
        if (Objects.isNull(referenceCount) || referenceCount.decrementAndGet() > 0) {
            return;
        }
        this.pipeId2LastCombinedValue.remove(pipeId);
        this.pipeId2ReferenceCount.remove(pipeId);
        final PipeCombineHandler handler = this.pipeId2CombineHandler.remove(pipeId);
        if (Objects.isNull(handler)) {
            // Nothing to close; avoid relying on a caught NullPointerException.
            return;
        }
        try {
            handler.close();
        } catch (Exception e) {
            LOGGER.warn("Error occurred when closing CombineHandler(id = {})", pipeId, e);
        }
    }

    /**
     * Returns the value last stored for the given pipe.
     *
     * @param str pipe name
     * @param j pipe creation time
     * @return the stored value, or {@code null} if none is stored for this pipe id
     */
    public Object getLastCombinedValue(String str, long j) {
        return this.pipeId2LastCombinedValue.get(generatePipeId(str, j));
    }

    /**
     * Stores {@code obj} as the last combined value of the given pipe, replacing any previous one.
     *
     * @param str pipe name
     * @param j pipe creation time
     * @param obj value to store; the backing {@link ConcurrentHashMap} rejects {@code null}
     */
    public void updateLastCombinedValue(String str, long j, Object obj) {
        this.pipeId2LastCombinedValue.put(generatePipeId(str, j), obj);
    }

    /**
     * Returns the expected data node id set reported by the handler of the given pipe.
     *
     * @param str pipe name
     * @param j pipe creation time
     * @return the handler's expected data node ids, or an empty set if no handler is registered
     */
    public synchronized Set<Integer> getExpectedDataNodeIdSet(String str, long j) {
        final PipeCombineHandler handler = this.pipeId2CombineHandler.get(generatePipeId(str, j));
        if (Objects.isNull(handler)) {
            return Collections.emptySet();
        }
        return handler.getExpectedDataNodeIdSet();
    }

    /**
     * Forwards a combine request to the handler of the pipe named in the request.
     *
     * @param combineRequest request carrying pipe name, creation time, region id, combine id and state
     * @return a response whose status is the result of {@code PipeCombineHandler#combine}
     * @throws PipeException if no handler is registered for the request's pipe id
     */
    public TPipeTransferResp handle(CombineRequest combineRequest) {
        final String pipeId = generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime());
        final PipeCombineHandler handler = this.pipeId2CombineHandler.get(pipeId);
        if (Objects.isNull(handler)) {
            throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
        }
        return new TPipeTransferResp().setStatus(handler.combine(combineRequest.getRegionId(), combineRequest.getCombineId(), combineRequest.getState()));
    }

    /**
     * Forwards a fetch-combine-result request to the handler of the pipe named in the request.
     *
     * @param fetchCombineResultRequest request carrying pipe name, creation time and combine id list
     * @return the response produced by {@code PipeCombineHandler#fetchCombineResult}
     * @throws PipeException if no handler is registered for the request's pipe id
     * @throws IOException declared by this method; presumably raised by the handler while
     *     building the response
     */
    public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest) throws IOException {
        final String pipeId = generatePipeId(fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime());
        final PipeCombineHandler handler = this.pipeId2CombineHandler.get(pipeId);
        if (Objects.isNull(handler)) {
            throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
        }
        return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
    }

    /**
     * Asks every registered handler to refresh its expected region-to-data-node mapping and to
     * clean its outdated combiners.
     *
     * <p>The handlers are copied into a snapshot while holding the manager's monitor and are then
     * processed outside of it, so register/deregister are not blocked while handlers do their work.
     */
    public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
        final HashMap<String, PipeCombineHandler> snapshot;
        synchronized (this) {
            snapshot = new HashMap<>(this.pipeId2CombineHandler);
        }
        for (final PipeCombineHandler handler : snapshot.values()) {
            handler.fetchAndUpdateExpectedRegionId2DataNodeIdMap();
            handler.cleanOutdatedCombiner();
        }
    }

    /**
     * Builds the map key for a pipe.
     *
     * @param str pipe name
     * @param j pipe creation time
     * @return {@code str + "-" + j}
     */
    private static String generatePipeId(String str, long j) {
        return str + "-" + j;
    }

    /**
     * Creates the empty registry and registers
     * {@link #fetchExpectedRegionIdSetAndCleanOutdatedCombiner()} as a periodical job.
     */
    private PipeCombineHandlerManager() {
        this.pipeId2CombineHandler = new ConcurrentHashMap<>();
        this.pipeId2ReferenceCount = new ConcurrentHashMap<>();
        this.pipeId2LastCombinedValue = new ConcurrentHashMap<>();
        // The interval argument is half of the configured region-info cache time, converted from
        // milliseconds by dividing by 1000 (presumably the job interval is in seconds - confirm).
        // NOTE(review): with integer arithmetic a configured time below 2000 ms yields 0 here;
        // confirm that the periodical job executor tolerates a zero interval.
        PipeAgent.runtime().registerPeriodicalJob("CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner", this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner, (PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs() / 1000) / 2);
    }

    /**
     * Returns the singleton manager.
     *
     * @return the shared {@link PipeCombineHandlerManager} instance
     */
    public static PipeCombineHandlerManager getInstance() {
        return CombineHandlerManagerHolder.INSTANCE;
    }
}
