package org.apache.kylin.rest.controller;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.request.StreamingRequestV2;
import org.apache.kylin.rest.response.ResponseCode;
import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.StreamingV2Service;
import org.apache.kylin.rest.service.TableService;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.stats.ClusterState;
import org.apache.kylin.stream.core.model.stats.CubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@RequestMapping({"/streaming_v2"})
@Controller
/* loaded from: input_file:org/apache/kylin/rest/controller/StreamingV2Controller.class */
public class StreamingV2Controller extends BasicController {

    @Autowired
    private StreamingV2Service streamingService;

    @Autowired
    private CubeService cubeMgmtService;

    @Autowired
    @Qualifier("tableService")
    private TableService tableService;
    private static final Logger logger = LoggerFactory.getLogger(StreamingV2Controller.class);
    private static Map<String, Set<String>> COMPATIBLE_MAP = Maps.newHashMap();

    @RequestMapping(value = {"/getConfig"}, method = {RequestMethod.GET})
    @ResponseBody
    public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String str, @RequestParam(value = "limit", required = false) Integer num, @RequestParam(value = "offset", required = false) Integer num2) {
        try {
            return this.streamingService.getStreamingConfigs(str, num, num2);
        } catch (IOException e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        }
    }

    @RequestMapping(value = {""}, method = {RequestMethod.POST})
    @ResponseBody
    public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 streamingRequestV2) {
        String project = streamingRequestV2.getProject();
        TableDesc deserializeTableDesc = deserializeTableDesc(streamingRequestV2);
        StreamingSourceConfig deserializeStreamingConfig = deserializeStreamingConfig(streamingRequestV2.getStreamingConfig());
        validateInput(deserializeTableDesc, deserializeStreamingConfig);
        logger.info("{} try to saveStreamingConfig with table Identity {}", SecurityContextHolder.getContext().getAuthentication().getName(), deserializeTableDesc.getIdentity());
        boolean z = false;
        boolean z2 = false;
        ProjectInstance project2 = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project);
        try {
            try {
                deserializeTableDesc.setUuid(UUID.randomUUID().toString());
                this.tableService.loadTableToProject(deserializeTableDesc, null, project);
                z2 = true;
                try {
                    deserializeStreamingConfig.setName(deserializeTableDesc.getIdentity());
                    deserializeStreamingConfig.setUuid(UUID.randomUUID().toString());
                    this.streamingService.createStreamingConfig(deserializeStreamingConfig, project2);
                    z = true;
                    if (1 == 0 || 1 == 0) {
                        if (1 != 0) {
                            try {
                                this.tableService.unloadHiveTable(deserializeTableDesc.getIdentity(), project);
                            } catch (IOException e) {
                                throw new InternalErrorException("Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e);
                            }
                        }
                        if (1 != 0) {
                            try {
                                this.streamingService.dropStreamingConfig(deserializeStreamingConfig);
                            } catch (IOException e2) {
                                throw new InternalErrorException("Action failed and failed to rollback the created streaming config: " + e2.getLocalizedMessage(), e2);
                            }
                        }
                    }
                    streamingRequestV2.setSuccessful(true);
                    return streamingRequestV2;
                } catch (IOException e3) {
                    logger.error("Failed to save StreamingSourceConfig:" + e3.getLocalizedMessage(), e3);
                    throw new InternalErrorException("Failed to save StreamingSourceConfig: " + e3.getLocalizedMessage());
                }
            } catch (Throwable th) {
                if (!z2 || !z) {
                    if (z2) {
                        try {
                            this.tableService.unloadHiveTable(deserializeTableDesc.getIdentity(), project);
                        } catch (IOException e4) {
                            throw new InternalErrorException("Action failed and failed to rollback the create table " + e4.getLocalizedMessage(), e4);
                        }
                    }
                    if (z) {
                        try {
                            this.streamingService.dropStreamingConfig(deserializeStreamingConfig);
                        } catch (IOException e5) {
                            throw new InternalErrorException("Action failed and failed to rollback the created streaming config: " + e5.getLocalizedMessage(), e5);
                        }
                    }
                }
                throw th;
            }
        } catch (IOException e6) {
            throw new BadRequestException("Failed to add streaming table.");
        }
    }

    private void validateInput(TableDesc tableDesc, StreamingSourceConfig streamingSourceConfig) {
        if (StringUtils.isEmpty(tableDesc.getIdentity()) || StringUtils.isEmpty(streamingSourceConfig.getName())) {
            logger.error("streamingSourceConfig name should not be empty.");
            throw new BadRequestException("streamingSourceConfig name should not be empty.");
        }
        if (tableDesc.getSourceType() == 21) {
            try {
                List<FieldSchema> fields = new HiveMetaStoreClient(new HiveConf()).getFields(tableDesc.getDatabase(), tableDesc.getName());
                HashMap newHashMap = Maps.newHashMap();
                for (FieldSchema fieldSchema : fields) {
                    newHashMap.put(fieldSchema.getName().toUpperCase(Locale.ROOT), fieldSchema);
                }
                ArrayList newArrayList = Lists.newArrayList();
                for (ColumnDesc columnDesc : tableDesc.getColumns()) {
                    FieldSchema fieldSchema2 = (FieldSchema) newHashMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
                    if (fieldSchema2 == null) {
                        newArrayList.add("column not exist in hive table:" + columnDesc.getName());
                    } else if (!checkHiveTableFieldCompatible(fieldSchema2, columnDesc)) {
                        newArrayList.add(String.format(Locale.ROOT, "column:%s defined in hive type:%s is incompatible with the column definition:%s", columnDesc.getName(), fieldSchema2.getType(), columnDesc.getDatatype()));
                    }
                }
                if (newArrayList.size() > 0) {
                    logger.info("incompatible for hive and input table schema:{}", newArrayList);
                    throw new BadRequestException("incompatible for hive schema and input table schema:" + newArrayList);
                }
            } catch (NoSuchObjectException e) {
                logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(), e);
                throw new BadRequestException("table doesn't exist in hive meta store for table:" + tableDesc.getIdentity(), ResponseCode.CODE_UNDEFINED, e);
            } catch (Exception e2) {
                logger.error("error when get metadata from hive meta store for table:" + tableDesc.getIdentity(), e2);
                throw new BadRequestException("error when connect hive meta store", ResponseCode.CODE_UNDEFINED, e2);
            }
        }
    }

    private boolean checkHiveTableFieldCompatible(FieldSchema fieldSchema, ColumnDesc columnDesc) {
        DataType type = DataType.getType(columnDesc.getDatatype());
        String datatype = type == null ? columnDesc.getDatatype() : type.toString();
        if (fieldSchema.getType().equals(datatype)) {
            return true;
        }
        Set<String> set = COMPATIBLE_MAP.get(fieldSchema.getType());
        return set != null && set.contains(datatype);
    }

    @RequestMapping(value = {"/updateConfig"}, method = {RequestMethod.PUT})
    @ResponseBody
    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
        StreamingSourceConfig deserializeStreamingConfig = deserializeStreamingConfig(streamingRequest.getStreamingConfig());
        if (deserializeStreamingConfig == null) {
            return streamingRequest;
        }
        logger.info("{} try to updateStreamingConfig.", SecurityContextHolder.getContext().getAuthentication().getName());
        try {
            this.streamingService.updateStreamingConfig(deserializeStreamingConfig);
            streamingRequest.setSuccessful(true);
            return streamingRequest;
        } catch (Exception e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        } catch (AccessDeniedException e2) {
            throw new ForbiddenException("You don't have right to update this StreamingSourceConfig.");
        }
    }

    @RequestMapping(value = {"/{configName}"}, method = {RequestMethod.DELETE})
    @ResponseBody
    public void deleteConfig(@PathVariable String str) throws IOException {
        StreamingSourceConfig config = this.streamingService.getStreamingManagerV2().getConfig(str);
        if (null == config) {
            throw new NotFoundException("StreamingSourceConfig with name " + str + " not found..");
        }
        logger.info("{} try to delete config: {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        try {
            this.streamingService.dropStreamingConfig(config);
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to delete StreamingSourceConfig.  Caused by: " + e.getMessage(), e);
        }
    }

    @RequestMapping(value = {"/parserTemplate"}, method = {RequestMethod.GET})
    @ResponseBody
    public String getParserTemplate(@RequestParam("sourceType") int i, @RequestParam("streamingConfig") String str) {
        return this.streamingService.getParserTemplate(i, deserializeStreamingConfig(str));
    }

    @RequestMapping(value = {"/cubeAssignments"}, method = {RequestMethod.GET})
    @ResponseBody
    public List<CubeAssignment> getCubeAssignments(@RequestParam(value = "cube", required = false) String str) {
        CubeInstance cubeInstance = null;
        if (str != null) {
            cubeInstance = this.cubeMgmtService.getCubeManager().getCube(str);
        }
        return this.streamingService.getStreamingCubeAssignments(cubeInstance);
    }

    @RequestMapping(value = {"/rsAssignments"}, method = {RequestMethod.GET})
    @ResponseBody
    public Map<Integer, Map<String, List<Partition>>> getReplicaSetAssignments(@RequestParam(value = "replicaSetID", required = false) Integer num) {
        return this.streamingService.getStreamingReplicaSetAssignments(num);
    }

    @RequestMapping(value = {"/balance/recommend"}, method = {RequestMethod.GET})
    @ResponseBody
    public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() {
        return this.streamingService.reBalancePlan();
    }

    @RequestMapping(value = {"/balance"}, method = {RequestMethod.POST})
    @ResponseBody
    public void reBalance(@RequestBody String str) {
        logger.info("{} try to do reBalance.", SecurityContextHolder.getContext().getAuthentication().getName());
        this.streamingService.reBalance(deserializeRebalancePlan(str));
    }

    private Map<Integer, Map<String, List<Partition>>> deserializeRebalancePlan(String str) {
        try {
            return (Map) new ObjectMapper().readValue(str, new TypeReference<Map<Integer, Map<String, List<Partition>>>>() { // from class: org.apache.kylin.rest.controller.StreamingV2Controller.1
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @RequestMapping(value = {"/cubeAssignments/{cubeName}"}, method = {RequestMethod.DELETE})
    @ResponseBody
    public void removeCubeAssignment(@PathVariable String str) {
        logger.info("{} try to remove CubeAssignment {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.removeCubeAssignment();
    }

    @RequestMapping(value = {"/cubes"}, method = {RequestMethod.GET})
    @ResponseBody
    public List<String> getStreamingCubes() {
        return this.streamingService.getStreamingCubes();
    }

    @RequestMapping(value = {"/cubes/{cubeName}/consumeState"}, method = {RequestMethod.GET})
    @ResponseBody
    public String getStreamingCubeConsumeState(@PathVariable String str) {
        return this.streamingService.getStreamingCubeConsumeState(str).toString();
    }

    @RequestMapping(value = {"/cubes/{cubeName}/assign"}, method = {RequestMethod.PUT})
    @ResponseBody
    public void assignStreamingCube(@PathVariable String str) {
        logger.info("{} try to assign cube {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.assignCube(this.cubeMgmtService.getCubeManager().getCube(str));
    }

    @RequestMapping(value = {"/cubes/{cubeName}/unAssign"}, method = {RequestMethod.PUT})
    @ResponseBody
    public void unAssignStreamingCube(@PathVariable String str) {
        logger.info("{} try to unAssign cube {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.unAssignCube(this.cubeMgmtService.getCubeManager().getCube(str));
    }

    @RequestMapping(value = {"/cubes/{cubeName}/reAssign"}, method = {RequestMethod.POST})
    @ResponseBody
    public void reAssignStreamingCube(@PathVariable String str, @RequestBody CubeAssignment cubeAssignment) {
        logger.info("{} try to reAssign cube {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.reAssignCube(str, cubeAssignment);
    }

    @RequestMapping(value = {"/receivers"}, method = {RequestMethod.GET})
    @ResponseBody
    public List<Node> getStreamingReceivers() {
        return this.streamingService.getReceivers();
    }

    @RequestMapping(value = {"/receivers/{receiverID:.+}"}, method = {RequestMethod.DELETE})
    @ResponseBody
    public void removeStreamingReceiver(@PathVariable String str) {
        this.streamingService.removeReceiver(Node.fromNormalizeString(str));
    }

    @RequestMapping(value = {"/replicaSet"}, method = {RequestMethod.POST})
    @ResponseBody
    public void createReplicaSet(@RequestBody ReplicaSet replicaSet) {
        logger.info("{} try to create ReplicaSet {}", SecurityContextHolder.getContext().getAuthentication().getName(), Integer.valueOf(replicaSet.getReplicaSetID()));
        this.streamingService.createReplicaSet(replicaSet);
    }

    @RequestMapping(value = {"/replicaSet/{replicaSetID}"}, method = {RequestMethod.DELETE})
    @ResponseBody
    public void removeReplicaSet(@PathVariable Integer num) {
        logger.info("{} try to remove ReplicaSet {}", SecurityContextHolder.getContext().getAuthentication().getName(), num);
        this.streamingService.removeReplicaSet(num.intValue());
    }

    @RequestMapping(value = {"/replicaSets"}, method = {RequestMethod.GET})
    @ResponseBody
    public List<ReplicaSet> getReplicaSets() {
        return this.streamingService.getReplicaSets();
    }

    @RequestMapping(value = {"/replicaSet/{replicaSetID}/{nodeID:.+}"}, method = {RequestMethod.PUT})
    @ResponseBody
    public void addNodeToReplicaSet(@PathVariable Integer num, @PathVariable String str) {
        logger.info("{} try to add Node {} To ReplicaSet {}", new Object[]{SecurityContextHolder.getContext().getAuthentication().getName(), str, num});
        this.streamingService.addNodeToReplicaSet(num, str);
    }

    @RequestMapping(value = {"/replicaSet/{replicaSetID}/{nodeID:.+}"}, method = {RequestMethod.DELETE})
    @ResponseBody
    public void removeNodeFromReplicaSet(@PathVariable Integer num, @PathVariable String str) {
        logger.info("{} try to remove Node {} from ReplicaSet {}", new Object[]{SecurityContextHolder.getContext().getAuthentication().getName(), str, num});
        this.streamingService.removeNodeFromReplicaSet(num, str);
    }

    @RequestMapping(value = {"/cubes/{cubeName}/suspendConsume"}, method = {RequestMethod.PUT})
    @ResponseBody
    public void pauseCubeConsume(@PathVariable String str) {
        logger.info("{} try to pause Consumers for cube {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.pauseConsumers(this.cubeMgmtService.getCubeManager().getCube(str));
    }

    @RequestMapping(value = {"/cubes/{cubeName}/resumeConsume"}, method = {RequestMethod.PUT})
    @ResponseBody
    public void resumeCubeConsume(@PathVariable String str) {
        logger.info("{} try to resume Consumers for cube {}", SecurityContextHolder.getContext().getAuthentication().getName(), str);
        this.streamingService.resumeConsumers(this.cubeMgmtService.getCubeManager().getCube(str));
    }

    @RequestMapping(value = {"/cubes/{cubeName}/stats"}, method = {RequestMethod.GET})
    @ResponseBody
    public CubeRealTimeState getCubeRealTimeState(@PathVariable String str) {
        return this.streamingService.getCubeRealTimeState(this.cubeMgmtService.getCubeManager().getCube(str));
    }

    @RequestMapping(value = {"/receivers/{receiverID:.+}/stats"}, method = {RequestMethod.GET})
    @ResponseBody
    public ReceiverStats getReceiverStats(@PathVariable String str) {
        return this.streamingService.getReceiverStats(Node.fromNormalizeString(str));
    }

    @RequestMapping(value = {"/cluster/state"}, method = {RequestMethod.GET})
    @ResponseBody
    public ClusterState getClusterState() {
        return this.streamingService.getClusterState();
    }

    private TableDesc deserializeTableDesc(StreamingRequestV2 streamingRequestV2) {
        TableDesc tableDesc = null;
        try {
            logger.debug("Saving TableDesc " + streamingRequestV2.getTableData());
            tableDesc = (TableDesc) JsonUtil.readValue(streamingRequestV2.getTableData(), TableDesc.class);
        } catch (JsonParseException e) {
            logger.error("The TableDesc definition is invalid.", e);
            updateRequest(streamingRequestV2, false, e.getMessage());
        } catch (IOException e2) {
            logger.error("Failed to deal with the request.", e2);
            throw new InternalErrorException("Failed to deal with the request:" + e2.getMessage(), e2);
        } catch (JsonMappingException e3) {
            logger.error("The data TableDesc definition is invalid.", e3);
            updateRequest(streamingRequestV2, false, e3.getMessage());
        }
        String[] parseHiveTableName = HadoopUtil.parseHiveTableName(tableDesc.getName());
        tableDesc.setName(parseHiveTableName[1]);
        tableDesc.setDatabase(parseHiveTableName[0]);
        tableDesc.getIdentity();
        return tableDesc;
    }

    private StreamingSourceConfig deserializeStreamingConfig(String str) {
        try {
            logger.debug("Saving StreamingSourceConfig " + str);
            return (StreamingSourceConfig) JsonUtil.readValue(str, StreamingSourceConfig.class);
        } catch (Exception e) {
            logger.error("The StreamingSourceConfig definition is invalid.", e);
            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
        }
    }

    private void updateRequest(StreamingRequestV2 streamingRequestV2, boolean z, String str) {
        streamingRequestV2.setSuccessful(z);
        streamingRequestV2.setMessage(str);
    }

    public void setCubeService(CubeService cubeService) {
        this.cubeMgmtService = cubeService;
    }

    static {
        COMPATIBLE_MAP.put("float", Sets.newHashSet(new String[]{"double"}));
        COMPATIBLE_MAP.put("string", Sets.newHashSet(new String[]{"varchar", "char", "varchar(256)"}));
        COMPATIBLE_MAP.put("varchar", Sets.newHashSet(new String[]{"string", "char"}));
        COMPATIBLE_MAP.put("varchar(256)", Sets.newHashSet(new String[]{"string", "char", "varchar"}));
        COMPATIBLE_MAP.put("long", Sets.newHashSet(new String[]{"bigint", "int", "smallint", "integer"}));
        COMPATIBLE_MAP.put("bigint", Sets.newHashSet(new String[]{"long", "int", "smallint", "integer"}));
        COMPATIBLE_MAP.put("int", Sets.newHashSet(new String[]{"smallint", "integer"}));
    }
}
