/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ReplicatedSubscriptionsSnapshotBuilderTest {
    private final String localCluster = "a";
    private long currentTime = 0L;
    private Clock clock;
    private ServiceConfiguration conf;
    private ReplicatedSubscriptionsController controller;
    private List<ByteBuf> markers;

    @BeforeMethod
    public void setup() {
        this.clock = (Clock)Mockito.mock(Clock.class);
        Mockito.when((Object)this.clock.millis()).thenAnswer(invocation -> this.currentTime);
        this.conf = new ServiceConfiguration();
        this.conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3);
        this.markers = new ArrayList<ByteBuf>();
        this.controller = (ReplicatedSubscriptionsController)Mockito.mock(ReplicatedSubscriptionsController.class);
        Mockito.when((Object)this.controller.localCluster()).thenReturn((Object)"a");
        ((ReplicatedSubscriptionsController)Mockito.doAnswer(invocation -> {
            ByteBuf marker = (ByteBuf)invocation.getArgument(0, ByteBuf.class);
            Commands.skipMessageMetadata((ByteBuf)marker);
            this.markers.add(marker);
            return null;
        }).when((Object)this.controller)).writeMarker((ByteBuf)Mockito.any(ByteBuf.class));
    }

    @Test
    public void testBuildSnapshotWith2Clusters() throws Exception {
        List<String> remoteClusters = Arrays.asList("b");
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this.controller, remoteClusters, this.conf, this.clock);
        Assert.assertTrue((boolean)this.markers.isEmpty());
        builder.start();
        Assert.assertEquals((int)this.markers.size(), (int)1);
        PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest request = Markers.parseReplicatedSubscriptionsSnapshotRequest((ByteBuf)this.markers.remove(0));
        Assert.assertEquals((String)request.getSourceCluster(), (String)"a");
        builder.receivedSnapshotResponse((Position)new PositionImpl(1L, 1L), PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse.newBuilder().setSnapshotId("snapshot-1").setCluster(PulsarMarkers.ClusterMessageId.newBuilder().setCluster("b").setMessageId(PulsarMarkers.MessageIdData.newBuilder().setLedgerId(11L).setEntryId(11L).build())).build());
        Assert.assertEquals((int)this.markers.size(), (int)1);
        PulsarMarkers.ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot((ByteBuf)this.markers.remove(0));
        Assert.assertEquals((int)snapshot.getClustersCount(), (int)1);
        Assert.assertEquals((String)snapshot.getClusters(0).getCluster(), (String)"b");
        Assert.assertEquals((long)snapshot.getClusters(0).getMessageId().getLedgerId(), (long)11L);
        Assert.assertEquals((long)snapshot.getClusters(0).getMessageId().getEntryId(), (long)11L);
        Assert.assertEquals((long)snapshot.getLocalMessageId().getLedgerId(), (long)1L);
        Assert.assertEquals((long)snapshot.getLocalMessageId().getEntryId(), (long)1L);
    }

    @Test
    public void testBuildSnapshotWith3Clusters() throws Exception {
        List<String> remoteClusters = Arrays.asList("b", "c");
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this.controller, remoteClusters, this.conf, this.clock);
        Assert.assertTrue((boolean)this.markers.isEmpty());
        builder.start();
        Assert.assertEquals((int)this.markers.size(), (int)1);
        PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest request = Markers.parseReplicatedSubscriptionsSnapshotRequest((ByteBuf)this.markers.remove(0));
        Assert.assertEquals((String)request.getSourceCluster(), (String)"a");
        builder.receivedSnapshotResponse((Position)new PositionImpl(1L, 1L), PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse.newBuilder().setSnapshotId("snapshot-1").setCluster(PulsarMarkers.ClusterMessageId.newBuilder().setCluster("b").setMessageId(PulsarMarkers.MessageIdData.newBuilder().setLedgerId(11L).setEntryId(11L).build())).build());
        Assert.assertTrue((boolean)this.markers.isEmpty());
        builder.receivedSnapshotResponse((Position)new PositionImpl(2L, 2L), PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse.newBuilder().setSnapshotId("snapshot-1").setCluster(PulsarMarkers.ClusterMessageId.newBuilder().setCluster("c").setMessageId(PulsarMarkers.MessageIdData.newBuilder().setLedgerId(22L).setEntryId(22L).build())).build());
        Assert.assertEquals((int)this.markers.size(), (int)1);
        request = Markers.parseReplicatedSubscriptionsSnapshotRequest((ByteBuf)this.markers.remove(0));
        Assert.assertEquals((String)request.getSourceCluster(), (String)"a");
        builder.receivedSnapshotResponse((Position)new PositionImpl(3L, 3L), PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse.newBuilder().setSnapshotId("snapshot-1").setCluster(PulsarMarkers.ClusterMessageId.newBuilder().setCluster("b").setMessageId(PulsarMarkers.MessageIdData.newBuilder().setLedgerId(33L).setEntryId(33L).build())).build());
        Assert.assertTrue((boolean)this.markers.isEmpty());
        builder.receivedSnapshotResponse((Position)new PositionImpl(4L, 4L), PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse.newBuilder().setSnapshotId("snapshot-1").setCluster(PulsarMarkers.ClusterMessageId.newBuilder().setCluster("c").setMessageId(PulsarMarkers.MessageIdData.newBuilder().setLedgerId(44L).setEntryId(44L).build())).build());
        Assert.assertEquals((int)this.markers.size(), (int)1);
        PulsarMarkers.ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot((ByteBuf)this.markers.remove(0));
        Assert.assertEquals((int)snapshot.getClustersCount(), (int)2);
        Assert.assertEquals((String)snapshot.getClusters(0).getCluster(), (String)"b");
        Assert.assertEquals((long)snapshot.getClusters(0).getMessageId().getLedgerId(), (long)11L);
        Assert.assertEquals((long)snapshot.getClusters(0).getMessageId().getEntryId(), (long)11L);
        Assert.assertEquals((String)snapshot.getClusters(1).getCluster(), (String)"c");
        Assert.assertEquals((long)snapshot.getClusters(1).getMessageId().getLedgerId(), (long)22L);
        Assert.assertEquals((long)snapshot.getClusters(1).getMessageId().getEntryId(), (long)22L);
        Assert.assertEquals((long)snapshot.getLocalMessageId().getLedgerId(), (long)4L);
        Assert.assertEquals((long)snapshot.getLocalMessageId().getEntryId(), (long)4L);
    }

    @Test
    public void testBuildTimeout() throws Exception {
        List<String> remoteClusters = Arrays.asList("b");
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this.controller, remoteClusters, this.conf, this.clock);
        Assert.assertFalse((boolean)builder.isTimedOut());
        builder.start();
        this.currentTime = 2000L;
        Assert.assertFalse((boolean)builder.isTimedOut());
        this.currentTime = 5000L;
        Assert.assertTrue((boolean)builder.isTimedOut());
    }
}

