/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.PersistentTopicTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@PrepareForTest(value={ZooKeeperDataCache.class, BrokerService.class})
public class PersistentSubscriptionTest {
    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private ManagedLedgerFactory mlFactoryMock;
    private ManagedLedger ledgerMock;
    private ManagedCursorImpl cursorMock;
    private ConfigurationCacheService configCacheServiceMock;
    private PersistentTopic topic;
    private PersistentSubscription persistentSubscription;
    private Consumer consumerMock;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String subName = "subscriptionName";
    final TxnID txnID1 = new TxnID(1L, 1L);
    final TxnID txnID2 = new TxnID(1L, 2L);
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

    @BeforeMethod
    public void setup() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        this.pulsarMock = (PulsarService)Mockito.spy((Object)new PulsarService(svcConfig));
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsarMock)).getConfiguration();
        ((PulsarService)Mockito.doReturn((Object)Mockito.mock(Compactor.class)).when((Object)this.pulsarMock)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory)Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService)Mockito.doReturn((Object)this.mlFactoryMock).when((Object)this.pulsarMock)).getManagedLedgerFactory();
        MockZooKeeper zkMock = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService)Mockito.doReturn((Object)zkMock).when((Object)this.pulsarMock)).getZkClient();
        ((PulsarService)Mockito.doReturn((Object)((Object)MockedPulsarServiceBaseTest.createMockBookKeeper((ZooKeeper)zkMock, this.pulsarMock.getOrderedExecutor().chooseThread(0L)))).when((Object)this.pulsarMock)).getBookKeeperClient();
        ZooKeeperCache cache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache)Mockito.doReturn((Object)30).when((Object)cache)).getZkOperationTimeoutSeconds();
        CompletableFuture getDataFuture = new CompletableFuture();
        getDataFuture.complete(Optional.empty());
        ((ZooKeeperCache)Mockito.doReturn(getDataFuture).when((Object)cache)).getDataAsync(ArgumentMatchers.anyString(), (Watcher)ArgumentMatchers.any(), (ZooKeeperCache.Deserializer)ArgumentMatchers.any());
        ((PulsarService)Mockito.doReturn((Object)cache).when((Object)this.pulsarMock)).getLocalZkCache();
        this.configCacheServiceMock = (ConfigurationCacheService)Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zkPoliciesDataCacheMock = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService)Mockito.doReturn((Object)zkPoliciesDataCacheMock).when((Object)this.configCacheServiceMock)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheServiceMock).when((Object)this.pulsarMock)).getConfigurationCache();
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.empty()).when((Object)zkPoliciesDataCacheMock)).get(ArgumentMatchers.anyString());
        LocalZooKeeperCacheService zkCacheMock = (LocalZooKeeperCacheService)Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when((Object)zkPoliciesDataCacheMock)).getAsync((String)ArgumentMatchers.any());
        ((LocalZooKeeperCacheService)Mockito.doReturn((Object)zkPoliciesDataCacheMock).when((Object)zkCacheMock)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)zkCacheMock).when((Object)this.pulsarMock)).getLocalZkCacheService();
        this.brokerMock = (BrokerService)Mockito.spy((Object)new BrokerService(this.pulsarMock));
        ((BrokerService)Mockito.doNothing().when((Object)this.brokerMock)).unloadNamespaceBundlesGracefully();
        ((PulsarService)Mockito.doReturn((Object)this.brokerMock).when((Object)this.pulsarMock)).getBrokerService();
        this.ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursorImpl)Mockito.doReturn((Object)"mockCursor").when((Object)this.cursorMock)).getName();
        ((ManagedCursorImpl)Mockito.doReturn((Object)new PositionImpl(1L, 50L)).when((Object)this.cursorMock)).getMarkDeletedPosition();
        this.topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerMock);
        this.consumerMock = (Consumer)Mockito.mock(Consumer.class);
        this.persistentSubscription = new PersistentSubscription(this.topic, "subscriptionName", (ManagedCursor)this.cursorMock, false);
    }

    @AfterMethod
    public void teardown() throws Exception {
        this.brokerMock.close();
        try {
            this.pulsarMock.close();
        }
        catch (Exception e) {
            log.warn("Failed to close pulsar service", (Throwable)e);
            throw e;
        }
    }

    @Test
    public void testCanAcknowledgeAndCommitForTransaction() throws TransactionConflictException {
        ArrayList<PositionImpl> expectedSinglePositions = new ArrayList<PositionImpl>();
        expectedSinglePositions.add(new PositionImpl(1L, 1L));
        expectedSinglePositions.add(new PositionImpl(1L, 3L));
        expectedSinglePositions.add(new PositionImpl(1L, 5L));
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            Assert.assertTrue((boolean)((List)invocationOnMock.getArguments()[0]).containsAll(expectedSinglePositions));
            ((AsyncCallbacks.DeleteCallback)invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when((Object)this.cursorMock)).asyncDelete((Iterable)ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            Assert.assertEquals((int)((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new PositionImpl(3L, 100L)), (int)0);
            ((AsyncCallbacks.MarkDeleteCallback)invocationOnMock.getArguments()[2]).markDeleteComplete(invocationOnMock.getArguments()[3]);
            return null;
        }).when((Object)this.cursorMock)).asyncMarkDelete((Position)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback)ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        ArrayList<PositionImpl> positions = new ArrayList<PositionImpl>();
        positions.add(new PositionImpl(1L, 1L));
        positions.add(new PositionImpl(1L, 3L));
        positions.add(new PositionImpl(1L, 5L));
        this.persistentSubscription.acknowledgeMessage(this.txnID1, positions, PulsarApi.CommandAck.AckType.Individual);
        positions.clear();
        positions.add(new PositionImpl(3L, 100L));
        this.persistentSubscription.acknowledgeMessage(this.txnID1, positions, PulsarApi.CommandAck.AckType.Cumulative);
        this.persistentSubscription.commitTxn(this.txnID1, Collections.emptyMap());
        ((ManagedCursorImpl)Mockito.verify((Object)this.cursorMock, (VerificationMode)Mockito.times((int)1))).asyncDelete((Iterable)ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedCursorImpl)Mockito.verify((Object)this.cursorMock, (VerificationMode)Mockito.times((int)1))).asyncMarkDelete((Position)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(Map.class), (AsyncCallbacks.MarkDeleteCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    @Test
    public void testCanAcknowledgeAndAbortForTransaction() throws TransactionConflictException, BrokerServiceException {
        ArrayList<PositionImpl> positions = new ArrayList<PositionImpl>();
        positions.add(new PositionImpl(2L, 1L));
        positions.add(new PositionImpl(2L, 3L));
        positions.add(new PositionImpl(2L, 5L));
        Position[] expectedSinglePositions = new Position[]{new PositionImpl(3L, 1L), new PositionImpl(3L, 3L), new PositionImpl(3L, 5L)};
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            Assert.assertTrue((boolean)Arrays.deepEquals(((List)invocationOnMock.getArguments()[0]).toArray(), expectedSinglePositions));
            ((AsyncCallbacks.DeleteCallback)invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when((Object)this.cursorMock)).asyncDelete((Iterable)ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ((Consumer)Mockito.doReturn((Object)PulsarApi.CommandSubscribe.SubType.Exclusive).when((Object)this.consumerMock)).subType();
        this.persistentSubscription.addConsumer(this.consumerMock);
        this.persistentSubscription.acknowledgeMessage(this.txnID1, positions, PulsarApi.CommandAck.AckType.Individual);
        positions.clear();
        positions.add(new PositionImpl(1L, 100L));
        this.persistentSubscription.acknowledgeMessage(this.txnID1, positions, PulsarApi.CommandAck.AckType.Cumulative);
        positions.clear();
        positions.add(new PositionImpl(2L, 1L));
        try {
            this.persistentSubscription.acknowledgeMessage(this.txnID2, positions, PulsarApi.CommandAck.AckType.Individual);
            Assert.fail((String)"Single acknowledge for transaction2 should fail. ");
        }
        catch (TransactionConflictException e) {
            Assert.assertEquals((String)e.getMessage(), (String)"[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to ack message:2:1 in pending ack status.");
        }
        positions.clear();
        positions.add(new PositionImpl(2L, 50L));
        try {
            this.persistentSubscription.acknowledgeMessage(this.txnID2, positions, PulsarApi.CommandAck.AckType.Cumulative);
            Assert.fail((String)"Cumulative acknowledge for transaction2 should fail. ");
        }
        catch (TransactionConflictException e) {
            System.out.println(e.getMessage());
            Assert.assertEquals((String)e.getMessage(), (String)"[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to cumulative ack message while transaction:(1,1) already cumulative acked messages.");
        }
        positions.clear();
        positions.add(new PositionImpl(1L, 1L));
        positions.add(new PositionImpl(1L, 3L));
        positions.add(new PositionImpl(1L, 5L));
        positions.add(new PositionImpl(3L, 1L));
        positions.add(new PositionImpl(3L, 3L));
        positions.add(new PositionImpl(3L, 5L));
        this.persistentSubscription.acknowledgeMessage(positions, PulsarApi.CommandAck.AckType.Individual, Collections.emptyMap());
        this.persistentSubscription.abortTxn(this.txnID1, this.consumerMock);
        positions.clear();
        positions.add(new PositionImpl(2L, 50L));
        this.persistentSubscription.acknowledgeMessage(this.txnID2, positions, PulsarApi.CommandAck.AckType.Cumulative);
        positions.clear();
        positions.add(new PositionImpl(2L, 1L));
        this.persistentSubscription.acknowledgeMessage(this.txnID2, positions, PulsarApi.CommandAck.AckType.Individual);
    }
}

