/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.message;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.util.Utils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * {@link DeserializeOperator} for messages whose DataProxy encoding type is
 * {@link DataProxyMsgEncType#MSG_ENCODE_TYPE_PB}.
 *
 * <p>The raw payload is optionally decompressed (selected by the {@code compressType} header),
 * parsed as a {@link ProxySdk.MessageObjs} protobuf and converted into one
 * {@link BriefMQMessage} per contained {@link ProxySdk.MessageObj}.
 */
@Service
public class PbMsgDeserializeOperator implements DeserializeOperator {

    private static final Logger log = LoggerFactory.getLogger(PbMsgDeserializeOperator.class);

    /** Header key carrying the numeric compression type of the payload. */
    private static final String COMPRESS_TYPE_KEY = "compressType";
    /** Value used when the {@code compressType} header is absent. */
    private static final String DEFAULT_COMPRESS_TYPE = "0";

    /** Payload is used as-is. */
    private static final int COMPRESS_NONE = 0;
    /** Payload is decompressed with {@code Utils.gzipDecompress}. */
    private static final int COMPRESS_GZIP = 1;
    /** Payload is decompressed with {@code Utils.snappyDecompress}. */
    private static final int COMPRESS_SNAPPY = 2;

    /**
     * Tells whether this operator handles the given encoding type.
     *
     * @param type encoding type to check, may be {@code null}
     * @return {@code true} only for {@link DataProxyMsgEncType#MSG_ENCODE_TYPE_PB}
     */
    @Override
    public boolean accept(DataProxyMsgEncType type) {
        return DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.equals(type);
    }

    /**
     * Decompresses (if required) and parses a protobuf-encoded payload into brief messages.
     *
     * @param streamInfo stream whose data encoding is used to decode each message body
     * @param msgBytes raw payload bytes
     * @param headers message headers; {@code compressType} is read and defaults to {@code "0"}
     * @param index value assigned as the id of every message produced from this payload
     * @return the decoded messages, one per entry of the parsed {@link ProxySdk.MessageObjs}
     * @throws NumberFormatException if the {@code compressType} header is not an integer
     * @throws IllegalArgumentException if the compress type is not 0, 1 or 2
     * @throws Exception if decompression or protobuf parsing fails
     */
    @Override
    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] msgBytes,
            Map<String, String> headers, int index) throws Exception {
        int compressType = Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, DEFAULT_COMPRESS_TYPE));
        byte[] values;
        switch (compressType) {
            case COMPRESS_NONE:
                values = msgBytes;
                break;
            case COMPRESS_GZIP:
                values = Utils.gzipDecompress(msgBytes, 0, msgBytes.length);
                break;
            case COMPRESS_SNAPPY:
                values = Utils.snappyDecompress(msgBytes, 0, msgBytes.length);
                break;
            default:
                throw new IllegalArgumentException("Unknown compress type:" + compressType);
        }
        return transformMessageObjs(ProxySdk.MessageObjs.parseFrom(values), streamInfo, index);
    }

    /**
     * Converts every {@link ProxySdk.MessageObj} of a batch into a {@link BriefMQMessage}.
     *
     * <p>Group id, stream id and client IP are taken from the message's own params
     * ({@code groupId}, {@code streamId}, {@code clientIp}); the body is decoded with the
     * stream's data encoding. All messages of the batch receive the same {@code index} as id.
     *
     * @param messageObjs parsed batch, may be {@code null}
     * @param streamInfo stream providing the charset name for body decoding
     * @param index id assigned to each resulting message
     * @return the converted messages; empty when {@code messageObjs} is {@code null} or has no entries
     */
    private List<BriefMQMessage> transformMessageObjs(ProxySdk.MessageObjs messageObjs,
            InlongStreamInfo streamInfo, int index) {
        List<BriefMQMessage> messageList = new ArrayList<>();
        if (messageObjs == null) {
            return messageList;
        }
        List<ProxySdk.MessageObj> messageObjList = messageObjs.getMsgsList();
        if (messageObjList.isEmpty()) {
            // Nothing to decode, so the stream encoding is deliberately not resolved here.
            return messageList;
        }
        // The charset is identical for the whole batch: resolve it once instead of per message.
        Charset bodyCharset = Charset.forName(streamInfo.getDataEncoding());
        for (ProxySdk.MessageObj messageObj : messageObjList) {
            List<ProxySdk.MapFieldEntry> mapFieldEntries = messageObj.getParamsList();
            Map<String, String> headers = new HashMap<>();
            for (ProxySdk.MapFieldEntry mapFieldEntry : mapFieldEntries) {
                headers.put(mapFieldEntry.getKey(), mapFieldEntry.getValue());
            }
            BriefMQMessage message = BriefMQMessage.builder()
                    .id(index)
                    .inlongGroupId(headers.get("groupId"))
                    .inlongStreamId(headers.get("streamId"))
                    .dt(messageObj.getMsgTime())
                    .clientIp(headers.get("clientIp"))
                    .body(new String(messageObj.getBody().toByteArray(), bodyCharset))
                    .build();
            messageList.add(message);
        }
        return messageList;
    }
}

