/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.data.exceptions.OneApiError;
import com.microsoft.azure.kusto.ingest.AzureStorageClient;
import com.microsoft.azure.kusto.ingest.Ensure;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionUtils;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.ResourceManager;
import com.microsoft.azure.kusto.ingest.StreamingIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedStreamingIngestClient
implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final int MAX_RETRY_CALLS = 3;
    private final QueuedIngestClient queuedIngestClient;
    private final StreamingIngestClient streamingIngestClient;

    public ManagedStreamingIngestClient(ConnectionStringBuilder dmConnectionStringBuilder, ConnectionStringBuilder engineConnectionStringBuilder) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClient(dmConnectionStringBuilder);
        this.streamingIngestClient = new StreamingIngestClient(engineConnectionStringBuilder);
    }

    public ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient storageClient, StreamingClient streamingClient) {
        log.info("Creating a new ManagedStreamingIngestClient from raw parts");
        this.queuedIngestClient = new QueuedIngestClient(resourceManager, storageClient);
        this.streamingIngestClient = new StreamingIngestClient(streamingClient);
    }

    @Override
    public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, true);
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", (Throwable)e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    @Override
    public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
    }

    @Override
    public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (IOException ex) {
            String msg = "Failed to read from ResultSet.";
            log.error(msg, (Throwable)ex);
            throw new IngestionClientException(msg, ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        streamSourceInfo.validate();
        ingestionProperties.validate();
        if (streamSourceInfo.isLeaveOpen()) {
            throw new UnsupportedOperationException("LeaveOpen can't be true in ManagedStreamingIngestClient");
        }
        streamSourceInfo.setLeaveOpen(true);
        try {
            for (int i = 0; i < 3; ++i) {
                try {
                    IngestionResult ingestionResult = this.streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
                    return ingestionResult;
                }
                catch (Exception e) {
                    if (e instanceof IngestionServiceException && e.getCause() != null && e.getCause() instanceof DataServiceException && e.getCause().getCause() != null && e.getCause().getCause() instanceof DataWebException) {
                        DataWebException webException = (DataWebException)e.getCause().getCause();
                        try {
                            OneApiError oneApiError = webException.getApiError();
                            if (oneApiError.isPermanent()) {
                                log.error("Error is permanent, stopping.");
                                throw e;
                            }
                        }
                        catch (JSONException je) {
                            log.info("Failed to parse json in exception, continuing.", (Throwable)je);
                        }
                    }
                    log.info("Streaming ingestion failed, trying again", (Throwable)e);
                    try {
                        streamSourceInfo.getStream().reset();
                        continue;
                    }
                    catch (IOException ioException) {
                        throw new IngestionClientException("Ingestion failed transiently but the stream isn't resettable therefore ingestion wasn't retried", ioException);
                    }
                }
            }
            IngestionResult ingestionResult = this.queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
            return ingestionResult;
        }
        finally {
            try {
                streamSourceInfo.getStream().close();
            }
            catch (IOException e) {
                log.warn("Failed to close stream", (Throwable)e);
            }
        }
    }

    @Override
    public void close() {
        this.queuedIngestClient.close();
        this.streamingIngestClient.close();
    }
}

