/*
 * Decompiled with CFR 0.152.
 */
package io.ballerina.messaging.broker.amqp.codec.frames;

import io.ballerina.messaging.broker.amqp.AmqpException;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannel;
import io.ballerina.messaging.broker.amqp.codec.InMemoryMessageAggregator;
import io.ballerina.messaging.broker.amqp.codec.frames.GeneralFrame;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpConnectionHandler;
import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContentFrame
extends GeneralFrame {
    private static final Logger LOGGER = LoggerFactory.getLogger(ContentFrame.class);
    private final long length;
    private final ByteBuf payload;

    public ContentFrame(int channel, long length, ByteBuf payload) {
        super((byte)3, channel);
        this.length = length;
        this.payload = payload;
    }

    @Override
    public long getPayloadSize() {
        return this.length;
    }

    @Override
    public void writePayload(ByteBuf buf) {
        try {
            buf.writeBytes(this.payload);
        }
        finally {
            this.payload.release();
        }
    }

    @Override
    public void handle(ChannelHandlerContext ctx, AmqpConnectionHandler connectionHandler) {
        boolean allContentReceived;
        AmqpChannel channel = connectionHandler.getChannel(this.getChannel());
        InMemoryMessageAggregator messageAggregator = channel.getMessageAggregator();
        try {
            allContentReceived = messageAggregator.contentBodyReceived(this.length, this.payload);
        }
        catch (AmqpException e) {
            LOGGER.warn("Content receiving failed", (Throwable)e);
            return;
        }
        if (allContentReceived) {
            Message message = messageAggregator.popMessage();
            ctx.fireChannelRead(() -> {
                try {
                    messageAggregator.publish(message);
                    ctx.executor().submit(() -> channel.getFlowManager().notifyMessageRemoval(ctx));
                }
                catch (BrokerException e) {
                    LOGGER.warn("Content receiving failed", (Throwable)e);
                }
            });
        }
    }

    public static ContentFrame parse(ByteBuf buf, int channel, long payloadSize) {
        ByteBuf payload = buf.retainedSlice(buf.readerIndex(), (int)payloadSize);
        buf.skipBytes((int)payloadSize);
        return new ContentFrame(channel, payloadSize, payload);
    }
}

