package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.class */
public class ManagedStreamingIngestClient implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final int MAX_RETRY_CALLS = 3;
    private final QueuedIngestClient queuedIngestClient;
    private final StreamingIngestClient streamingIngestClient;

    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, ConnectionStringBuilder connectionStringBuilder2) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClient(connectionStringBuilder);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder2);
    }

    public ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient azureStorageClient, StreamingClient streamingClient) {
        log.info("Creating a new ManagedStreamingIngestClient from raw parts");
        this.queuedIngestClient = new QueuedIngestClient(resourceManager, azureStorageClient);
        this.streamingIngestClient = new StreamingIngestClient(streamingClient);
    }

    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            return ingestFromStream(IngestionUtils.fileToStream(fileSourceInfo, true), ingestionProperties);
        } catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
    }

    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validate();
        try {
            return ingestFromStream(IngestionUtils.resultSetToStream(resultSetSourceInfo), ingestionProperties);
        } catch (IOException e) {
            log.error("Failed to read from ResultSet.", e);
            throw new IngestionClientException("Failed to read from ResultSet.", e);
        }
    }

    @Override // com.microsoft.azure.kusto.ingest.IngestClient
    public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        streamSourceInfo.validate();
        ingestionProperties.validate();
        if (streamSourceInfo.isLeaveOpen()) {
            throw new UnsupportedOperationException("LeaveOpen can't be true in ManagedStreamingIngestClient");
        }
        streamSourceInfo.setLeaveOpen(true);
        for (int i = 0; i < 3; i++) {
            try {
                return this.streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
            } catch (Exception e) {
                try {
                    if ((e instanceof IngestionServiceException) && e.getCause() != null && (e.getCause() instanceof DataServiceException) && e.getCause().getCause() != null && (e.getCause().getCause() instanceof DataWebException)) {
                        try {
                            if (e.getCause().getCause().getApiError().isPermanent()) {
                                log.error("Error is permanent, stopping.");
                                throw e;
                                break;
                            }
                        } catch (JSONException e2) {
                            log.info("Failed to parse json in exception, continuing.", e2);
                        }
                    }
                    log.info("Streaming ingestion failed, trying again", e);
                    try {
                        streamSourceInfo.getStream().reset();
                    } catch (IOException e3) {
                        throw new IngestionClientException("Ingestion failed transiently but the stream isn't resettable therefore ingestion wasn't retried", e3);
                    }
                } finally {
                    try {
                        streamSourceInfo.getStream().close();
                    } catch (IOException e4) {
                        log.warn("Failed to close stream", e4);
                    }
                }
            }
        }
        IngestionResult ingestFromStream = this.queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
        try {
            streamSourceInfo.getStream().close();
        } catch (IOException e5) {
            log.warn("Failed to close stream", e5);
        }
        return ingestFromStream;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.queuedIngestClient.close();
        this.streamingIngestClient.close();
    }
}
