package org.apache.pinot.controller.api.resources;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/")
/* loaded from: input_file:org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.class */
public class LLCSegmentCompletionHandlers {
    private static final Logger LOGGER = LoggerFactory.getLogger(LLCSegmentCompletionHandlers.class);
    private static final Object SEGMENT_UPLOAD_LOCK = new Object();
    private static final String SCHEME = "file://";

    @Inject
    SegmentCompletionManager _segmentCompletionManager;

    @VisibleForTesting
    public static String getScheme() {
        return SCHEME;
    }

    @GET
    @Path("extendBuildTime")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "GetAdminInfo")
    public String extendBuildTime(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, @QueryParam("extraTimeSec") int i) {
        if (str == null || str2 == null || (j == -1 && str3 == null)) {
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", new Object[]{Long.valueOf(j), str2, str, str3});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
        if (i <= 0) {
            LOGGER.warn("Invalid value {} for extra build time from instance {} for segment {}", new Object[]{Integer.valueOf(i), str, str2});
            i = SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds();
        }
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withExtraTimeSec(i);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing extendBuildTime:{}", params.toString());
        String jsonString = this._segmentCompletionManager.extendBuildTime(params).toJsonString();
        LOGGER.info("Response to extendBuildTime:{}", jsonString);
        return jsonString;
    }

    private void extractOffsetFromParams(SegmentCompletionProtocol.Request.Params params, String str, long j) {
        if (str != null) {
            params.withStreamPartitionMsgOffset(str);
        } else {
            params.withStreamPartitionMsgOffset(Long.toString(j));
        }
    }

    @GET
    @Path("segmentConsumed")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "GetAdminInfo")
    public String segmentConsumed(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, @QueryParam("reason") String str4, @QueryParam("memoryUsedBytes") long j2, @QueryParam("rowCount") int i) {
        if (str == null || str2 == null || (j == -1 && str3 == null)) {
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", new Object[]{Long.valueOf(j), str2, str, str3});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withReason(str4).withMemoryUsedBytes(j2).withNumRows(i);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing segmentConsumed:{}", params.toString());
        String jsonString = this._segmentCompletionManager.segmentConsumed(params).toJsonString();
        LOGGER.info("Response to segmentConsumed for segment:{} is :{}", str2, jsonString);
        return jsonString;
    }

    @GET
    @Path("segmentStoppedConsuming")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "GetAdminInfo")
    public String segmentStoppedConsuming(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, @QueryParam("reason") String str4) {
        if (str == null || str2 == null || (j == -1 && str3 == null)) {
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", new Object[]{Long.valueOf(j), str2, str, str3});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withReason(str4);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing segmentStoppedConsuming:{}", params.toString());
        String jsonString = this._segmentCompletionManager.segmentStoppedConsuming(params).toJsonString();
        LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}", str2, jsonString);
        return jsonString;
    }

    @GET
    @Path("segmentCommitStart")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "GetAdminInfo")
    public String segmentCommitStart(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, @QueryParam("memoryUsedBytes") long j2, @QueryParam("buildTimeMillis") long j3, @QueryParam("waitTimeMillis") long j4, @QueryParam("rowCount") int i, @QueryParam("segmentSizeBytes") long j5) {
        if (str == null || str2 == null || (j == -1 && str3 == null)) {
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", new Object[]{Long.valueOf(j), str2, str, str3});
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", new Object[]{Long.valueOf(j), str2, str});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withMemoryUsedBytes(j2).withBuildTimeMillis(j3).withWaitTimeMillis(j4).withNumRows(i).withSegmentSizeBytes(j5);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing segmentCommitStart:{}", params.toString());
        String jsonString = this._segmentCompletionManager.segmentCommitStart(params).toJsonString();
        LOGGER.info("Response to segmentCommitStart for segment:{} is:{}", str2, jsonString);
        return jsonString;
    }

    @Path("segmentCommit")
    @Deprecated
    @POST
    @Authorize(targetType = TargetType.CLUSTER, action = "CommitSegment")
    @Authenticate(AccessType.CREATE)
    @Consumes({"multipart/form-data"})
    @Produces({"application/json"})
    public String segmentCommit(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, @QueryParam("memoryUsedBytes") long j2, @QueryParam("buildTimeMillis") long j3, @QueryParam("waitTimeMillis") long j4, @QueryParam("segmentSizeBytes") long j5, @QueryParam("rowCount") int i, FormDataMultiPart formDataMultiPart) {
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withSegmentSizeBytes(j5).withBuildTimeMillis(j3).withWaitTimeMillis(j4).withNumRows(i).withMemoryUsedBytes(j2);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing segmentCommit:{}", params.toString());
        SegmentCompletionManager segmentCompletionManager = this._segmentCompletionManager;
        SegmentCompletionProtocol.Response segmentCommitStart = segmentCompletionManager.segmentCommitStart(params);
        CommittingSegmentDescriptor fromSegmentCompletionReqParams = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params);
        boolean z = false;
        if (segmentCommitStart.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
            try {
                try {
                    File extractSegmentFromFormToLocalTempFile = extractSegmentFromFormToLocalTempFile(formDataMultiPart, str2);
                    SegmentMetadataImpl extractMetadataFromLocalSegmentFile = extractMetadataFromLocalSegmentFile(extractSegmentFromFormToLocalTempFile);
                    URI uri = URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), new String[]{new LLCSegmentName(str2).getTableName(), URIUtils.encode(str2)});
                    PinotFS create = PinotFSFactory.create(uri.getScheme());
                    synchronized (SEGMENT_UPLOAD_LOCK) {
                        if (create.exists(uri)) {
                            LOGGER.warn("Segment file: {} already exists. Replacing it with segment: {} from instance: {}", new Object[]{uri, str2, str});
                            create.delete(uri, true);
                        }
                        create.copyFromLocalFile(extractSegmentFromFormToLocalTempFile, uri);
                    }
                    fromSegmentCompletionReqParams = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(params, extractMetadataFromLocalSegmentFile);
                    fromSegmentCompletionReqParams.setSegmentLocation(uri.toString());
                    z = true;
                    FileUtils.deleteQuietly(extractSegmentFromFormToLocalTempFile);
                } catch (Exception e) {
                    LOGGER.error("Caught exception while committing segment: {} from instance: {}", new Object[]{str2, str, e});
                    FileUtils.deleteQuietly((File) null);
                }
            } catch (Throwable th) {
                FileUtils.deleteQuietly((File) null);
                throw th;
            }
        }
        SegmentCompletionProtocol.Response segmentCommitEnd = segmentCompletionManager.segmentCommitEnd(params, z, false, fromSegmentCompletionReqParams);
        LOGGER.info("Response to segmentCommit: instance={}, segment={}, status={}, streamMsgOffset={}", new Object[]{params.getInstanceId(), params.getSegmentName(), segmentCommitEnd.getStatus(), segmentCommitEnd.getStreamPartitionMsgOffset()});
        return segmentCommitEnd.toJsonString();
    }

    @Path("segmentUpload")
    @POST
    @TrackInflightRequestMetrics
    @Authorize(targetType = TargetType.CLUSTER, action = "UploadSegment")
    @Authenticate(AccessType.CREATE)
    @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
    @Consumes({"multipart/form-data"})
    @Produces({"application/json"})
    public String segmentUpload(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str3, FormDataMultiPart formDataMultiPart) {
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2);
        extractOffsetFromParams(params, str3, j);
        LOGGER.info("Processing segmentUpload:{}", params.toString());
        File file = null;
        try {
            try {
                file = extractSegmentFromFormToLocalTempFile(formDataMultiPart, str2);
                URI uri = URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), new String[]{new LLCSegmentName(str2).getTableName(), URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(str2))});
                PinotFSFactory.create(uri.getScheme()).copyFromLocalFile(file, uri);
                String jsonString = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(params.getStreamPartitionMsgOffset()).withSegmentLocation(uri.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)).toJsonString();
                LOGGER.info("Response to segmentUpload for segment:{} is:{}", str2, jsonString);
                FileUtils.deleteQuietly(file);
                return jsonString;
            } catch (Exception e) {
                LOGGER.error("Caught exception while uploading segment: {} from instance: {}", new Object[]{str2, str, e});
                String jsonString2 = SegmentCompletionProtocol.RESP_FAILED.toJsonString();
                FileUtils.deleteQuietly(file);
                return jsonString2;
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    @Path("segmentCommitEndWithMetadata")
    @Authenticate(AccessType.CREATE)
    @Consumes({"multipart/form-data"})
    @POST
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "CommitSegment")
    public String segmentCommitEndWithMetadata(@QueryParam("instance") String str, @QueryParam("name") String str2, @QueryParam("location") String str3, @QueryParam("offset") long j, @QueryParam("streamPartitionMsgOffset") String str4, @QueryParam("memoryUsedBytes") long j2, @QueryParam("buildTimeMillis") long j3, @QueryParam("waitTimeMillis") long j4, @QueryParam("rowCount") int i, @QueryParam("segmentSizeBytes") long j5, @QueryParam("reason") String str5, FormDataMultiPart formDataMultiPart) {
        if (str == null || str2 == null || str3 == null || formDataMultiPart == null || (j == -1 && str4 == null)) {
            LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}, streamPartitionMsgOffset={}", new Object[]{Long.valueOf(j), str2, str, str3, str4});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withInstanceId(str).withSegmentName(str2).withSegmentLocation(str3).withSegmentSizeBytes(j5).withBuildTimeMillis(j3).withWaitTimeMillis(j4).withNumRows(i).withMemoryUsedBytes(j2).withReason(str5);
        extractOffsetFromParams(params, str4, j);
        LOGGER.info("Processing segmentCommitEndWithMetadata:{}", params.toString());
        try {
            String jsonString = this._segmentCompletionManager.segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(params, extractSegmentMetadataFromForm(formDataMultiPart, str2))).toJsonString();
            LOGGER.info("Response to segmentCommitEndWithMetadata for segment:{} is:{}", str2, jsonString);
            return jsonString;
        } catch (Exception e) {
            LOGGER.error("Caught exception while extracting metadata for segment: {} from instance: {}", new Object[]{str2, str, e});
            return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
        }
    }

    private static File extractSegmentFromFormToLocalTempFile(FormDataMultiPart formDataMultiPart, String str) throws IOException {
        try {
            Map fields = formDataMultiPart.getFields();
            Preconditions.checkState(PinotSegmentUploadDownloadRestletResource.validateMultiPart(fields, str), "Invalid multi-part for segment: %s", str);
            FormDataBodyPart formDataBodyPart = (FormDataBodyPart) ((List) fields.values().iterator().next()).get(0);
            File concatAndValidateFile = org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(ControllerFilePathProvider.getInstance().getFileUploadTempDir(), getTempSegmentFileName(str), "Invalid segment name: %s", new Object[]{str});
            try {
                InputStream inputStream = (InputStream) formDataBodyPart.getValueAs(InputStream.class);
                try {
                    Files.copy(inputStream, concatAndValidateFile.toPath(), new CopyOption[0]);
                    if (inputStream != null) {
                        inputStream.close();
                    }
                    return concatAndValidateFile;
                } catch (Throwable th) {
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Exception e) {
                FileUtils.deleteQuietly(concatAndValidateFile);
                throw e;
            }
        } finally {
            formDataMultiPart.cleanup();
        }
    }

    private static SegmentMetadataImpl extractMetadataFromLocalSegmentFile(File file) throws Exception {
        File concatAndValidateFile = org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(ControllerFilePathProvider.getInstance().getUntarredFileTempDir(), file.getName(), "Invalid segment file: %s", new Object[]{file});
        try {
            FileUtils.forceMkdir(concatAndValidateFile);
            TarGzCompressionUtils.untarOneFile(file, "metadata.properties", new File(concatAndValidateFile, "metadata.properties"));
            TarGzCompressionUtils.untarOneFile(file, "creation.meta", new File(concatAndValidateFile, "creation.meta"));
            return new SegmentMetadataImpl(concatAndValidateFile);
        } finally {
            FileUtils.deleteQuietly(concatAndValidateFile);
        }
    }

    private static SegmentMetadataImpl extractSegmentMetadataFromForm(FormDataMultiPart formDataMultiPart, String str) throws IOException, ConfigurationException {
        File concatAndValidateFile = org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(ControllerFilePathProvider.getInstance().getUntarredFileTempDir(), getTempSegmentFileName(str), "Invalid segment name: %s", new Object[]{str});
        try {
            FileUtils.forceMkdir(concatAndValidateFile);
            extractFileFromForm(formDataMultiPart, "metadata.properties", concatAndValidateFile);
            extractFileFromForm(formDataMultiPart, "creation.meta", concatAndValidateFile);
            SegmentMetadataImpl segmentMetadataImpl = new SegmentMetadataImpl(concatAndValidateFile);
            FileUtils.deleteQuietly(concatAndValidateFile);
            return segmentMetadataImpl;
        } catch (Throwable th) {
            FileUtils.deleteQuietly(concatAndValidateFile);
            throw th;
        }
    }

    private static void extractFileFromForm(FormDataMultiPart formDataMultiPart, String str, File file) throws IOException {
        FormDataBodyPart field = formDataMultiPart.getField(str);
        Preconditions.checkState(field != null, "Failed to find: %s", str);
        InputStream inputStream = (InputStream) field.getValueAs(InputStream.class);
        try {
            Files.copy(inputStream, new File(file, str).toPath(), new CopyOption[0]);
            if (inputStream != null) {
                inputStream.close();
            }
        } catch (Throwable th) {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static String getTempSegmentFileName(String str) {
        return str + "." + UUID.randomUUID();
    }
}
