/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spring.bigquery.core;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.spring.bigquery.core.BigQueryException;
import com.google.cloud.spring.bigquery.core.BigQueryJsonDataWriter;
import com.google.cloud.spring.bigquery.core.BigQueryOperations;
import com.google.cloud.spring.bigquery.core.WriteApiResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;

public class BigQueryTemplate
implements BigQueryOperations {
    private final BigQuery bigQuery;
    private final String datasetName;
    private final TaskScheduler taskScheduler;
    private final BigQueryWriteClient bigQueryWriteClient;
    private boolean autoDetectSchema = true;
    private JobInfo.WriteDisposition writeDisposition = JobInfo.WriteDisposition.WRITE_APPEND;
    private JobInfo.CreateDisposition createDisposition;
    private Duration jobPollInterval = Duration.ofSeconds(2L);
    private static final int DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE = 1000;
    private static final int MIN_JSON_STREAM_WRITER_BATCH_SIZE = 10;
    private final Logger logger = LoggerFactory.getLogger(BigQueryTemplate.class);
    private final int jsonWriterBatchSize;

    public BigQueryTemplate(BigQuery bigQuery, BigQueryWriteClient bigQueryWriteClient, Map<String, Object> bqInitSettings, TaskScheduler taskScheduler) {
        String bqDatasetName = (String)bqInitSettings.get("DATASET_NAME");
        Assert.notNull((Object)bigQuery, (String)"BigQuery client object must not be null.");
        Assert.notNull((Object)bqDatasetName, (String)"Dataset name must not be null");
        Assert.notNull((Object)taskScheduler, (String)"TaskScheduler must not be null");
        Assert.notNull((Object)bigQueryWriteClient, (String)"BigQueryWriteClient must not be null");
        this.jsonWriterBatchSize = (Integer)bqInitSettings.getOrDefault("JSON_WRITER_BATCH_SIZE", 1000);
        this.bigQuery = bigQuery;
        this.datasetName = bqDatasetName;
        this.taskScheduler = taskScheduler;
        this.bigQueryWriteClient = bigQueryWriteClient;
    }

    public void setAutoDetectSchema(boolean autoDetectSchema) {
        this.autoDetectSchema = autoDetectSchema;
    }

    public void setWriteDisposition(JobInfo.WriteDisposition writeDisposition) {
        Assert.notNull((Object)writeDisposition, (String)"BigQuery write disposition must not be null.");
        this.writeDisposition = writeDisposition;
    }

    public void setCreateDisposition(JobInfo.CreateDisposition createDisposition) {
        Assert.notNull((Object)createDisposition, (String)"BigQuery create disposition must not be null.");
        this.createDisposition = createDisposition;
    }

    public void setJobPollInterval(Duration jobPollInterval) {
        Assert.notNull((Object)jobPollInterval, (String)"BigQuery job polling interval must not be null");
        this.jobPollInterval = jobPollInterval;
    }

    @Override
    public CompletableFuture<Job> writeDataToTable(String tableName, InputStream inputStream, FormatOptions dataFormatOptions) {
        return this.writeDataToTable(tableName, inputStream, dataFormatOptions, null);
    }

    @Override
    public CompletableFuture<Job> writeDataToTable(String tableName, InputStream inputStream, FormatOptions dataFormatOptions, Schema schema) {
        TableId tableId = TableId.of((String)this.datasetName, (String)tableName);
        WriteChannelConfiguration.Builder writeChannelConfiguration = WriteChannelConfiguration.newBuilder((TableId)tableId).setFormatOptions(dataFormatOptions).setWriteDisposition(this.writeDisposition).setCreateDisposition(this.createDisposition).setAutodetect(Boolean.valueOf(this.autoDetectSchema));
        if (schema != null) {
            writeChannelConfiguration.setSchema(schema);
        }
        TableDataWriteChannel writer = this.bigQuery.writer(writeChannelConfiguration.build());
        try (OutputStream sink = Channels.newOutputStream((WritableByteChannel)writer);){
            StreamUtils.copy((InputStream)inputStream, (OutputStream)sink);
        }
        catch (IOException e) {
            throw new BigQueryException("Failed to write data to BigQuery tables.", e);
        }
        if (writer.getJob() == null) {
            throw new BigQueryException("Failed to initialize the BigQuery write job.");
        }
        return this.createJobFuture(writer.getJob());
    }

    @Override
    public CompletableFuture<WriteApiResponse> writeJsonStream(String tableName, InputStream jsonInputStream, Schema schema) {
        this.createTable(tableName, schema);
        return this.writeJsonStream(tableName, jsonInputStream);
    }

    @VisibleForTesting
    public Table createTable(String tableName, Schema schema) {
        TableId tableId = TableId.of((String)this.datasetName, (String)tableName);
        Table table = this.bigQuery.getTable(TableId.of((String)this.datasetName, (String)tableName), new BigQuery.TableOption[0]);
        if (table == null || !table.exists()) {
            StandardTableDefinition tableDefinition = StandardTableDefinition.of((Schema)schema);
            TableInfo tableInfo = TableInfo.newBuilder((TableId)tableId, (TableDefinition)tableDefinition).build();
            return this.bigQuery.create(tableInfo, new BigQuery.TableOption[0]);
        }
        return null;
    }

    @Override
    public CompletableFuture<WriteApiResponse> writeJsonStream(String tableName, InputStream jsonInputStream) {
        CompletableFuture<WriteApiResponse> writeApiFutureResponse = new CompletableFuture<WriteApiResponse>();
        Runnable asyncTask = () -> {
            try {
                WriteApiResponse apiResponse = this.getWriteApiResponse(tableName, jsonInputStream);
                writeApiFutureResponse.complete(apiResponse);
            }
            catch (Exception e) {
                writeApiFutureResponse.completeExceptionally(e);
                Thread.currentThread().interrupt();
                this.logger.warn("Unable to get write API response.", (Throwable)e);
            }
        };
        ScheduledFuture asyncTaskScheduledFuture = this.taskScheduler.schedule(asyncTask, Instant.now());
        writeApiFutureResponse.whenComplete((writeApiResponse, exception) -> {
            if (exception != null) {
                this.logger.error("asyncTask interrupted", exception);
                if (exception instanceof CancellationException) {
                    asyncTaskScheduledFuture.cancel(true);
                }
            } else if (writeApiResponse != null && !writeApiResponse.isSuccessful()) {
                this.logger.warn("Write operation failed");
            } else {
                this.logger.info("Data successfully written");
            }
        });
        return writeApiFutureResponse;
    }

    @VisibleForTesting
    public BigQueryJsonDataWriter getBigQueryJsonDataWriter(TableName parentTable) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
        return new BigQueryJsonDataWriter(parentTable, this.bigQueryWriteClient);
    }

    public WriteApiResponse getWriteApiResponse(String tableName, InputStream jsonInputStream) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
        WriteApiResponse apiResponse = new WriteApiResponse();
        TableName parentTable = TableName.of((String)((BigQueryOptions)this.bigQuery.getOptions()).getProjectId(), (String)this.datasetName, (String)tableName);
        BigQueryJsonDataWriter writer = this.getBigQueryJsonDataWriter(parentTable);
        try {
            long offset = 0L;
            int currentBatchSize = 0;
            BufferedReader jsonReader = new BufferedReader(new InputStreamReader(jsonInputStream));
            String jsonLine = null;
            JSONArray jsonBatch = new JSONArray();
            while ((jsonLine = jsonReader.readLine()) != null) {
                JSONObject jsonObj = new JSONObject(jsonLine);
                jsonBatch.put((Object)jsonObj);
                if (++currentBatchSize != this.getBatchSize()) continue;
                writer.append(jsonBatch, offset);
                offset += (long)jsonBatch.length();
                jsonBatch = new JSONArray();
                currentBatchSize = 0;
            }
            if (jsonBatch.length() != 0) {
                writer.append(jsonBatch, offset);
            }
        }
        catch (Exception e) {
            throw new BigQueryException("Failed to append records. \n" + e);
        }
        writer.finalizeWriteStream();
        BatchCommitWriteStreamsResponse commitResponse = this.getCommitResponse(parentTable, writer);
        if (!commitResponse.hasCommitTime()) {
            for (StorageError err : commitResponse.getStreamErrorsList()) {
                apiResponse.addError(err);
            }
        }
        if (apiResponse.getErrors().isEmpty()) {
            apiResponse.setSuccessful(true);
        }
        return apiResponse;
    }

    @VisibleForTesting
    public BatchCommitWriteStreamsResponse getCommitResponse(TableName parentTable, BigQueryJsonDataWriter writer) {
        BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder().setParent(parentTable.toString()).addWriteStreams(writer.getStreamName()).build();
        return this.bigQueryWriteClient.batchCommitWriteStreams(commitRequest);
    }

    private int getBatchSize() {
        return this.jsonWriterBatchSize > 10 ? this.jsonWriterBatchSize : 1000;
    }

    public String getDatasetName() {
        return this.datasetName;
    }

    public int getJsonWriterBatchSize() {
        return this.jsonWriterBatchSize;
    }

    private CompletableFuture<Job> createJobFuture(Job pendingJob) {
        CompletableFuture<Job> result = new CompletableFuture<Job>();
        ScheduledFuture scheduledFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
            try {
                Job job = pendingJob.reload(new BigQuery.JobOption[0]);
                if (JobStatus.State.DONE.equals((Object)job.getStatus().getState())) {
                    if (job.getStatus().getError() != null) {
                        result.completeExceptionally((Throwable)((Object)new BigQueryException(job.getStatus().getError().getMessage())));
                    } else {
                        result.complete(job);
                    }
                }
            }
            catch (Exception e) {
                result.completeExceptionally((Throwable)((Object)new BigQueryException(e.getMessage())));
            }
        }, this.jobPollInterval);
        result.whenComplete((response, exception) -> {
            if (exception != null) {
                pendingJob.cancel();
                scheduledFuture.cancel(true);
                return;
            }
            scheduledFuture.cancel(true);
        });
        return result;
    }
}

