/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class MessageIdTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        this.baseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    @Test(timeOut=10000L)
    public void producerSendAsync() throws PulsarClientException {
        String key = "producerSendAsync";
        String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        String subscriptionName = "my-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int numberOfMessages = 30;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        HashSet<MessageIdImpl> messageIds = new HashSet<MessageIdImpl>();
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (int i = 0; i < 30; ++i) {
            String message = messagePredicate + i;
            futures.add(producer.sendAsync((Object)message.getBytes()));
        }
        MessageIdImpl previousMessageId = null;
        for (Future future : futures) {
            try {
                MessageIdImpl currentMessageId = (MessageIdImpl)future.get();
                if (previousMessageId != null) {
                    Assert.assertTrue((currentMessageId.compareTo(previousMessageId) > 0 ? 1 : 0) != 0, (String)"Message Ids should be in ascending order");
                }
                messageIds.add(currentMessageId);
                previousMessageId = currentMessageId;
            }
            catch (Exception e) {
                Assert.fail((String)("Failed to publish message, Exception: " + e.getMessage()));
            }
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)30, (String)"Not all messages published successfully");
        for (int i = 0; i < 30; ++i) {
            Message message = consumer.receive();
            Assert.assertEquals((String)new String(message.getData()), (String)(messagePredicate + i));
            MessageId messageId = message.getMessageId();
            Assert.assertTrue((boolean)messageIds.remove(messageId), (String)"Failed to receive message");
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)0, (String)"Not all messages received successfully");
        consumer.unsubscribe();
    }

    @Test(timeOut=10000L)
    public void producerSend() throws PulsarClientException {
        int i;
        String key = "producerSend";
        String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        String subscriptionName = "my-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int numberOfMessages = 30;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        HashSet<MessageId> messageIds = new HashSet<MessageId>();
        for (i = 0; i < 30; ++i) {
            String message = messagePredicate + i;
            messageIds.add(producer.send((Object)message.getBytes()));
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)30, (String)"Not all messages published successfully");
        for (i = 0; i < 30; ++i) {
            Assert.assertTrue((boolean)messageIds.remove(consumer.receive().getMessageId()), (String)"Failed to receive Message");
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)0, (String)"Not all messages received successfully");
        consumer.unsubscribe();
    }

    @Test(timeOut=10000L)
    public void partitionedProducerSendAsync() throws PulsarClientException, PulsarAdminException {
        int i;
        String key = "partitionedProducerSendAsync";
        String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        String subscriptionName = "my-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int numberOfMessages = 30;
        int numberOfPartitions = 3;
        this.admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        HashSet messageIds = new HashSet();
        HashSet<CompletableFuture> futures = new HashSet<CompletableFuture>();
        for (i = 0; i < 30; ++i) {
            String message = messagePredicate + i;
            futures.add(producer.sendAsync((Object)message.getBytes()));
        }
        futures.forEach(f -> {
            try {
                messageIds.add(f.get());
            }
            catch (Exception e) {
                Assert.fail((String)("Failed to publish message, Exception: " + e.getMessage()));
            }
        });
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)30, (String)"Not all messages published successfully");
        for (i = 0; i < 30; ++i) {
            MessageId topicMessageId = consumer.receive().getMessageId();
            MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
            log.info("Message ID Received = " + messageId);
            Assert.assertTrue((boolean)messageIds.remove(messageId), (String)"Failed to receive Message");
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)0, (String)"Not all messages received successfully");
        consumer.unsubscribe();
    }

    @Test(timeOut=10000L)
    public void partitionedProducerSend() throws PulsarClientException, PulsarAdminException {
        int i;
        String key = "partitionedProducerSend";
        String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        String subscriptionName = "my-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int numberOfMessages = 30;
        int numberOfPartitions = 7;
        this.admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        HashSet<MessageId> messageIds = new HashSet<MessageId>();
        for (i = 0; i < 30; ++i) {
            String message = messagePredicate + i;
            messageIds.add(producer.send((Object)message.getBytes()));
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)30, (String)"Not all messages published successfully");
        for (i = 0; i < 30; ++i) {
            MessageId topicMessageId = consumer.receive().getMessageId();
            MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
            Assert.assertTrue((boolean)messageIds.remove(messageId), (String)"Failed to receive Message");
        }
        log.info("Message IDs = " + messageIds);
        Assert.assertEquals((int)messageIds.size(), (int)0, (String)"Not all messages received successfully");
    }

    @Test
    public void testChecksumVersionComptability() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/topic1";
        ProducerImpl prod = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ProducerImpl producer = (ProducerImpl)Mockito.spy((Object)prod);
        ((ProducerImpl)Mockito.doReturn((Object)(producer.brokerChecksumSupportedVersion() + 1)).when((Object)producer)).brokerChecksumSupportedVersion();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.getState()).when((Object)producer)).getState();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.getClientCnx()).when((Object)producer)).getClientCnx();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.cnx()).when((Object)producer)).cnx();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/topic1"}).subscriptionName("my-sub").subscribe();
        this.stopBroker();
        ((PulsarClientImpl)this.pulsarClient).timer().stop();
        ClientCnx mockClientCnx = (ClientCnx)Mockito.spy((Object)new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl)this.pulsarClient).eventLoopGroup()));
        ((ClientCnx)Mockito.doReturn((Object)(producer.brokerChecksumSupportedVersion() - 1)).when((Object)mockClientCnx)).getRemoteEndpointProtocolVersion();
        prod.setClientCnx(mockClientCnx);
        CompletableFuture future1 = producer.sendAsync((Object)"message-1".getBytes());
        byte[] a2 = "message-2".getBytes();
        TypedMessageBuilder msg2 = producer.newMessage().value((Object)a2);
        CompletableFuture future2 = msg2.sendAsync();
        ((TypedMessageBuilderImpl)msg2).getContent().put(a2.length - 1, (byte)51);
        prod.setClientCnx(null);
        this.startBroker();
        prod.grabCnx();
        try {
            future1.get();
            future2.get();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
        }
        ((ConsumerImpl)consumer).grabCnx();
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-1");
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-3");
    }

    @Test
    public void testChecksumReconnection() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/topic1";
        ProducerImpl prod = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ProducerImpl producer = (ProducerImpl)Mockito.spy((Object)prod);
        ((ProducerImpl)Mockito.doReturn((Object)(producer.brokerChecksumSupportedVersion() + 1)).when((Object)producer)).brokerChecksumSupportedVersion();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.getState()).when((Object)producer)).getState();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.getClientCnx()).when((Object)producer)).getClientCnx();
        ((ProducerImpl)Mockito.doAnswer(invocationOnMock -> prod.cnx()).when((Object)producer)).cnx();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/topic1"}).subscriptionName("my-sub").subscribe();
        this.stopBroker();
        ((PulsarClientImpl)this.pulsarClient).timer().stop();
        ClientCnx mockClientCnx = (ClientCnx)Mockito.spy((Object)new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl)this.pulsarClient).eventLoopGroup()));
        ((ClientCnx)Mockito.doReturn((Object)(producer.brokerChecksumSupportedVersion() - 1)).when((Object)mockClientCnx)).getRemoteEndpointProtocolVersion();
        prod.setClientCnx(mockClientCnx);
        CompletableFuture future1 = producer.sendAsync((Object)"message-1".getBytes());
        byte[] a2 = "message-2".getBytes();
        TypedMessageBuilder msg2 = producer.newMessage().value((Object)a2);
        CompletableFuture future2 = msg2.sendAsync();
        ((TypedMessageBuilderImpl)msg2).getContent().put(a2.length - 1, (byte)51);
        prod.setClientCnx(null);
        this.startBroker();
        prod.grabCnx();
        try {
            future1.get(10L, TimeUnit.SECONDS);
            future2.get(10L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
        }
        ((ConsumerImpl)consumer).grabCnx();
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-1");
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-3");
    }

    @Test
    public void testCorruptMessageRemove() throws Exception {
        CompletableFuture future;
        TypedMessageBuilder msg;
        Consumer consumer;
        long producerId;
        ProducerImpl producer;
        block4: {
            String topicName = "persistent://prop/use/ns-abc/retry-topic";
            ProducerImpl prod = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/retry-topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).sendTimeout(10, TimeUnit.MINUTES).create();
            producer = (ProducerImpl)Mockito.spy((Object)prod);
            Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
            producerIdField.setAccessible(true);
            producerId = (Long)producerIdField.get(producer);
            producer.cnx().registerProducer(producerId, producer);
            consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/retry-topic"}).subscriptionName("my-sub").subscribe();
            this.stopBroker();
            byte[] a = "message-1".getBytes();
            msg = producer.newMessage().value((Object)a);
            future = msg.sendAsync();
            ((TypedMessageBuilderImpl)msg).getContent().put(a.length - 1, (byte)50);
            this.startBroker();
            try {
                future.get();
                Assert.fail((String)"send message should have failed with checksum excetion");
            }
            catch (Exception e) {
                if (e.getCause() instanceof PulsarClientException.ChecksumException) break block4;
                Assert.fail((String)"Callback should have only failed with ChecksumException", (Throwable)e);
            }
        }
        byte[] a2 = "message-2".getBytes();
        TypedMessageBuilderImpl msg2 = (TypedMessageBuilderImpl)producer.newMessage().value((Object)"message-1".getBytes());
        ByteBuf payload = Unpooled.wrappedBuffer((ByteBuffer)msg2.getContent());
        PulsarApi.MessageMetadata.Builder metadataBuilder = ((TypedMessageBuilderImpl)msg).getMetadataBuilder();
        PulsarApi.MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1L).setPublishTime(10L).build();
        ByteBufPair cmd = Commands.newSend((long)producerId, (long)1L, (int)1, (Commands.ChecksumType)Commands.ChecksumType.Crc32c, (PulsarApi.MessageMetadata)msgMetadata, (ByteBuf)payload);
        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create((MessageImpl)((MessageImpl)msg2.getMessage()), (ByteBufPair)cmd, (long)1L, null);
        Assert.assertTrue((boolean)producer.verifyLocalBufferIsNotCorrupted(op));
        msg2.getContent().put(a2.length - 1, (byte)50);
        Assert.assertFalse((boolean)producer.verifyLocalBufferIsNotCorrupted(op));
        Assert.assertEquals((int)producer.getPendingQueueSize(), (int)0);
        this.stopBroker();
        TypedMessageBuilderImpl msg1 = (TypedMessageBuilderImpl)producer.newMessage().value((Object)"message-1".getBytes());
        future = msg1.sendAsync();
        ClientCnx cnx = (ClientCnx)Mockito.spy((Object)new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl)this.pulsarClient).eventLoopGroup()));
        String exc = "broker is already stopped";
        ((ClientCnx)Mockito.doThrow((Throwable[])new Throwable[]{new IllegalStateException(exc)}).when((Object)cnx)).ctx();
        try {
            producer.recoverChecksumError(cnx, 1L);
            Assert.fail((String)"it should call : resendMessages() => which should throw above mocked exception");
        }
        catch (IllegalStateException e) {
            Assert.assertEquals((String)exc, (String)e.getMessage());
        }
        producer.close();
        consumer.close();
        producer = null;
    }
}

