/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.jdbc;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.SequenceInputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.ObjectMapperFactory;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.RestRequest;
import net.snowflake.client.jdbc.SnowflakeResultChunk;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.SnowflakeUtil;
import net.snowflake.client.jdbc.internal.apache.http.Header;
import net.snowflake.client.jdbc.internal.apache.http.HttpEntity;
import net.snowflake.client.jdbc.internal.apache.http.HttpResponse;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpGet;
import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonParser;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonToken;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.MappingJsonFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.log.SFLogger;
import net.snowflake.client.log.SFLoggerFactory;

public class SnowflakeChunkDownloader {
    private static final String SSE_C_ALGORITHM = "x-amz-server-side-encryption-customer-algorithm";
    private static final String SSE_C_KEY = "x-amz-server-side-encryption-customer-key";
    private static final String SSE_C_AES = "AES256";
    private static final ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();
    private static final JsonFactory jsonFactory = new MappingJsonFactory();
    private static final SFLogger logger = SFLoggerFactory.getLogger(SnowflakeChunkDownloader.class);
    private SnowflakeResultChunk.ResultChunkDataCache chunkDataCache = new SnowflakeResultChunk.ResultChunkDataCache();
    private List<SnowflakeResultChunk> chunks = null;
    private int nextChunkToConsume = 0;
    private int nextChunkToDownload = 0;
    private final int prefetchSlots;
    private boolean useJsonParser = false;
    private ThreadPoolExecutor executor;
    private long numberMillisWaitingForChunks = 0L;
    private boolean terminated = false;
    private final AtomicLong totalMillisDownloadingChunks = new AtomicLong(0L);
    private final AtomicLong totalMillisParsingChunks = new AtomicLong(0L);
    private final String qrmk;
    private Map<String, String> chunkHeadersMap = null;
    private final int networkTimeoutInMilli;
    private long memoryLimit;
    private static Long currentMemoryUsage = 0L;
    private long BASE_WAITING_MS = 50L;
    private long WAITING_SECS_MULTIPLIER = 2L;
    private long MAX_WAITING_MS = 30000L;
    private long WAITING_JITTER_RATIO = 10L;
    private final long downloadedConditionTimeoutInSeconds = 3600L;

