/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.protocol.Commands;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
public class ChecksumTest
extends BrokerTestBase {
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void verifyChecksumStoredInManagedLedger() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/topic0";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic0").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/topic0").get();
        ManagedLedger ledger = topic.getManagedLedger();
        ManagedCursor cursor = ledger.openCursor("test");
        producer.send((Object)"Hello".getBytes());
        List entries = cursor.readEntriesOrWait(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        ByteBuf b = ((Entry)entries.get(0)).getDataBuffer();
        Assert.assertTrue((boolean)Commands.hasChecksum((ByteBuf)b));
        int parsedChecksum = Commands.readChecksum((ByteBuf)b);
        int computedChecksum = Crc32cIntChecksum.computeChecksum((ByteBuf)b);
        Assert.assertEquals((int)parsedChecksum, (int)computedChecksum);
        ((Entry)entries.get(0)).release();
        producer.close();
    }

    @Test
    public void verifyChecksumSentToConsumer() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/topic-1";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic-1").create();
        RawReader reader = (RawReader)RawReader.create((PulsarClient)this.pulsarClient, (String)"persistent://prop/use/ns-abc/topic-1", (String)"sub").get();
        producer.send((Object)"Hello".getBytes());
        RawMessage msg = (RawMessage)reader.readNextAsync().get();
        ByteBuf b = msg.getHeadersAndPayload();
        Assert.assertTrue((boolean)Commands.hasChecksum((ByteBuf)b));
        int parsedChecksum = Commands.readChecksum((ByteBuf)b);
        int computedChecksum = Crc32cIntChecksum.computeChecksum((ByteBuf)b);
        Assert.assertEquals((int)parsedChecksum, (int)computedChecksum);
        producer.close();
        reader.closeAsync().get();
    }
}

