package org.apache.kylin.rest.controller2;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.controller.BasicController;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.ResponseCode;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
import org.apache.kylin.rest.service.TableService;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@RequestMapping({"/streaming"})
@Controller
/* loaded from: input_file:org/apache/kylin/rest/controller2/StreamingControllerV2.class */
public class StreamingControllerV2 extends BasicController {
    private static final Logger logger = LoggerFactory.getLogger(StreamingControllerV2.class);

    @Autowired
    @Qualifier("streamingMgmtService")
    private StreamingService streamingService;

    @Autowired
    @Qualifier("kafkaMgmtService")
    private KafkaConfigService kafkaConfigService;

    @Autowired
    @Qualifier("tableService")
    private TableService tableService;

    @RequestMapping(value = {"/getConfig"}, method = {RequestMethod.GET}, produces = {"application/vnd.apache.kylin-v2+json"})
    @ResponseBody
    public EnvelopeResponse getStreamingsV2(@RequestParam(value = "table", required = false) String str, @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer num, @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer num2) throws IOException {
        int intValue = num.intValue() * num2.intValue();
        return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, this.streamingService.getStreamingConfigs(str, Integer.valueOf(num2.intValue()), Integer.valueOf(intValue)), "");
    }

    @RequestMapping(value = {"/getKfkConfig"}, method = {RequestMethod.GET}, produces = {"application/vnd.apache.kylin-v2+json"})
    @ResponseBody
    public EnvelopeResponse getKafkaConfigsV2(@RequestParam(value = "kafkaConfigName", required = false) String str, @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer num, @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer num2) throws IOException {
        int intValue = num.intValue() * num2.intValue();
        return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, this.kafkaConfigService.getKafkaConfigs(str, Integer.valueOf(num2.intValue()), Integer.valueOf(intValue)), "");
    }

    @RequestMapping(value = {""}, method = {RequestMethod.POST}, produces = {"application/vnd.apache.kylin-v2+json"})
    @ResponseBody
    public void saveStreamingConfigV2(@RequestBody StreamingRequest streamingRequest) throws IOException {
        Message msg = MsgPicker.getMsg();
        String project = streamingRequest.getProject();
        TableDesc deserializeTableDescV2 = deserializeTableDescV2(streamingRequest);
        if (null == deserializeTableDescV2) {
            throw new BadRequestException(msg.getINVALID_TABLE_DESC_DEFINITION());
        }
        StreamingConfig deserializeSchemalDescV2 = deserializeSchemalDescV2(streamingRequest);
        KafkaConfig deserializeKafkaSchemalDescV2 = deserializeKafkaSchemalDescV2(streamingRequest);
        boolean z = false;
        boolean z2 = false;
        try {
            this.tableService.addStreamingTable(deserializeTableDescV2, project);
            deserializeSchemalDescV2.setName(deserializeTableDescV2.getIdentity());
            deserializeKafkaSchemalDescV2.setName(deserializeTableDescV2.getIdentity());
            try {
                if (StringUtils.isEmpty(deserializeSchemalDescV2.getName())) {
                    logger.info("StreamingConfig should not be empty.");
                    throw new BadRequestException(msg.getEMPTY_STREAMING_CONFIG_NAME());
                }
                try {
                    deserializeSchemalDescV2.setUuid(UUID.randomUUID().toString());
                    this.streamingService.createStreamingConfig(deserializeSchemalDescV2);
                    z = true;
                    try {
                        deserializeKafkaSchemalDescV2.setUuid(UUID.randomUUID().toString());
                        this.kafkaConfigService.createKafkaConfig(deserializeKafkaSchemalDescV2);
                        z2 = true;
                        if (1 == 0 || 1 == 0) {
                            if (1 == 1) {
                                try {
                                    this.streamingService.dropStreamingConfig(this.streamingService.getStreamingManager().getStreamingConfig(deserializeSchemalDescV2.getName()));
                                } catch (IOException e) {
                                    throw new InternalErrorException(msg.getROLLBACK_STREAMING_CONFIG_FAIL());
                                }
                            }
                            if (1 == 1) {
                                try {
                                    this.kafkaConfigService.dropKafkaConfig(this.kafkaConfigService.getKafkaConfig(deserializeKafkaSchemalDescV2.getName()));
                                } catch (IOException e2) {
                                    throw new InternalErrorException(msg.getROLLBACK_KAFKA_CONFIG_FAIL());
                                }
                            }
                        }
                    } catch (IOException e3) {
                        try {
                            this.streamingService.dropStreamingConfig(deserializeSchemalDescV2);
                            logger.error("Failed to save KafkaConfig:" + e3.getLocalizedMessage(), e3);
                            throw new InternalErrorException(msg.getSAVE_KAFKA_CONFIG_FAIL());
                        } catch (IOException e4) {
                            throw new InternalErrorException(msg.getCREATE_KAFKA_CONFIG_FAIL());
                        }
                    }
                } catch (IOException e5) {
                    logger.error("Failed to save StreamingConfig:" + e5.getLocalizedMessage(), e5);
                    throw new InternalErrorException(msg.getSAVE_STREAMING_CONFIG_FAIL());
                }
            } catch (Throwable th) {
                if (!z2 || !z) {
                    if (z) {
                        try {
                            this.streamingService.dropStreamingConfig(this.streamingService.getStreamingManager().getStreamingConfig(deserializeSchemalDescV2.getName()));
                        } catch (IOException e6) {
                            throw new InternalErrorException(msg.getROLLBACK_STREAMING_CONFIG_FAIL());
                        }
                    }
                    if (z2) {
                        try {
                            this.kafkaConfigService.dropKafkaConfig(this.kafkaConfigService.getKafkaConfig(deserializeKafkaSchemalDescV2.getName()));
                        } catch (IOException e7) {
                            throw new InternalErrorException(msg.getROLLBACK_KAFKA_CONFIG_FAIL());
                        }
                    }
                }
                throw th;
            }
        } catch (IOException e8) {
            throw new InternalErrorException(msg.getADD_STREAMING_TABLE_FAIL());
        }
    }

    @RequestMapping(value = {""}, method = {RequestMethod.PUT}, produces = {"application/vnd.apache.kylin-v2+json"})
    @ResponseBody
    public void updateStreamingConfigV2(@RequestBody StreamingRequest streamingRequest) throws IOException {
        Message msg = MsgPicker.getMsg();
        StreamingConfig deserializeSchemalDescV2 = deserializeSchemalDescV2(streamingRequest);
        KafkaConfig deserializeKafkaSchemalDescV2 = deserializeKafkaSchemalDescV2(streamingRequest);
        if (deserializeSchemalDescV2 == null) {
            throw new BadRequestException(msg.getINVALID_STREAMING_CONFIG_DEFINITION());
        }
        try {
            this.streamingService.updateStreamingConfig(deserializeSchemalDescV2);
            try {
                this.kafkaConfigService.updateKafkaConfig(deserializeKafkaSchemalDescV2);
            } catch (AccessDeniedException e) {
                throw new ForbiddenException(msg.getUPDATE_KAFKA_CONFIG_NO_RIGHT());
            }
        } catch (AccessDeniedException e2) {
            throw new ForbiddenException(msg.getUPDATE_STREAMING_CONFIG_NO_RIGHT());
        }
    }

    @RequestMapping(value = {"/{configName}"}, method = {RequestMethod.DELETE}, produces = {"application/vnd.apache.kylin-v2+json"})
    @ResponseBody
    public void deleteConfigV2(@PathVariable String str) throws IOException {
        Message msg = MsgPicker.getMsg();
        StreamingConfig streamingConfig = this.streamingService.getStreamingManager().getStreamingConfig(str);
        KafkaConfig kafkaConfig = this.kafkaConfigService.getKafkaConfig(str);
        if (null == streamingConfig) {
            throw new BadRequestException(String.format(msg.getSTREAMING_CONFIG_NOT_FOUND(), str));
        }
        this.streamingService.dropStreamingConfig(streamingConfig);
        this.kafkaConfigService.dropKafkaConfig(kafkaConfig);
    }

    private TableDesc deserializeTableDescV2(StreamingRequest streamingRequest) throws IOException {
        Message msg = MsgPicker.getMsg();
        try {
            logger.debug("Saving TableDesc " + streamingRequest.getTableData());
            TableDesc tableDesc = (TableDesc) JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
            if (null != tableDesc) {
                String[] parseHiveTableName = HadoopUtil.parseHiveTableName(tableDesc.getName());
                tableDesc.setName(parseHiveTableName[1]);
                tableDesc.setDatabase(parseHiveTableName[0]);
                tableDesc.getIdentity();
            }
            return tableDesc;
        } catch (JsonParseException e) {
            logger.error("The TableDesc definition is invalid.", e);
            throw new BadRequestException(msg.getINVALID_TABLE_DESC_DEFINITION());
        } catch (JsonMappingException e2) {
            logger.error("The data TableDesc definition is invalid.", e2);
            throw new BadRequestException(msg.getINVALID_TABLE_DESC_DEFINITION());
        }
    }

    private StreamingConfig deserializeSchemalDescV2(StreamingRequest streamingRequest) throws IOException {
        Message msg = MsgPicker.getMsg();
        try {
            logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
            return (StreamingConfig) JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
        } catch (JsonParseException e) {
            logger.error("The StreamingConfig definition is invalid.", e);
            throw new BadRequestException(msg.getINVALID_STREAMING_CONFIG_DEFINITION());
        } catch (JsonMappingException e2) {
            logger.error("The data StreamingConfig definition is invalid.", e2);
            throw new BadRequestException(msg.getINVALID_STREAMING_CONFIG_DEFINITION());
        }
    }

    private KafkaConfig deserializeKafkaSchemalDescV2(StreamingRequest streamingRequest) throws IOException {
        Message msg = MsgPicker.getMsg();
        try {
            logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
            return (KafkaConfig) JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
        } catch (JsonParseException e) {
            logger.error("The KafkaConfig definition is invalid.", e);
            throw new BadRequestException(msg.getINVALID_KAFKA_CONFIG_DEFINITION());
        } catch (JsonMappingException e2) {
            logger.error("The data KafkaConfig definition is invalid.", e2);
            updateRequest(streamingRequest, false, e2.getMessage());
            throw new BadRequestException(msg.getINVALID_KAFKA_CONFIG_DEFINITION());
        }
    }

    private void updateRequest(StreamingRequest streamingRequest, boolean z, String str) {
        streamingRequest.setSuccessful(z);
        streamingRequest.setMessage(str);
    }
}
