/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.externalpipe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.sync.datasource.PipeOpManager;
import org.apache.iotdb.db.sync.datasource.PipeStorageGroupInfo;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginConfiguration;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.pipe.external.api.DataType;
import org.apache.iotdb.pipe.external.api.ExternalPipeSinkWriterStatus;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriter;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtPipePlugin {
    private static final Logger logger = LoggerFactory.getLogger(ExtPipePlugin.class);
    private String extPipeTypeName;
    Map<String, String> sinkParams;
    private PipeOpManager pipeOpManager;
    private IExternalPipeSinkWriterFactory pipeSinkWriterFactory;
    private ExtPipePluginConfiguration configuration;
    private volatile boolean alive = false;
    private List<DataTransmissionTask> dataTransmissionTasks;
    private ExecutorService executorService;
    private Map<String, Long> dataCommitMap = new ConcurrentHashMap<String, Long>();
    private Map<String, Map<String, AtomicInteger>> writerInvocationFailures;
    private int timestampDivisor;

    public ExtPipePlugin(String extPipeTypeName, Map<String, String> sinkParams, PipeOpManager pipeOpManager) {
        String timePrecision;
        this.extPipeTypeName = extPipeTypeName;
        this.sinkParams = sinkParams;
        this.pipeOpManager = pipeOpManager;
        switch (timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()) {
            case "ms": {
                this.timestampDivisor = 1;
                break;
            }
            case "us": {
                this.timestampDivisor = 1000;
                break;
            }
            case "ns": {
                this.timestampDivisor = 1000000;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unrecognized time precision: " + timePrecision);
            }
        }
    }

    public ExtPipePlugin(String Name2, IExternalPipeSinkWriterFactory factory, ExtPipePluginConfiguration conf, TsFilePipe tsFilePipe) {
    }

    public void setPipeSinkWriterFactory(IExternalPipeSinkWriterFactory pipeSinkWriterFactory) {
        this.pipeSinkWriterFactory = pipeSinkWriterFactory;
    }

    private int getIntParam(String paramName, int defaultValue) {
        String valueStr = this.sinkParams.get(paramName);
        if (valueStr == null) {
            return defaultValue;
        }
        return Integer.parseInt(valueStr);
    }

    public void start() throws IOException {
        logger.debug("ExtPipePlugin start(), extPipeName={}.", (Object)this.extPipeTypeName);
        if (this.alive) {
            String errMsg = "Can not re-run alive External pipe: " + this.extPipeTypeName + ".";
            logger.error(errMsg);
            throw new IllegalStateException(errMsg);
        }
        int threadNum = this.getIntParam("thread_num", 1);
        int batchSize = this.getIntParam("batch_size", 100000);
        int attemptTimes = this.getIntParam("attempt_times", 3);
        int backoffInterval = this.getIntParam("retry_interval", 1000);
        try {
            this.configuration = new ExtPipePluginConfiguration.Builder(this.extPipeTypeName).numOfThreads(threadNum).operationBatchSize(batchSize).attemptTimes(attemptTimes).backOffInterval(backoffInterval).build();
            if (this.pipeSinkWriterFactory == null) {
                this.pipeSinkWriterFactory = ExtPipePluginRegister.getInstance().getWriteFactory(this.extPipeTypeName);
            }
            this.pipeSinkWriterFactory.initialize(this.sinkParams);
        }
        catch (Exception e) {
            logger.error("Failed to start External Pipe: {}.", (Object)this.extPipeTypeName, (Object)e);
            throw new IOException("Failed to start External Pipe: " + this.extPipeTypeName + ". " + e.getMessage());
        }
        this.alive = true;
        logger.info("External pipe " + this.extPipeTypeName + " begin to START");
        this.executorService = Executors.newFixedThreadPool(threadNum, r -> {
            Thread thread = new Thread(r);
            thread.setName("ExtPipePlugin-worker-" + this.extPipeTypeName + "-" + thread.getId());
            return thread;
        });
        this.dataTransmissionTasks = new ArrayList<DataTransmissionTask>(threadNum);
        for (int i = 0; i < threadNum; ++i) {
            IExternalPipeSinkWriter writer = (IExternalPipeSinkWriter)this.pipeSinkWriterFactory.get();
            DataTransmissionTask dataTransmissionTask = new DataTransmissionTask(writer, i, this.configuration);
            this.dataTransmissionTasks.add(dataTransmissionTask);
            this.executorService.submit(dataTransmissionTask);
        }
        this.writerInvocationFailures = new ConcurrentHashMap<String, Map<String, AtomicInteger>>();
        logger.info("External pipe " + this.extPipeTypeName + " finish START.");
    }

    public void stop() {
        if (!this.alive) {
            String errMsg = "Error: External pipe " + this.extPipeTypeName + " has not started.";
            logger.error(errMsg);
            throw new IllegalStateException(errMsg);
        }
        this.alive = false;
        this.executorService.shutdown();
        boolean isExecutorServiceTerminated = false;
        try {
            isExecutorServiceTerminated = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.error("Interrupted when waiting for the termination of external pipe, " + this.extPipeTypeName, (Throwable)e);
        }
        finally {
            if (!isExecutorServiceTerminated) {
                logger.warn("ExtPipePlugin stop(), graceful termination of external pipe {} timed out. So force terminating working threads.", (Object)this.extPipeTypeName);
                this.executorService.shutdownNow();
            }
        }
    }

    public boolean isAlive() {
        return this.alive;
    }

    public ExternalPipeStatus getStatus() {
        ExternalPipeStatus status = new ExternalPipeStatus();
        try {
            List<ExternalPipeSinkWriterStatus> writerStatuses = this.dataTransmissionTasks.stream().map(DataTransmissionTask::getStatus).collect(Collectors.toList());
            status.setWriterStatuses(writerStatuses);
        }
        catch (Exception e) {
            this.handleExceptionsThrownByWriter("getStatus", e);
        }
        status.setAlive(this.alive);
        status.setWriterInvocationFailures(this.writerInvocationFailures);
        return status;
    }

    private void handleExceptionsThrownByWriter(String method, Exception e) {
        this.writerInvocationFailures.computeIfAbsent(method, m -> new ConcurrentHashMap());
        String eMsg = e.getMessage();
        if (eMsg == null) {
            eMsg = "N/A";
        }
        this.writerInvocationFailures.get(method).computeIfAbsent(eMsg, msg -> new AtomicInteger(0));
        this.writerInvocationFailures.get(method).get(eMsg).incrementAndGet();
        logger.info("Exception thrown form writer", (Throwable)e);
    }

    public long getDataCommitIndex(String sgName) {
        return this.dataCommitMap.getOrDefault(sgName, Long.MIN_VALUE);
    }

    private class DataTransmissionTask
    implements Callable<Void> {
        private final IExternalPipeSinkWriter writer;
        private final int threadIndex;
        private final ExtPipePluginConfiguration configuration;
        private Map<String, PipeStorageGroupInfo> sgInfoMap;
        private long nextIndex;
        private String lastReadSgName;

        DataTransmissionTask(IExternalPipeSinkWriter writer, int threadIndex, ExtPipePluginConfiguration configuration) throws IOException {
            this.writer = (IExternalPipeSinkWriter)Validate.notNull((Object)writer);
            this.threadIndex = threadIndex;
            this.configuration = configuration;
            this.sgInfoMap = configuration.getBucketSgInfoMap(threadIndex);
            this.writer.open();
        }

        private long getSgNextDataIndex(String sgName) {
            PipeStorageGroupInfo sgInfo = this.sgInfoMap.get(sgName);
            if (sgInfo != null) {
                return sgInfo.getNextReadIndex();
            }
            long nextReadIndex = ExtPipePlugin.this.pipeOpManager.getCommittedIndex(sgName) + 1L;
            this.sgInfoMap.put(sgName, new PipeStorageGroupInfo(sgName, nextReadIndex - 1L, nextReadIndex));
            return nextReadIndex;
        }

        private void setSgNextDataIndex(String sgName, long nextReadIndex, long committedIndex) {
            logger.debug("setSgNextDataIndex(), sgName={}, nextReadIndex={}, committedIndex={}.", new Object[]{sgName, nextReadIndex, committedIndex});
            PipeStorageGroupInfo sgInfo = this.sgInfoMap.computeIfAbsent(sgName, k -> new PipeStorageGroupInfo(sgName, -1L, 0L));
            sgInfo.setNextReadIndex(nextReadIndex);
            sgInfo.setCommittedIndex(committedIndex);
        }

        private void commitData(String sgName, long committedIndex) {
            logger.debug("commitData(), sgName={}, committedIndex={}.", (Object)sgName, (Object)committedIndex);
            ExtPipePlugin.this.dataCommitMap.put(sgName, committedIndex);
        }

        private boolean sgHasNewData(String sgName) {
            PipeStorageGroupInfo pipeStorageGroupInfo = this.sgInfoMap.get(sgName);
            if (pipeStorageGroupInfo == null) {
                return true;
            }
            long nextReadIndex = pipeStorageGroupInfo.getNextReadIndex();
            long nextIndex = ExtPipePlugin.this.pipeOpManager.getNextIndex(sgName);
            return nextIndex > nextReadIndex;
        }

        public String waitForOperations() throws InterruptedException {
            if (this.lastReadSgName != null) {
                if (this.sgHasNewData(this.lastReadSgName)) {
                    return this.lastReadSgName;
                }
                this.lastReadSgName = null;
            }
            Set<String> sgSet = ExtPipePlugin.this.pipeOpManager.getSgSet();
            while (ExtPipePlugin.this.alive) {
                for (String sgName : sgSet) {
                    if (this.threadIndex != Math.abs(sgName.hashCode()) % this.configuration.getNumOfThreads() || !this.sgHasNewData(sgName)) continue;
                    this.lastReadSgName = sgName;
                    return sgName;
                }
                Thread.sleep(1000L);
            }
            return null;
        }

        @Override
        public Void call() throws Exception {
            logger.info("ExternalPipeWorker start. thread={}.", (Object)Thread.currentThread().getName());
            while (ExtPipePlugin.this.alive) {
                try {
                    String sgName = this.waitForOperations();
                    if (sgName == null) continue;
                    Operation operation = null;
                    try {
                        operation = ExtPipePlugin.this.pipeOpManager.getOperation(sgName, this.getSgNextDataIndex(sgName), this.configuration.getOperationBatchSize());
                    }
                    catch (IOException e) {
                        continue;
                    }
                    if (operation == null || operation.getDataCount() <= 0L) continue;
                    if (!this.handleOperationWithRetry(operation)) {
                        logger.error("Failed to handle operation after " + this.configuration.getAttemptTimes() + " attempts: " + operation);
                    }
                    if (!this.flushWithRetry()) {
                        logger.error("Failed to flush operations after " + this.configuration.getAttemptTimes() + " attempts: startIndex=" + operation.getStartIndex() + ",endIndex=" + operation.getEndIndex());
                    }
                    this.nextIndex = operation.getEndIndex();
                    this.setSgNextDataIndex(sgName, this.nextIndex, this.nextIndex - 1L);
                    this.commitData(sgName, this.nextIndex - 1L);
                }
                catch (InterruptedException e) {
                    break;
                }
                catch (Exception e) {
                    logger.error("Unexpected system exception", (Throwable)e);
                }
            }
            try {
                this.writer.close();
            }
            catch (IOException e) {
                ExtPipePlugin.this.handleExceptionsThrownByWriter("close", e);
                logger.info("Exception happened when closing the writer", (Throwable)e);
            }
            logger.info("ExternalPipeWorker exits. Thread={}", (Object)Thread.currentThread().getName());
            return null;
        }

        public ExternalPipeSinkWriterStatus getStatus() {
            return this.writer.getStatus();
        }

        private boolean handleOperationWithRetry(Operation operation) {
            if (operation instanceof InsertOperation) {
                try {
                    this.handleInsertOperation((InsertOperation)operation);
                }
                catch (Exception e1) {
                    logger.error("Exception happened when handling an insert operation", (Throwable)e1);
                    ExtPipePlugin.this.handleExceptionsThrownByWriter("insert", e1);
                    boolean succeed = false;
                    for (int attemptTimes = 1; ExtPipePlugin.this.alive && attemptTimes < this.configuration.getAttemptTimes(); ++attemptTimes) {
                        try {
                            Thread.sleep(this.configuration.getBackOffInterval());
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                        try {
                            this.handleInsertOperation((InsertOperation)operation);
                            succeed = true;
                            break;
                        }
                        catch (Exception e2) {
                            ExtPipePlugin.this.handleExceptionsThrownByWriter("insert", e2);
                            continue;
                        }
                    }
                    return succeed;
                }
            }
            throw new IllegalArgumentException("Unrecognized operation " + operation.getClass().getSimpleName());
            return true;
        }

        private void handleInsertOperation(InsertOperation operation) throws IOException {
            for (Pair<MeasurementPath, List<TimeValuePair>> dataPair : operation.getDataList()) {
                MeasurementPath path = (MeasurementPath)((Object)dataPair.left);
                block10: for (TimeValuePair tvPair : (List)dataPair.right) {
                    String[] nodes = path.getNodes();
                    long timestampInMs = tvPair.getTimestamp() / (long)ExtPipePlugin.this.timestampDivisor;
                    switch (tvPair.getValue().getDataType()) {
                        case BOOLEAN: {
                            this.writer.insertBoolean(nodes, timestampInMs, tvPair.getValue().getBoolean());
                            continue block10;
                        }
                        case INT32: {
                            this.writer.insertInt32(nodes, timestampInMs, tvPair.getValue().getInt());
                            continue block10;
                        }
                        case INT64: {
                            this.writer.insertInt64(nodes, timestampInMs, tvPair.getValue().getLong());
                            continue block10;
                        }
                        case FLOAT: {
                            this.writer.insertFloat(nodes, timestampInMs, tvPair.getValue().getFloat());
                            continue block10;
                        }
                        case DOUBLE: {
                            this.writer.insertDouble(nodes, timestampInMs, tvPair.getValue().getDouble());
                            continue block10;
                        }
                        case TEXT: {
                            this.writer.insertText(nodes, timestampInMs, tvPair.getValue().getStringValue());
                            continue block10;
                        }
                        case VECTOR: {
                            this.writer.insertVector(nodes, (DataType[])Arrays.stream(tvPair.getValue().getVector()).map(TsPrimitiveType::getDataType).map(type -> DataType.fromTsDataType((byte)type.serialize())).toArray(DataType[]::new), timestampInMs, (Object[])Arrays.stream(tvPair.getValue().getVector()).map(TsPrimitiveType::getValue).toArray(Object[]::new));
                            continue block10;
                        }
                    }
                    throw new IllegalArgumentException("Unrecognized data type " + tvPair.getValue().getDataType());
                }
            }
        }

        private boolean flushWithRetry() {
            try {
                this.writer.flush();
            }
            catch (IOException e1) {
                logger.error("Exception happened when flushing operations", (Throwable)e1);
                ExtPipePlugin.this.handleExceptionsThrownByWriter("flush", e1);
                boolean succeed = false;
                for (int attemptTimes = 1; ExtPipePlugin.this.alive && attemptTimes < this.configuration.getAttemptTimes(); ++attemptTimes) {
                    try {
                        Thread.sleep(this.configuration.getBackOffInterval());
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    try {
                        this.writer.flush();
                        succeed = true;
                        break;
                    }
                    catch (Exception e2) {
                        ExtPipePlugin.this.handleExceptionsThrownByWriter("flush", e2);
                        continue;
                    }
                }
                return succeed;
            }
            return true;
        }
    }
}

