/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.helix.model.IdealState;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource exposing operations on realtime tables: pausing and resuming
 * consumption, force-committing consuming segments, and reading the pause
 * status, force-commit job status and consuming-segment information.
 *
 * <p>Endpoints that take a table name and call {@link #validate(String)} first
 * resolve it to its realtime name and reject the request when the table has no
 * ideal state (404) or the ideal state is disabled (400).
 */
@Api(tags={"Table"}, authorizations={@Authorization(value="oauth")})
@SwaggerDefinition(securityDefinition=@SecurityDefinition(apiKeyAuthDefinitions={@ApiKeyAuthDefinition(name="Authorization", in=ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key="oauth")}))
@Path(value="/")
public class PinotRealtimeTableResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class);
    @Inject
    ControllerConf _controllerConf;
    @Inject
    Executor _executor;
    @Inject
    HttpClientConnectionManager _connectionManager;
    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;
    @Inject
    PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;

    /**
     * Pauses consumption of the given realtime table.
     *
     * @param tableName table name; it is converted to its realtime name before use
     * @return a 200 response whose entity is whatever the segment manager's
     *         {@code pauseConsumption} call returns
     * @throws ControllerApplicationException 404/400 from {@link #validate(String)}, or
     *         500 wrapping any exception raised by the segment manager
     */
    @POST
    @Path(value="/tables/{tableName}/pauseConsumption")
    @Authorize(targetType=TargetType.TABLE, paramName="tableName", action="PauseConsumption")
    @Produces(value={"application/json"})
    @ApiOperation(value="Pause consumption of a realtime table", notes="Pause the consumption of a realtime table")
    public Response pauseConsumption(@ApiParam(value="Name of the table", required=true) @PathParam(value="tableName") String tableName) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        validate(tableNameWithType);
        try {
            return Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    /**
     * Resumes consumption of the given realtime table.
     *
     * @param tableName table name; it is converted to its realtime name before use
     * @param consumeFrom optional; when present it must equal {@code smallest} or
     *        {@code largest} (case-insensitive), otherwise the request is rejected
     * @return a 200 response whose entity is whatever the segment manager's
     *         {@code resumeConsumption} call returns
     * @throws ControllerApplicationException 404/400 from {@link #validate(String)},
     *         400 for an invalid {@code consumeFrom}, or 500 wrapping any exception
     *         raised by the segment manager
     */
    @POST
    @Path(value="/tables/{tableName}/resumeConsumption")
    @Authorize(targetType=TargetType.TABLE, paramName="tableName", action="ResumeConsumption")
    @Produces(value={"application/json"})
    @ApiOperation(value="Resume consumption of a realtime table", notes="Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are picked to minimize the data loss.")
    public Response resumeConsumption(@ApiParam(value="Name of the table", required=true) @PathParam(value="tableName") String tableName, @ApiParam(value="smallest | largest") @QueryParam(value="consumeFrom") String consumeFrom) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        validate(tableNameWithType);
        if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
            throw new ControllerApplicationException(LOGGER, String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
        }
        try {
            return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    /**
     * Triggers a force commit of the table's consuming segments and records a
     * force-commit job so its progress can be queried later.
     *
     * <p>The returned map always contains {@code forceCommitStatus=SUCCESS} when the
     * method returns normally. {@code jobMetaZKWriteStatus} is {@code SUCCESS} (with
     * {@code forceCommitJobId} set to a freshly generated UUID) when the job metadata
     * was recorded, or {@code FAILED} when recording it threw; that failure is logged
     * and does not fail the request.
     *
     * @param tableName table name; it is converted to its realtime name before use
     * @return status map as described above
     * @throws ControllerApplicationException 404/400 from {@link #validate(String)}, or
     *         500 wrapping any exception raised by the force commit itself
     */
    @POST
    @Path(value="/tables/{tableName}/forceCommit")
    @Authorize(targetType=TargetType.TABLE, paramName="tableName", action="ForceCommit")
    @Produces(value={"application/json"})
    @ApiOperation(value="Force commit the current consuming segments", notes="Force commit the current segments in consuming state and restart consumption. This should be used after schema/table config changes. Please note that this is an asynchronous operation, and 200 response does not mean it has actually been done already")
    public Map<String, String> forceCommit(@ApiParam(value="Name of the table", required=true) @PathParam(value="tableName") String tableName) throws JsonProcessingException {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        validate(tableNameWithType);
        Map<String, String> response = new HashMap<>();
        try {
            Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
            response.put("forceCommitStatus", "SUCCESS");
            try {
                String jobId = UUID.randomUUID().toString();
                _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
                response.put("jobMetaZKWriteStatus", "SUCCESS");
                response.put("forceCommitJobId", jobId);
            }
            catch (Exception e) {
                // Best effort: the force commit was already issued, so only report the
                // metadata write failure instead of failing the whole request.
                response.put("jobMetaZKWriteStatus", "FAILED");
                LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
            }
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
        return response;
    }

    /**
     * Reports the progress of a previously submitted force-commit job.
     *
     * <p>The result is the stored job metadata plus two computed entries:
     * {@code segmentsYetToBeCommitted} (segments recorded for the job that are not
     * among the table's online segments in the ideal state) and
     * {@code numberOfSegmentsYetToBeCommitted} (the size of that set).
     *
     * @param forceCommitJobId id of the force-commit job
     * @return the job metadata and computed entries as a JSON node
     * @throws ControllerApplicationException 404 when no metadata exists for the job id
     * @throws Exception when the stored segment list cannot be parsed or a lookup fails
     */
    @GET
    @Path(value="/tables/forceCommitStatus/{jobId}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetForceCommitStatus")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get status for a submitted force commit operation", notes="Get status for a submitted force commit operation")
    public JsonNode getForceCommitJobStatus(@ApiParam(value="Force commit job id", required=true) @PathParam(value="jobId") String forceCommitJobId) throws Exception {
        Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId, ControllerJobType.FORCE_COMMIT);
        if (controllerJobZKMetadata == null) {
            throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + forceCommitJobId, Response.Status.NOT_FOUND);
        }
        String tableNameWithType = controllerJobZKMetadata.get("tableName");
        // The segment list is stored as a JSON string; Set.class cannot carry the
        // element type, hence the unchecked conversion to Set<String>.
        @SuppressWarnings("unchecked")
        Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(controllerJobZKMetadata.get("segmentsForceCommitted"), Set.class);
        Set<String> onlineSegmentsForTable = _pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);

        // Pending = segments recorded for the job minus those already online.
        Set<String> segmentsYetToBeCommitted = new HashSet<>(consumingSegmentCommitted);
        segmentsYetToBeCommitted.removeAll(onlineSegmentsForTable);

        // Values are heterogeneous (strings, a set, an int), so the map must be
        // Map<String, Object>; casting the set or count to String would throw
        // ClassCastException at runtime.
        Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
        result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
        result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size());
        return JsonUtils.objectToJsonNode(result);
    }

    /**
     * Returns the pause status of the given realtime table.
     *
     * @param tableName table name; it is converted to its realtime name before use
     * @return a 200 response whose entity is whatever the segment manager's
     *         {@code getPauseStatus} call returns
     * @throws ControllerApplicationException 404/400 from {@link #validate(String)}, or
     *         500 wrapping any exception raised by the segment manager
     */
    @GET
    @Path(value="/tables/{tableName}/pauseStatus")
    @Authorize(targetType=TargetType.TABLE, paramName="tableName", action="GetPauseStatus")
    @Produces(value={"application/json"})
    @ApiOperation(value="Return pause status of a realtime table", notes="Return pause status of a realtime table along with list of consuming segments.")
    public Response getPauseStatus(@ApiParam(value="Name of the table", required=true) @PathParam(value="tableName") String tableName) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        validate(tableNameWithType);
        try {
            return Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    /**
     * Collects consuming-segment information for a realtime table using a
     * {@link ConsumingSegmentInfoReader} created for this request.
     *
     * <p>The timeout passed to the reader is the configured server admin request
     * timeout in seconds multiplied by 1000. Unlike the other endpoints, this one
     * does not call {@link #validate(String)}.
     *
     * @param realtimeTableName table name with or without a type suffix
     * @return the consuming segments info returned by the reader
     * @throws ControllerApplicationException 500 for any failure, including a table
     *         name that carries the OFFLINE type
     */
    @GET
    @Path(value="/tables/{tableName}/consumingSegmentsInfo")
    @Authorize(targetType=TargetType.TABLE, paramName="tableName", action="GetConsumingSegments")
    @Produces(value={"application/json"})
    @ApiOperation(value="Returns state of consuming segments", notes="Gets the status of consumers from all servers.Note that the partitionToOffsetMap has been deprecated and will be removed in the next release. The info is now embedded within each partition's state as currentOffsetsMap.")
    @ApiResponses(value={@ApiResponse(code=200, message="Success"), @ApiResponse(code=404, message="Table not found"), @ApiResponse(code=500, message="Internal server error")})
    public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo(@ApiParam(value="Realtime table name with or without type", required=true, example="myTable | myTable_REALTIME") @PathParam(value="tableName") String realtimeTableName) {
        try {
            TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
            if (TableType.OFFLINE == tableType) {
                throw new IllegalStateException("Cannot get consuming segments info for OFFLINE table: " + realtimeTableName);
            }
            String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
            ConsumingSegmentInfoReader consumingSegmentInfoReader = new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
            return consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, String.format("Failed to get consuming segments info for table %s. %s", realtimeTableName, e.getMessage()), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    /**
     * Ensures the table has an ideal state and that the ideal state is enabled.
     *
     * @param tableNameWithType table name including its type suffix
     * @throws ControllerApplicationException 404 when no ideal state exists for the
     *         table, 400 when the ideal state is not enabled
     */
    private void validate(String tableNameWithType) {
        IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
        if (idealState == null) {
            throw new ControllerApplicationException(LOGGER, String.format("Table %s not found!", tableNameWithType), Response.Status.NOT_FOUND);
        }
        if (!idealState.isEnabled()) {
            throw new ControllerApplicationException(LOGGER, String.format("Table %s is disabled!", tableNameWithType), Response.Status.BAD_REQUEST);
        }
    }
}

