/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.proxy.server.ProxyService;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParserProxyHandler
extends ChannelInboundHandlerAdapter {
    private final Channel channel;
    protected static final String FRONTEND_CONN = "frontendconn";
    protected static final String BACKEND_CONN = "backendconn";
    private final String connType;
    private final int maxMessageSize;
    private final ChannelId peerChannelId;
    private final ProxyService service;
    private static final Map<String, String> producerHashMap = new ConcurrentHashMap<String, String>();
    private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<String, String>();
    private static final Logger log = LoggerFactory.getLogger(ParserProxyHandler.class);

    public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize, ChannelId peerChannelId) {
        this.service = service;
        this.channel = channel;
        this.connType = type;
        this.maxMessageSize = maxMessageSize;
        this.peerChannelId = peerChannelId;
    }

    private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception {
        if (messages != null) {
            StringBuilder infoBuilder = new StringBuilder(info);
            for (RawMessage message : messages) {
                infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ").append(new String(ByteBufUtil.getBytes((ByteBuf)message.getData()), StandardCharsets.UTF_8));
            }
            info = infoBuilder.toString();
        }
        switch (this.connType) {
            case "frontendconn": {
                log.info("frontendconn:{} cmd:{} msg:{}", new Object[]{"[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info});
                break;
            }
            case "backendconn": {
                log.info("backendconn:{} cmd:{} msg:{}", new Object[]{"[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info});
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer;
        ArrayList<RawMessage> messages;
        PulsarApi.BaseCommand cmd;
        block16: {
            cmd = null;
            PulsarApi.BaseCommand.Builder cmdBuilder = null;
            messages = new ArrayList<RawMessage>();
            buffer = (ByteBuf)msg;
            try {
                buffer.markReaderIndex();
                buffer.markWriterIndex();
                int cmdSize = (int)buffer.readUnsignedInt();
                int writerIndex = buffer.writerIndex();
                buffer.writerIndex(buffer.readerIndex() + cmdSize);
                ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get((ByteBuf)buffer);
                cmdBuilder = PulsarApi.BaseCommand.newBuilder();
                cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
                buffer.writerIndex(writerIndex);
                cmdInputStream.recycle();
                switch (cmd.getType()) {
                    case PRODUCER: {
                        producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(), cmd.getProducer().getTopic());
                        this.logging(ctx.channel(), cmd.getType(), "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
                        break;
                    }
                    case SEND: {
                        if (this.service.getProxyLogLevel() != 2) {
                            this.logging(ctx.channel(), cmd.getType(), "", null);
                            break;
                        }
                        TopicName topicName = TopicName.get((String)producerHashMap.get(cmd.getSend().getProducerId() + "," + ctx.channel().id()));
                        MutableLong msgBytes = new MutableLong(0L);
                        MessageParser.parseMessage((TopicName)topicName, (long)-1L, (long)-1L, (ByteBuf)buffer, message -> {
                            messages.add(message);
                            msgBytes.add((long)message.getData().readableBytes());
                        }, (int)this.maxMessageSize);
                        TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), topic -> new TopicStats());
                        topicStats.getMsgInRate().recordMultipleEvents((long)messages.size(), msgBytes.longValue());
                        this.logging(ctx.channel(), cmd.getType(), "", messages);
                        break;
                    }
                    case SUBSCRIBE: {
                        consumerHashMap.put(cmd.getSubscribe().getConsumerId() + "," + ctx.channel().id(), cmd.getSubscribe().getTopic());
                        this.logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
                        break;
                    }
                    case MESSAGE: {
                        if (this.service.getProxyLogLevel() != 2) {
                            this.logging(ctx.channel(), cmd.getType(), "", null);
                            break;
                        }
                        TopicName topicName = TopicName.get((String)consumerHashMap.get(cmd.getMessage().getConsumerId() + "," + this.peerChannelId));
                        MutableLong msgBytes = new MutableLong(0L);
                        MessageParser.parseMessage((TopicName)topicName, (long)-1L, (long)-1L, (ByteBuf)buffer, message -> {
                            messages.add(message);
                            msgBytes.add((long)message.getData().readableBytes());
                        }, (int)this.maxMessageSize);
                        TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), topic -> new TopicStats());
                        topicStats.getMsgOutRate().recordMultipleEvents((long)messages.size(), msgBytes.longValue());
                        this.logging(ctx.channel(), cmd.getType(), "", messages);
                        break;
                    }
                    default: {
                        this.logging(ctx.channel(), cmd.getType(), "", null);
                    }
                }
                if (cmdBuilder == null) break block16;
            }
            catch (Exception e) {
                block17: {
                    try {
                        log.error("channelRead error ", (Throwable)e);
                        if (cmdBuilder == null) break block17;
                    }
                    catch (Throwable throwable) {
                        if (cmdBuilder != null) {
                            cmdBuilder.recycle();
                        }
                        if (cmd != null) {
                            cmd.recycle();
                        }
                        buffer.resetReaderIndex();
                        buffer.resetWriterIndex();
                        ByteBuf totalSizeBuf = Unpooled.buffer((int)4);
                        totalSizeBuf.writeInt(buffer.readableBytes());
                        CompositeByteBuf compBuf = Unpooled.compositeBuffer();
                        compBuf.addComponents(new ByteBuf[]{totalSizeBuf, buffer});
                        compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
                        messages.forEach(RawMessage::release);
                        ctx.fireChannelRead((Object)compBuf);
                        throw throwable;
                    }
                    cmdBuilder.recycle();
                }
                if (cmd != null) {
                    cmd.recycle();
                }
                buffer.resetReaderIndex();
                buffer.resetWriterIndex();
                ByteBuf totalSizeBuf = Unpooled.buffer((int)4);
                totalSizeBuf.writeInt(buffer.readableBytes());
                CompositeByteBuf compBuf = Unpooled.compositeBuffer();
                compBuf.addComponents(new ByteBuf[]{totalSizeBuf, buffer});
                compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
                messages.forEach(RawMessage::release);
                ctx.fireChannelRead((Object)compBuf);
            }
            cmdBuilder.recycle();
        }
        if (cmd != null) {
            cmd.recycle();
        }
        buffer.resetReaderIndex();
        buffer.resetWriterIndex();
        ByteBuf totalSizeBuf = Unpooled.buffer((int)4);
        totalSizeBuf.writeInt(buffer.readableBytes());
        CompositeByteBuf compBuf = Unpooled.compositeBuffer();
        compBuf.addComponents(new ByteBuf[]{totalSizeBuf, buffer});
        compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
        messages.forEach(RawMessage::release);
        ctx.fireChannelRead((Object)compBuf);
    }
}

