package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/kusto/ingest/StreamingIngestClient.class */
/**
 * {@link IngestClient} implementation that sends data to the service through a
 * {@link StreamingClient} instead of queuing it.
 *
 * <p>Every {@code ingestFrom*} entry point ends up in
 * {@link #ingestFromStream(StreamSourceInfo, IngestionProperties)}. Streams that do not declare a
 * compression type are GZIP-compressed into an in-memory buffer before being sent, so the whole
 * payload is held in memory for the duration of the call.
 */
public class StreamingIngestClient extends IngestClientBase implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final StreamingClient streamingClient;
    /** Size, in bytes, of the buffer used to copy the source stream into the GZIP stream. */
    private static final int STREAM_COMPRESS_BUFFER_SIZE = 16384;
    /** Service type passed to {@code validateEndpointServiceType} when a streaming request fails. */
    public static final String EXPECTED_SERVICE_TYPE = "Engine";
    private static final String NO_VERSION_RECORDS_MESSAGE =
            "Couldn't retrieve ServiceType because '.show version' didn't return any records";

    /**
     * Creates a client whose {@link StreamingClient} is built from the given connection string.
     * The cluster URL of the connection string is remembered as the connection data source.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from package-private.
     *
     * @param connectionStringBuilder connection details for the target cluster
     * @throws URISyntaxException propagated from {@code ClientFactory.createStreamingClient}
     */
    public StreamingIngestClient(ConnectionStringBuilder connectionStringBuilder) throws URISyntaxException {
        log.info("Creating a new StreamingIngestClient");
        this.streamingClient = ClientFactory.createStreamingClient(connectionStringBuilder);
        this.connectionDataSource = connectionStringBuilder.getClusterUrl();
    }

    /**
     * Creates a client around an existing {@link StreamingClient}. The connection data source is
     * not set by this constructor; see {@link #setConnectionDataSource(String)}.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from package-private.
     *
     * @param streamingClient the client used to execute streaming ingest requests
     */
    public StreamingIngestClient(StreamingClient streamingClient) {
        log.info("Creating a new StreamingIngestClient");
        this.streamingClient = streamingClient;
    }

    /**
     * Ingests a local file by converting it to a stream and delegating to
     * {@link #ingestFromStream(StreamSourceInfo, IngestionProperties)}.
     *
     * @param fileSourceInfo      the file to ingest; must not be null
     * @param ingestionProperties target database/table, format and mapping; must not be null
     * @return the result of the stream ingestion
     * @throws IngestionClientException  if the file cannot be found, or the stream ingestion
     *                                   reports a client-side failure
     * @throws IngestionServiceException if the stream ingestion reports a service-side failure
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo fileStream = IngestionUtils.fileToStream(fileSourceInfo, false);
            return ingestFromStream(fileStream, ingestionProperties);
        } catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    /**
     * Ingests a blob by opening it as a stream and delegating to
     * {@link #ingestFromStream(StreamSourceInfo, IngestionProperties)}. A warning is logged on
     * every call because blob ingestion through this client is not recommended.
     *
     * @param blobSourceInfo      the blob to ingest; must not be null
     * @param ingestionProperties target database/table, format and mapping; must not be null
     * @return the result of the stream ingestion
     * @throws IngestionClientException  if the blob path is not a valid URI, a storage error
     *                                   occurs, the blob is empty, or the stream ingestion reports
     *                                   a client-side failure
     * @throws IngestionServiceException if the stream ingestion reports a service-side failure
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        log.warn("Ingesting from blob using the StreamingIngestClient is not recommended, consider using the IngestClient instead.");
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        try {
            CloudBlockBlob blob = new CloudBlockBlob(new URI(blobSourceInfo.getBlobPath()));
            return ingestFromBlob(blobSourceInfo, ingestionProperties, blob);
        } catch (StorageException e) {
            log.error("Unexpected Storage error when ingesting a blob.", e);
            throw new IngestionClientException("Unexpected Storage error when ingesting a blob.", e);
        } catch (IllegalArgumentException | URISyntaxException e2) {
            log.error("Unexpected error when ingesting a blob - Invalid blob path.", e2);
            throw new IngestionClientException("Unexpected error when ingesting a blob - Invalid blob path.", e2);
        }
    }

    /**
     * Ingests a result set by converting it to a stream and delegating to
     * {@link #ingestFromStream(StreamSourceInfo, IngestionProperties)}.
     *
     * @param resultSetSourceInfo the result set to ingest; must not be null
     * @param ingestionProperties target database/table, format and mapping; must not be null
     * @return the result of the stream ingestion
     * @throws IngestionClientException  if reading the result set fails with an I/O error, or the
     *                                   stream ingestion reports a client-side failure
     * @throws IngestionServiceException if the stream ingestion reports a service-side failure
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo resultSetStream = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return ingestFromStream(resultSetStream, ingestionProperties);
        } catch (IOException e) {
            log.error("Failed to read from ResultSet.", e);
            throw new IngestionClientException("Failed to read from ResultSet.", e);
        }
    }

    /**
     * Sends the stream to the service with a streaming ingest request.
     *
     * <p>If the source declares no compression type, the stream is first GZIP-compressed in memory.
     * On success an {@link IngestionStatusResult} with status {@code Succeeded} and the target
     * database/table is returned.
     *
     * @param streamSourceInfo    the stream to ingest; must not be null
     * @param ingestionProperties target database/table, format and mapping; must not be null
     * @return a result whose status is {@link OperationStatus#Succeeded}
     * @throws IngestionClientException  if the mapping is invalid for the format, the stream is
     *                                   empty, or the request fails with a client or I/O error
     * @throws IngestionServiceException if the request fails with a service error
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        streamSourceInfo.validate();
        ingestionProperties.validate();
        String format = getFormat(ingestionProperties);
        String mappingReference = getMappingReference(ingestionProperties, format);
        try {
            boolean alreadyCompressed = streamSourceInfo.getCompressionType() != null;
            InputStream stream;
            if (alreadyCompressed) {
                stream = streamSourceInfo.getStream();
            } else {
                stream = compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen());
            }
            // The leave-open flag is only forwarded when the caller's own stream is sent; the
            // in-memory compressed copy is always passed with leave-open == false.
            boolean leaveOpen = alreadyCompressed && streamSourceInfo.isLeaveOpen();
            log.debug("Executing streaming ingest");
            this.streamingClient.executeStreamingIngest(
                    ingestionProperties.getDatabaseName(),
                    ingestionProperties.getTableName(),
                    stream,
                    (ClientRequestProperties) null,
                    format,
                    mappingReference,
                    leaveOpen);
            log.debug("Stream was ingested successfully.");
            IngestionStatus ingestionStatus = new IngestionStatus();
            ingestionStatus.status = OperationStatus.Succeeded;
            ingestionStatus.table = ingestionProperties.getTableName();
            ingestionStatus.database = ingestionProperties.getDatabaseName();
            return new IngestionStatusResult(ingestionStatus);
        } catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            boolean postRequestFailed = (e.getCause() instanceof DataWebException) && "Error in post request".equals(e.getMessage());
            if (postRequestFailed) {
                // NOTE(review): presumably raises a more descriptive error when the endpoint is not
                // an "Engine" endpoint - confirm against IngestClientBase.
                validateEndpointServiceType(this.connectionDataSource, EXPECTED_SERVICE_TYPE);
            }
            throw new IngestionServiceException(e.getMessage(), e);
        } catch (DataClientException | IOException e2) {
            log.error(e2.getMessage(), e2);
            throw new IngestionClientException(e2.getMessage(), e2);
        }
    }

    /** Returns the configured data format, falling back to {@code "csv"} when none is set. */
    private String getFormat(IngestionProperties ingestionProperties) {
        String dataFormat = ingestionProperties.getDataFormat();
        return dataFormat == null ? "csv" : dataFormat;
    }

    /**
     * Returns the ingestion mapping reference, validating it first when the format is listed in
     * {@code IngestionMapping.mappingRequiredFormats}.
     *
     * <p>For such formats the reference must be non-blank and the mapping kind must match the
     * format name (case-insensitively). A blank reference is reported in preference to a
     * mismatching kind.
     *
     * @param ingestionProperties properties holding the ingestion mapping
     * @param format              the data format being ingested
     * @return the mapping reference as configured (may be null or blank for other formats)
     * @throws IngestionClientException if the format requires a mapping and the reference is blank
     *                                  or the mapping kind is missing or does not match the format
     */
    private String getMappingReference(IngestionProperties ingestionProperties, String format) throws IngestionClientException {
        IngestionMapping ingestionMapping = ingestionProperties.getIngestionMapping();
        String mappingReference = ingestionMapping.getIngestionMappingReference();
        if (!IngestionMapping.mappingRequiredFormats.contains(format)) {
            return mappingReference;
        }

        IngestionMapping.IngestionMappingKind mappingKind = ingestionMapping.getIngestionMappingKind();
        String errorMessage = null;
        if (StringUtils.isBlank(mappingReference)) {
            errorMessage = String.format("Mapping reference must be specified for %s format.", format);
        } else if (mappingKind == null || !format.equalsIgnoreCase(mappingKind.name())) {
            // A null kind is reported as a mismatch instead of failing with a NullPointerException.
            String foundKind = mappingKind == null ? null : mappingKind.name();
            errorMessage = String.format("Wrong ingestion mapping for format %s, found %s mapping kind.", format, foundKind);
        }
        if (errorMessage != null) {
            log.error(errorMessage);
            throw new IngestionClientException(errorMessage);
        }
        return mappingReference;
    }

    /**
     * Reads the whole input stream and returns an in-memory stream of its GZIP-compressed content.
     *
     * <p>Unless {@code leaveOpen} is true, the input stream is closed before this method returns,
     * whether it returns normally or throws. The GZIP stream is always closed.
     *
     * @param inputStream the uncompressed source
     * @param leaveOpen   when true the input stream is left open for the caller
     * @return a stream over the compressed bytes
     * @throws IngestionClientException if the input stream has no data
     * @throws IOException              if reading, compressing or closing fails
     */
    private InputStream compressStream(InputStream inputStream, boolean leaveOpen) throws IngestionClientException, IOException {
        log.debug("Compressing the stream.");
        ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
        // A null resource is skipped by try-with-resources, which is how leaveOpen is honoured.
        // Resources close in reverse order, so the GZIP trailer is written before the input closes.
        try (InputStream closeOnExit = leaveOpen ? null : inputStream;
             GZIPOutputStream gzipStream = new GZIPOutputStream(compressedBytes)) {
            byte[] buffer = new byte[STREAM_COMPRESS_BUFFER_SIZE];
            int bytesRead = inputStream.read(buffer);
            if (bytesRead == -1) {
                log.error("Empty stream.");
                throw new IngestionClientException("Empty stream.");
            }
            do {
                gzipStream.write(buffer, 0, bytesRead);
                bytesRead = inputStream.read(buffer);
            } while (bytesRead != -1);
        }
        // Only read the buffer after the GZIP stream is closed so the output is complete.
        return new ByteArrayInputStream(compressedBytes.toByteArray());
    }

    /**
     * Ingests the given blob handle as a stream.
     *
     * <p>Downloads the blob attributes, rejects zero-length blobs, then opens the blob's input
     * stream (not left open) and tags it with the compression type derived from the blob path.
     *
     * @param blobSourceInfo      supplies the blob path and source id
     * @param ingestionProperties target database/table, format and mapping
     * @param cloudBlockBlob      the blob to read from
     * @return the result of the stream ingestion
     * @throws IngestionClientException  if the blob is empty or the stream ingestion reports a
     *                                   client-side failure
     * @throws IngestionServiceException if the stream ingestion reports a service-side failure
     * @throws StorageException          if accessing the blob fails
     */
    IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, CloudBlockBlob cloudBlockBlob) throws IngestionClientException, IngestionServiceException, StorageException {
        String blobPath = blobSourceInfo.getBlobPath();
        cloudBlockBlob.downloadAttributes();
        long blobLength = cloudBlockBlob.getProperties().getLength();
        if (blobLength == 0) {
            log.error("Empty blob.");
            throw new IngestionClientException("Empty blob.");
        }
        InputStream blobStream = cloudBlockBlob.openInputStream();
        StreamSourceInfo streamSourceInfo = new StreamSourceInfo(blobStream, false, blobSourceInfo.getSourceId());
        streamSourceInfo.setCompressionType(AzureStorageClient.getCompression(blobPath));
        return ingestFromStream(streamSourceInfo, ingestionProperties);
    }

    /**
     * Strips {@code IngestClientBase.INGEST_PREFIX} from the start of the URI's host.
     *
     * @param uRIBuilder the endpoint URI; its host is modified in place when the prefix is present
     * @return the URI without the prefix, or an empty string when the host is missing or does not
     *         start with the prefix
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected String emendEndpointUri(URIBuilder uRIBuilder) {
        String host = uRIBuilder.getHost();
        // A URI without a host is treated like one without the prefix rather than causing an NPE.
        if (host == null || !host.startsWith(IngestClientBase.INGEST_PREFIX)) {
            return "";
        }
        uRIBuilder.setHost(host.substring(IngestClientBase.INGEST_PREFIX.length()));
        return uRIBuilder.toString();
    }

    /**
     * Executes the version command on the streaming client and reads the service type from the
     * first row of the first result table.
     *
     * @return the service type, or null when this instance has no streaming client
     * @throws IngestionServiceException if the command returns no tables or no rows, or fails with
     *                                   a service exception
     * @throws IngestionClientException  if the command fails with a client exception
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected String retrieveServiceType() throws IngestionServiceException, IngestionClientException {
        if (this.streamingClient == null) {
            return null;
        }
        log.info("Getting version to determine endpoint's ServiceType");
        try {
            KustoOperationResult versionResult = this.streamingClient.execute(Commands.VERSION_SHOW_COMMAND);
            boolean hasResultTable = versionResult != null && versionResult.hasNext() && !versionResult.getResultTables().isEmpty();
            if (!hasResultTable) {
                throw new IngestionServiceException(NO_VERSION_RECORDS_MESSAGE);
            }
            KustoResultSetTable versionTable = versionResult.next();
            // Advance to the first row; a table with no rows is reported like a missing table
            // instead of failing later inside getString.
            if (!versionTable.next()) {
                throw new IngestionServiceException(NO_VERSION_RECORDS_MESSAGE);
            }
            return versionTable.getString(ResourceManager.SERVICE_TYPE_COLUMN_NAME);
        } catch (DataServiceException e) {
            throw new IngestionServiceException(e.getIngestionSource(), "Couldn't retrieve ServiceType because of a service exception executing '.show version'", e);
        } catch (DataClientException e2) {
            throw new IngestionClientException(e2.getIngestionSource(), "Couldn't retrieve ServiceType because of a client exception executing '.show version'", e2);
        }
    }

    /** Overrides the connection data source that is passed to the endpoint service type validation. */
    protected void setConnectionDataSource(String str) {
        this.connectionDataSource = str;
    }

    /** Does nothing; this method does not close the underlying streaming client. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }
}
