package org.apache.drill.exec.server.rest.stream;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.DrillExceptionUtil;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.physical.resultSet.PushResultSetReader;
import org.apache.drill.exec.physical.resultSet.impl.PushResultSetReaderImpl;
import org.apache.drill.exec.physical.resultSet.util.JsonWriter;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.server.rest.BaseWebUserConnection;
import org.apache.drill.exec.server.rest.WebSessionResources;
import org.apache.drill.exec.vector.complex.fn.JsonOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/server/rest/stream/StreamingHttpConnection.class */
public class StreamingHttpConnection extends BaseWebUserConnection {
    private static final Logger logger = LoggerFactory.getLogger(StreamingHttpConnection.class);
    private final CountDownLatch startSignal;
    private UserBitShared.QueryId queryId;
    private int rowLimit;
    private int batchCount;
    private int rowCount;
    private OutputStream out;
    private JsonWriter writer;
    private PushResultSetReaderImpl.BatchHolder batchHolder;
    private PushResultSetReader reader;

    public StreamingHttpConnection(WebSessionResources webSessionResources) {
        super(webSessionResources);
        this.startSignal = new CountDownLatch(1);
    }

    public void onStart(UserBitShared.QueryId queryId, int i) {
        this.queryId = queryId;
        this.rowLimit = i;
    }

    public void outputAvailable(OutputStream outputStream) throws IOException {
        this.out = outputStream;
        this.writer = new JsonWriter(outputStream, false, false);
        this.startSignal.countDown();
    }

    @Override // org.apache.drill.exec.rpc.UserClientConnection
    public void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> rpcOutcomeListener, QueryDataPackage queryDataPackage) {
        VectorContainer batch = queryDataPackage.batch();
        try {
            try {
                if (this.batchCount == 0) {
                    this.batchHolder = new PushResultSetReaderImpl.BatchHolder(batch);
                    this.reader = new PushResultSetReaderImpl(this.batchHolder);
                    this.startSignal.await();
                }
                this.batchHolder.newBatch();
                emitBatch(this.reader.start());
                this.batchCount++;
                batch.zeroVectors();
                rpcOutcomeListener.success(Acks.OK, (ByteBuf) null);
            } catch (IOException e) {
                throw UserException.dataWriteError(e).addContext("Failed to send JSON results to the REST client").build(logger);
            } catch (InterruptedException e2) {
                throw new DrillRuntimeException("Interrupted", e2);
            }
        } catch (Throwable th) {
            batch.zeroVectors();
            rpcOutcomeListener.success(Acks.OK, (ByteBuf) null);
            throw th;
        }
    }

    public void emitBatch(RowSetReader rowSetReader) throws IOException {
        if (this.batchCount == 0) {
            emitHeader(rowSetReader.tupleSchema());
        }
        if (this.rowLimit == 0 || this.rowCount < this.rowLimit) {
            emitRows(rowSetReader);
        }
    }

    private void emitHeader(TupleMetadata tupleMetadata) throws IOException {
        startHeader();
        JsonOutput jsonOutput = this.writer.jsonOutput();
        jsonOutput.writeFieldName("columns");
        writeColNames(jsonOutput, tupleMetadata);
        writeNewline(jsonOutput);
        jsonOutput.writeFieldName("metadata");
        writeColTypes(jsonOutput, tupleMetadata);
        writeNewline(jsonOutput);
        jsonOutput.writeFieldName("attemptedAutoLimit");
        jsonOutput.writeInt(this.rowLimit);
        writeNewline(jsonOutput);
        jsonOutput.writeFieldName("rows");
        jsonOutput.writeStartArray();
        writeNewline(jsonOutput);
    }

    private void startHeader() throws IOException {
        JsonOutput jsonOutput = this.writer.jsonOutput();
        jsonOutput.writeStartObject();
        jsonOutput.writeFieldName("queryId");
        jsonOutput.writeVarChar(QueryIdHelper.getQueryId(this.queryId));
        writeNewline(jsonOutput);
    }

    public void writeNewline(JsonOutput jsonOutput) throws IOException {
        jsonOutput.flush();
        this.out.write(10);
    }

    private void writeColNames(JsonOutput jsonOutput, TupleMetadata tupleMetadata) throws IOException {
        jsonOutput.writeStartArray();
        Iterator it = tupleMetadata.iterator();
        while (it.hasNext()) {
            jsonOutput.writeVarChar(((ColumnMetadata) it.next()).name());
        }
        jsonOutput.writeEndArray();
    }

    private void writeColTypes(JsonOutput jsonOutput, TupleMetadata tupleMetadata) throws IOException {
        jsonOutput.writeStartArray();
        Iterator it = tupleMetadata.iterator();
        while (it.hasNext()) {
            jsonOutput.writeVarChar(webDataType(((ColumnMetadata) it.next()).majorType()));
        }
        jsonOutput.writeEndArray();
    }

    private void emitRows(RowSetReader rowSetReader) throws IOException {
        while (rowSetReader.next()) {
            this.writer.writeRow(rowSetReader);
            writeNewline(this.writer.jsonOutput());
            if (this.rowLimit > 0) {
                int i = this.rowCount + 1;
                this.rowCount = i;
                if (i >= this.rowLimit) {
                    return;
                }
            }
        }
    }

    public void finish() throws IOException {
        JsonOutput jsonOutput = this.writer.jsonOutput();
        if (this.batchCount == 0) {
            startHeader();
            if (getSession().getOptions().getBoolean(ExecConstants.ENABLE_REST_VERBOSE_ERRORS_KEY)) {
                emitErrorInfo();
            }
        } else {
            jsonOutput.writeEndArray();
            writeNewline(jsonOutput);
        }
        jsonOutput.writeFieldName("queryState");
        jsonOutput.writeVarChar(getQueryState());
        writeNewline(jsonOutput);
        jsonOutput.writeEndObject();
        writeNewline(jsonOutput);
    }

    private void emitErrorInfo() throws IOException {
        JsonOutput jsonOutput = this.writer.jsonOutput();
        Throwable throwable = DrillExceptionUtil.getThrowable(this.error.getException());
        if (throwable != null) {
            jsonOutput.writeFieldName("exception");
            jsonOutput.writeVarChar(throwable.getClass().getName());
            writeNewline(jsonOutput);
            jsonOutput.writeFieldName("errorMessage");
            jsonOutput.writeVarChar(throwable.getMessage());
            writeNewline(jsonOutput);
            jsonOutput.writeFieldName("stackTrace");
            jsonOutput.writeStartArray();
            for (String str : ExceptionUtils.getStackFrames(throwable)) {
                jsonOutput.writeVarChar(str);
            }
            jsonOutput.writeEndArray();
        } else {
            jsonOutput.writeFieldName("errorMessage");
            jsonOutput.writeVarChar(this.error.getMessage());
        }
        writeNewline(jsonOutput);
    }
}