    private static ThreadPoolExecutor createChunkDownloaderExecutorService(final String threadNamePrefix, int parallel) {
        ThreadFactory threadFactory = new ThreadFactory(){
            private int threadCount = 1;

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(threadNamePrefix + this.threadCount++);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        logger.error("uncaughtException in thread: " + t + " {}", e);
                    }
                });
                thread.setDaemon(true);
                return thread;
            }
        };
        return (ThreadPoolExecutor)Executors.newFixedThreadPool(parallel, threadFactory);
    }

    public SnowflakeChunkDownloader(int colCount, JsonNode chunksData, int prefetchThreads, String qrmk, JsonNode chunkHeaders, int networkTimeoutInMilli, boolean useJsonParser, long memoryLimit, boolean efficientChunkStorage) throws SnowflakeSQLException {
        this.qrmk = qrmk;
        this.networkTimeoutInMilli = networkTimeoutInMilli;
        this.prefetchSlots = prefetchThreads * 2;
        this.useJsonParser = useJsonParser;
        this.memoryLimit = memoryLimit;
        logger.debug("qrmk = {}", qrmk);
        if (chunkHeaders != null && !chunkHeaders.isMissingNode()) {
            this.chunkHeadersMap = new HashMap<String, String>(2);
            Iterator<Map.Entry<String, JsonNode>> chunkHeadersIter = chunkHeaders.fields();
            while (chunkHeadersIter.hasNext()) {
                Map.Entry<String, JsonNode> chunkHeader = chunkHeadersIter.next();
                logger.debug("add header key={}, value={}", chunkHeader.getKey(), chunkHeader.getValue().asText());
                this.chunkHeadersMap.put(chunkHeader.getKey(), chunkHeader.getValue().asText());
            }
        }
        if (chunksData == null) {
            logger.debug("no chunk data");
            return;
        }
        int numChunks = chunksData.size();
        this.chunks = new ArrayList<SnowflakeResultChunk>(numChunks);
        for (int idx = 0; idx < numChunks; ++idx) {
            JsonNode chunkNode = chunksData.get(idx);
            SnowflakeResultChunk chunk = new SnowflakeResultChunk(chunkNode.path("url").asText(), chunkNode.path("rowCount").asInt(), colCount, chunkNode.path("uncompressedSize").asInt(), efficientChunkStorage);
            logger.debug("add chunk, url={} rowCount={}", chunk.getUrl(), chunk.getRowCount());
            this.chunks.add(chunk);
        }
        int effectiveThreads = Math.min(prefetchThreads, numChunks);
        logger.debug("#chunks: {} #threads:{} #slots:{} -> pool:{}", numChunks, prefetchThreads, this.prefetchSlots, effectiveThreads);
        this.executor = SnowflakeChunkDownloader.createChunkDownloaderExecutorService("result-chunk-downloader-", effectiveThreads);
        this.startNextDownloaders();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startNextDownloaders() throws SnowflakeSQLException {
        logger.debug("Submit {} chunks to be pre-fetched", Math.min(this.prefetchSlots, this.chunks.size()));
        long waitingTime = this.BASE_WAITING_MS;
        while (this.nextChunkToDownload - this.nextChunkToConsume < this.prefetchSlots && this.nextChunkToDownload < this.chunks.size()) {
            SnowflakeResultChunk nextChunk = this.chunks.get(this.nextChunkToDownload);
            long neededChunkMemory = nextChunk.computeNeededChunkMemory();
            Long l = currentMemoryUsage;
            synchronized (l) {
                if (neededChunkMemory > this.memoryLimit) {
                    logger.debug("{}: reset memoryLimit from {} MB to current chunk size {} MB", Thread.currentThread().getName(), this.memoryLimit / 1024L / 1024L, neededChunkMemory / 1024L / 1024L);
                    this.memoryLimit = neededChunkMemory;
                }
                if (currentMemoryUsage + neededChunkMemory > this.memoryLimit && this.nextChunkToDownload - this.nextChunkToConsume > 0) {
                    break;
                }
                if (currentMemoryUsage + neededChunkMemory <= this.memoryLimit) {
                    nextChunk.tryReuse(this.chunkDataCache);
                    currentMemoryUsage = currentMemoryUsage + neededChunkMemory;
                    logger.debug("{}: currentMemoryUsage in MB: {}, nextD: {}, nextC: {}, allocated: {} ", Thread.currentThread().getName(), currentMemoryUsage / 1024L / 1024L, this.nextChunkToDownload, this.nextChunkToConsume, neededChunkMemory);
                    logger.debug("submit chunk #{} for downloading, url={}", this.nextChunkToDownload, nextChunk.getUrl());
                    this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, nextChunk, this.qrmk, this.nextChunkToDownload, this.chunkHeadersMap, this.networkTimeoutInMilli));
                    ++this.nextChunkToDownload;
                    waitingTime = this.BASE_WAITING_MS;
                    continue;
                }
            }
            try {
                waitingTime = (waitingTime *= this.WAITING_SECS_MULTIPLIER) > this.MAX_WAITING_MS ? this.MAX_WAITING_MS : waitingTime;
                long jitter = ThreadLocalRandom.current().nextLong(0L, waitingTime / this.WAITING_JITTER_RATIO);
                logger.debug("{} waiting for {}s: currentMemoryUsage in MB: {}, needed: {}, nextD: {}, nextC: {} ", Thread.currentThread().getName(), (double)(waitingTime += jitter) / 1000.0, currentMemoryUsage / 1024L / 1024L, neededChunkMemory / 1024L / 1024L, this.nextChunkToDownload, this.nextChunkToConsume);
                Thread.sleep(waitingTime);
            }
            catch (InterruptedException ie) {
                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Waiting SnowflakeChunkDownloader has been interrupted.");
            }
        }
        this.chunkDataCache.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseCurrentMemoryUsage(int chunk) {
        Long l = currentMemoryUsage;
        synchronized (l) {
            currentMemoryUsage = currentMemoryUsage - this.chunks.get(chunk).computeNeededChunkMemory();
            logger.debug("{}: currentMemoryUsage in MB: {}, released: {}, chunk: {}", Thread.currentThread().getName(), currentMemoryUsage / 1024L / 1024L, this.chunks.get(chunk).computeNeededChunkMemory(), chunk);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SnowflakeResultChunk getNextChunkToConsume() throws InterruptedException, SnowflakeSQLException {
        boolean terminateDownloader;
        SnowflakeResultChunk snowflakeResultChunk;
        if (this.nextChunkToConsume > 0) {
            int prevChunk = this.nextChunkToConsume - 1;
            logger.debug("free chunk data for chunk #{}", prevChunk);
            this.releaseCurrentMemoryUsage(prevChunk);
            if (this.nextChunkToDownload < this.chunks.size()) {
                this.chunkDataCache.add(this.chunks.get(prevChunk));
            } else {
                this.chunkDataCache.clear();
            }
            this.chunks.get(prevChunk).freeData();
        }
        if (this.nextChunkToConsume >= this.chunks.size()) {
            logger.debug("no more chunk");
            return null;
        }
        this.startNextDownloaders();
        SnowflakeResultChunk currentChunk = this.chunks.get(this.nextChunkToConsume);
        if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) {
            logger.debug("chunk #{} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            if (this.nextChunkToConsume == this.chunks.size()) {
                this.releaseCurrentMemoryUsage(this.nextChunkToConsume - 1);
            }
            return currentChunk;
        }
        try {
            logger.debug("chunk #{} is not ready to consume", this.nextChunkToConsume);
            currentChunk.getLock().lock();
            logger.debug("consumer get lock to check chunk state");
            while (currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.SUCCESS && currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.debug("wait for chunk #{} to be ready, currentchunk state is: {}", new Object[]{this.nextChunkToConsume, currentChunk.getDownloadState()});
                long startTime = System.currentTimeMillis();
                if (!currentChunk.getDownloadCondition().await(3600L, TimeUnit.SECONDS)) {
                    currentChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                    currentChunk.setDownloadError(String.format("Timeout waiting for the download of chunk #%d(Total chunks: %d)", this.nextChunkToConsume, this.chunks.size()));
                }
                this.numberMillisWaitingForChunks += System.currentTimeMillis() - startTime;
                logger.debug("woken up from waiting for chunk #{} to be ready", this.nextChunkToConsume);
            }
            if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.error("downloader encountered error: {}", currentChunk.getDownloadError());
                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), currentChunk.getDownloadError());
            }
            logger.debug("chunk #{} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            snowflakeResultChunk = currentChunk;
            logger.debug("consumer free lock");
            terminateDownloader = currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE;
        }
        catch (Throwable throwable) {
            logger.debug("consumer free lock");
            boolean terminateDownloader2 = currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE;
            currentChunk.getLock().unlock();
            if (this.nextChunkToConsume == this.chunks.size()) {
                this.releaseCurrentMemoryUsage(this.nextChunkToConsume - 1);
            }
            if (terminateDownloader2) {
                logger.debug("Download result fail. Shut down the chunk downloader");
                this.terminate();
            }
            throw throwable;
        }
        currentChunk.getLock().unlock();
        if (this.nextChunkToConsume == this.chunks.size()) {
            this.releaseCurrentMemoryUsage(this.nextChunkToConsume - 1);
        }
        if (terminateDownloader) {
            logger.debug("Download result fail. Shut down the chunk downloader");
            this.terminate();
        }
        return snowflakeResultChunk;
    }

    public Metrics terminate() {
        if (!this.terminated) {
            logger.debug("Total milliseconds waiting for chunks: {}, Total memory used: {}, total download time: {} millisec, total parsing time: {} milliseconds, total chunks: {}", this.numberMillisWaitingForChunks, Runtime.getRuntime().totalMemory(), this.totalMillisDownloadingChunks.get(), this.totalMillisParsingChunks.get(), this.chunks.size());
            if (this.executor != null) {
                this.executor.shutdownNow();
                this.executor = null;
            }
            this.chunks = null;
            this.chunkDataCache.clear();
            this.terminated = true;
            return new Metrics();
        }
        return null;
    }

    private void addDownloadTime(long downloadTime) {
        this.totalMillisDownloadingChunks.addAndGet(downloadTime);
    }

    private void addParsingTime(long parsingTime) {
        this.totalMillisParsingChunks.addAndGet(parsingTime);
    }

    private static Callable<Void> getDownloadChunkCallable(final SnowflakeChunkDownloader downloader, final SnowflakeResultChunk resultChunk, final String qrmk, final int chunkIndex, final Map<String, String> chunkHeadersMap, final int networkTimeoutInMilli) {
        return new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() throws Exception {
                try {
                    SequenceInputStream jsonInputStream;
                    try {
                        resultChunk.getLock().lock();
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.IN_PROGRESS);
                    }
                    finally {
                        resultChunk.getLock().unlock();
                    }
                    logger.debug("Downloading chunk {}, url={}", chunkIndex, resultChunk.getUrl());
                    long startTime = System.currentTimeMillis();
                    HttpResponse response = this.getResultChunk(resultChunk.getUrl());
                    if (response == null || response.getStatusLine().getStatusCode() != 200) {
                        logger.error("Error fetching chunk from: {}", resultChunk.getUrl());
                        SnowflakeUtil.logResponseDetails(response, logger);
                        throw new SnowflakeSQLException("58030", ErrorCode.NETWORK_ERROR.getMessageCode(), "Error encountered when downloading a result chunk: HTTP status=" + (response != null ? Integer.valueOf(response.getStatusLine().getStatusCode()) : "null response"));
                    }
                    HttpEntity entity = response.getEntity();
                    try {
                        InputStream is = new HttpUtil.HttpInputStream(entity.getContent());
                        Header encoding = response.getFirstHeader("Content-Encoding");
                        if (encoding != null) {
                            if (encoding.getValue().equalsIgnoreCase("gzip")) {
                                is = new GZIPInputStream(is, 65536);
                            } else {
                                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: unexpected compression got " + encoding.getValue());
                            }
                        }
                        jsonInputStream = new SequenceInputStream(Collections.enumeration(Arrays.asList(new ByteArrayInputStream("[".getBytes(StandardCharsets.UTF_8)), is, new ByteArrayInputStream("]".getBytes(StandardCharsets.UTF_8)))));
                    }
                    catch (Exception ex) {
                        logger.error("Failed to uncompress data: {}", response);
                        throw ex;
                    }
                    resultChunk.setDownloadTime(System.currentTimeMillis() - startTime);
                    downloader.addDownloadTime(resultChunk.getDownloadTime());
                    startTime = System.currentTimeMillis();
                    logger.debug("Json response: {}", response);
                    JsonNode resultData = null;
                    try {
                        if (downloader.useJsonParser) {
                            this.parseJsonToChunk(jsonInputStream, resultChunk);
                        } else {
                            resultData = mapper.readTree(jsonInputStream);
                        }
                    }
                    catch (Exception ex) {
                        logger.error("Exception when parsing result", ex);
                        throw new SnowflakeSQLException(ex, "XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: " + ex.getLocalizedMessage() + "\nBad result json: " + response.toString());
                    }
                    finally {
                        ((InputStream)jsonInputStream).close();
                    }
                    resultChunk.setParseTime(System.currentTimeMillis() - startTime);
                    downloader.addParsingTime(resultChunk.getParseTime());
                    resultChunk.setResultData(resultData);
                    logger.debug("Finished preparing chunk data for {}, total download time={}ms, total parse time={}ms", resultChunk.getUrl(), resultChunk.getDownloadTime(), resultChunk.getParseTime());
                    try {
                        resultChunk.getLock().lock();
                        logger.debug("get lock to change the chunk to be ready to consume");
                        logger.debug("wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.SUCCESS);
                        resultChunk.getDownloadCondition().signal();
                    }
                    catch (Throwable throwable) {
                        logger.debug("Downloaded chunk {}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                        throw throwable;
                    }
                    logger.debug("Downloaded chunk {}, free lock", chunkIndex);
                    resultChunk.getLock().unlock();
                }
                catch (Throwable ex) {
                    try {
                        logger.debug("get lock to set chunk download error");
                        resultChunk.getLock().lock();
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                        resultChunk.setDownloadError(ex.getLocalizedMessage());
                        logger.debug("wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.getDownloadCondition().signal();
                    }
                    catch (Throwable throwable) {
                        logger.debug("Failed to download chunk {}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                        throw throwable;
                    }
                    logger.debug("Failed to download chunk {}, free lock", chunkIndex);
                    resultChunk.getLock().unlock();
                    logger.error("Exception encountered ({}:{}) fetching chunk from: {}", ex.getClass().getName(), ex.getLocalizedMessage(), resultChunk.getUrl());
                    logger.error("Exception: ", ex);
                }
                return null;
            }

            private void parseJsonToChunk(InputStream jsonInputStream, SnowflakeResultChunk resultChunk2) throws IOException, SnowflakeSQLException {
                try (JsonParser jp = jsonFactory.createParser(new InputStreamReader(jsonInputStream, "UTF-8"));){
                    JsonToken currentToken = jp.nextToken();
                    if (currentToken != JsonToken.START_ARRAY) {
                        throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception1: expected '[' got " + currentToken.asString());
                    }
                    while (jp.nextToken() != JsonToken.END_ARRAY) {
                        resultChunk2.addRow(mapper.readValue(jp, Object[].class));
                    }
                    resultChunk2.ensureRowsComplete();
                }
            }

            private HttpResponse getResultChunk(String chunkUrl) throws URISyntaxException, IOException, SnowflakeSQLException {
                URIBuilder uriBuilder = new URIBuilder(chunkUrl);
                HttpGet httpRequest = new HttpGet(uriBuilder.build());
                if (chunkHeadersMap != null && chunkHeadersMap.size() != 0) {
                    for (Map.Entry entry : chunkHeadersMap.entrySet()) {
                        logger.debug("Adding header key={}, value={}", entry.getKey(), entry.getValue());
                        httpRequest.addHeader((String)entry.getKey(), (String)entry.getValue());
                    }
                } else if (qrmk != null) {
                    httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_ALGORITHM, SnowflakeChunkDownloader.SSE_C_AES);
                    httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_KEY, qrmk);
                    logger.debug("Adding SSE-C headers");
                }
                logger.debug("Fetching result: {}", resultChunk.getUrl());
                CloseableHttpClient httpClient = HttpUtil.getHttpClient();
                CloseableHttpResponse response = RestRequest.execute(httpClient, httpRequest, networkTimeoutInMilli / 1000, 0, null, false, false, false, true);
                logger.debug("Call returned for URL: {}", chunkUrl);
                return response;
            }
        };
    }

    public class Metrics {
        public final long millisWaiting;
        public final long millisDownloading;
        public final long millisParsing;

        private Metrics() {
            SnowflakeChunkDownloader outer = SnowflakeChunkDownloader.this;
            this.millisWaiting = outer.numberMillisWaitingForChunks;
            this.millisDownloading = outer.totalMillisDownloadingChunks.get();
            this.millisParsing = outer.totalMillisParsingChunks.get();
        }
    }
}

