/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.message;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * {@link DeserializeOperator} for payloads encoded as {@link InLongMsg}.
 *
 * <p>Each payload is parsed with {@link InLongMsg#parseFrom}; every non-null body found under
 * each attribute string becomes one {@link BriefMQMessage}.
 */
@Service
public class InlongMsgDeserializeOperator implements DeserializeOperator {

    private static final Logger log = LoggerFactory.getLogger(InlongMsgDeserializeOperator.class);

    /** Header key holding the group id copied into every decoded message. */
    private static final String HEADER_GROUP_ID = "groupId";
    /** Header key holding the stream id copied into every decoded message. */
    private static final String HEADER_STREAM_ID = "streamId";

    /** Separator between key/value entries inside an attribute string. */
    private static final Character ATTR_ENTRY_DELIMITER = '&';
    /** Separator between a key and its value inside an attribute entry. */
    private static final Character ATTR_KV_DELIMITER = '=';

    /** Attribute key whose value is parsed with {@code StringUtil.parseDateTime}. */
    private static final String ATTR_TIME_T = "t";
    /** Attribute key whose value is parsed as a {@code long} (presumably an epoch timestamp - TODO confirm unit). */
    private static final String ATTR_TIME_DT = "dt";
    /** Attribute key whose value is passed through as the client IP of the message. */
    private static final String ATTR_CLIENT_IP = "clientIp";

    /**
     * Tells whether this operator handles the given encoding type.
     *
     * @param type encoding type to check, may be {@code null}
     * @return {@code true} only for {@code MSG_ENCODE_TYPE_INLONGMSG}
     */
    @Override
    public boolean accept(DataProxyMsgEncType type) {
        return DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.equals(type);
    }

    /**
     * Decodes one InLongMsg payload into a list of brief messages.
     *
     * @param streamInfo stream whose {@code getDataEncoding()} names the charset used to decode bodies
     * @param msgBytes raw InLongMsg bytes
     * @param headers message headers; {@code groupId} and {@code streamId} are read from it
     * @param index value stored as the first constructor argument of every produced message
     * @return the decoded messages, in iteration order of the attributes and their bodies;
     *         empty when the payload carries no non-null body
     * @throws IllegalArgumentException if an attribute string contains neither {@code t} nor {@code dt},
     *         if the {@code dt} value is not a valid {@code long}, or if the stream's data encoding
     *         is not a valid/supported charset name
     * @throws Exception any failure raised while parsing the payload
     */
    @Override
    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] msgBytes, Map<String, String> headers, int index) throws Exception {
        String groupId = headers.get(HEADER_GROUP_ID);
        String streamId = headers.get(HEADER_STREAM_ID);
        List<BriefMQMessage> messageList = new ArrayList<>();
        InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);

        // Resolved on first use and then reused: the charset is the same for every body, but
        // looking it up eagerly would reject a bad encoding even when there is nothing to decode.
        Charset bodyCharset = null;

        for (String attr : inLongMsg.getAttrs()) {
            Map<String, String> attributes =
                    StringUtil.splitKv(attr, ATTR_ENTRY_DELIMITER, ATTR_KV_DELIMITER, null, null);
            long msgTime = extractMsgTime(attributes);
            String clientIp = attributes.get(ATTR_CLIENT_IP);

            Iterator<byte[]> bodies = inLongMsg.getIterator(attr);
            while (bodies.hasNext()) {
                byte[] bodyBytes = bodies.next();
                if (bodyBytes == null) {
                    continue;
                }
                if (bodyCharset == null) {
                    bodyCharset = Charset.forName(streamInfo.getDataEncoding());
                }
                messageList.add(new BriefMQMessage(index, groupId, streamId, msgTime, clientIp,
                        new String(bodyBytes, bodyCharset)));
            }
        }
        return messageList;
    }

    /**
     * Reads the message time from parsed attributes, preferring {@code t} over {@code dt}.
     *
     * @param attributes key/value pairs split from one attribute string
     * @return the time parsed from {@code t}, or else the {@code long} parsed from {@code dt}
     * @throws IllegalArgumentException if neither key is present
     */
    private static long extractMsgTime(Map<String, String> attributes) {
        if (attributes.containsKey(ATTR_TIME_T)) {
            return StringUtil.parseDateTime(attributes.get(ATTR_TIME_T).trim());
        }
        if (attributes.containsKey(ATTR_TIME_DT)) {
            return Long.parseLong(attributes.get(ATTR_TIME_DT).trim());
        }
        // The previous format string was the literal text "PARSE_ATTR_ERROR_STRING%s", which
        // produced an unreadable message; spell out what is actually missing.
        throw new IllegalArgumentException(
                String.format("Could not find %s in attributes!", ATTR_TIME_T + " or " + ATTR_TIME_DT));
    }
}

