/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.message;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.datatype.DataTypeOperator;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link DeserializeOperator} that handles payloads wrapped as
 * {@link MessageWrapType#INLONG_MSG_V0}.
 *
 * <p>The raw bytes are parsed with {@link InLongMsg#parseFrom(byte[])}; every body found under
 * every attribute string of the parsed message is converted into one {@link BriefMQMessage}.
 */
@Service
public class InlongMsgDeserializeOperator
implements DeserializeOperator {
    private static final Logger log = LoggerFactory.getLogger(InlongMsgDeserializeOperator.class);

    /** Header keys read from the {@code headers} map handed to {@link #decodeMsg}. */
    private static final String HEADER_GROUP_ID = "groupId";
    private static final String HEADER_STREAM_ID = "streamId";

    /** Delimiters used to split an attribute string into key/value pairs. */
    private static final Character ATTR_ENTRY_DELIMITER = '&';
    private static final Character ATTR_KV_DELIMITER = '=';

    /** Attribute keys looked up in the split attribute map. */
    private static final String ATTR_TIME_T = "t";
    private static final String ATTR_TIME_DT = "dt";
    private static final String ATTR_CLIENT_IP = "clientIp";

    @Autowired
    public DataTypeOperatorFactory dataTypeOperatorFactory;

    /**
     * Tells whether this operator handles the given wrap type.
     *
     * @param type the wrap type to test; {@code null} yields {@code false}
     * @return {@code true} only for {@link MessageWrapType#INLONG_MSG_V0}
     */
    @Override
    public boolean accept(MessageWrapType type) {
        return MessageWrapType.INLONG_MSG_V0.equals(type);
    }

    /**
     * Decodes one wrapped payload into a list of brief messages.
     *
     * <p>For each attribute string of the parsed {@link InLongMsg} the message time is extracted
     * (see {@link #parseMsgTime}); then each non-null body under that attribute is decoded with the
     * stream's data encoding and split into fields by the {@link DataTypeOperator} selected from the
     * stream's data type.
     *
     * @param streamInfo stream configuration; its data encoding and data type are read here
     * @param msgBytes   the raw wrapped payload
     * @param headers    message headers; {@code groupId} and {@code streamId} are read from it and the
     *                   map itself is attached to every produced message
     * @param index      value stored as the id of every message produced by this call
     * @return the decoded messages, possibly empty
     * @throws IllegalArgumentException if an attribute string carries neither {@code t} nor {@code dt}
     * @throws BusinessException        if decoding a body or parsing its fields fails
     */
    @Override
    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] msgBytes, Map<String, String> headers, int index) {
        String groupId = headers.get(HEADER_GROUP_ID);
        String streamId = headers.get(HEADER_STREAM_ID);
        List<BriefMQMessage> messageList = new ArrayList<>();
        InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
        for (String attr : inLongMsg.getAttrs()) {
            Map<String, String> attrMap = StringUtil.splitKv(attr, ATTR_ENTRY_DELIMITER, ATTR_KV_DELIMITER, null, null);
            long msgTime = parseMsgTime(attrMap, attr);
            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
            while (iterator.hasNext()) {
                byte[] bodyBytes = iterator.next();
                if (Objects.isNull(bodyBytes)) {
                    continue;
                }
                try {
                    // Charset and operator lookups stay inside the try so that a bad stream
                    // configuration is reported as a BusinessException, and only when there is
                    // actually a body to decode.
                    String body = new String(bodyBytes, Charset.forName(streamInfo.getDataEncoding()));
                    DataTypeOperator dataTypeOperator = this.dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
                    List<BriefMQMessage.FieldInfo> streamFieldList = dataTypeOperator.parseFields(body, streamInfo);
                    BriefMQMessage message = BriefMQMessage.builder()
                            .id(index)
                            .inlongGroupId(groupId)
                            .inlongStreamId(streamId)
                            .dt(msgTime)
                            .clientIp(attrMap.get(ATTR_CLIENT_IP))
                            .headers(headers)
                            .body(body)
                            .fieldList(streamFieldList)
                            .build();
                    messageList.add(message);
                } catch (Exception e) {
                    String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId);
                    // Only the message is handed to BusinessException, so the cause is recorded
                    // in the log here.
                    log.error(errMsg, e);
                    throw new BusinessException(errMsg);
                }
            }
        }
        return messageList;
    }

    /**
     * Extracts the message time from a split attribute map.
     *
     * <p>{@code t} takes precedence and is parsed with {@link StringUtil#parseDateTime}; otherwise
     * {@code dt} is parsed as a decimal {@code long}. Both values are trimmed first.
     *
     * @param attrMap the key/value pairs of one attribute string
     * @param attr    the original attribute string, used only for the error message
     * @return the parsed message time
     * @throws IllegalArgumentException if neither {@code t} nor {@code dt} is present
     */
    private static long parseMsgTime(Map<String, String> attrMap, String attr) {
        if (attrMap.containsKey(ATTR_TIME_T)) {
            String date = attrMap.get(ATTR_TIME_T).trim();
            return StringUtil.parseDateTime(date);
        }
        if (attrMap.containsKey(ATTR_TIME_DT)) {
            String epoch = attrMap.get(ATTR_TIME_DT).trim();
            return Long.parseLong(epoch);
        }
        throw new IllegalArgumentException(String.format(
                "Failed to parse InLongMsg attributes [%s]: neither '%s' nor '%s' is present",
                attr, ATTR_TIME_T, ATTR_TIME_DT));
    }
}

