package com.hazelcast.ringbuffer.impl;

import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.topic.impl.reliable.ReliableTopicDestroyTest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Stress test with one producer and two consumers sharing a single ringbuffer.
 *
 * <p>The producer appends an increasing counter (0, 1, 2, ...) through
 * {@code addAsync} with {@link OverflowPolicy#FAIL}; whenever the call yields
 * {@code -1} it sleeps with an exponentially growing, capped delay and retries.
 * Each consumer reads items one at a time and asserts that every item equals
 * the sequence it was read at. When the stop flag is raised the producer
 * appends {@code Long.MIN_VALUE}, which the consumers treat as the signal to
 * finish.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
public class RingbufferAsyncAddWithBackoffStressTest extends HazelcastTestSupport {

    /** Minimum time between two progress lines printed by the same thread. */
    private static final long PROGRESS_INTERVAL_MILLIS = 2000;

    /** First delay used after a rejected add. */
    private static final long INITIAL_BACKOFF_MILLIS = 100;

    /** Upper bound for the delay between retries of a rejected add. */
    private static final long MAX_BACKOFF_MILLIS = 1000;

    /** Pause between starting the consumers and starting the producer. */
    private static final int CONSUMER_HEAD_START_SECONDS = 2;

    /** Value handed to {@code sleepAndStop} as the duration of the run. */
    private static final long RUN_DURATION_SECONDS = 180L;

    private final AtomicBoolean stop = new AtomicBoolean();
    private Ringbuffer<Long> ringbuffer;

    @Test
    public void whenNoTTL() throws Exception {
        test(newConfig(200000, 0));
    }

    @Test
    public void whenTTLEnabled() throws Exception {
        test(newConfig(200000, 2));
    }

    @Test
    public void whenLongTTLAndSmallBuffer() throws Exception {
        test(newConfig(1000, 30));
    }

    @Test(timeout = 900000)
    public void whenShortTTLAndBigBuffer() throws Exception {
        test(newConfig(10000000, 3));
    }

    /**
     * Builds the ringbuffer configuration used by a scenario.
     *
     * @param capacity   value passed to {@code setCapacity}
     * @param ttlSeconds value passed to {@code setTimeToLiveSeconds}
     * @return the configured {@link RingbufferConfig}
     */
    private static RingbufferConfig newConfig(int capacity, int ttlSeconds) {
        // NOTE(review): the ringbuffer name is borrowed from an unrelated test's constant — confirm this is intended.
        RingbufferConfig ringbufferConfig = new RingbufferConfig(ReliableTopicDestroyTest.RELIABLE_TOPIC_NAME);
        return ringbufferConfig.setCapacity(capacity).setTimeToLiveSeconds(ttlSeconds);
    }

    /**
     * Runs one scenario: creates instances from a factory of size two, takes the
     * ringbuffer from the first one, starts two consumers followed by the
     * producer, and finally checks that both consumers ended at the sequence
     * equal to the number of items produced.
     *
     * @param ringbufferConfig configuration of the ringbuffer under test
     * @throws Exception if any of the helper calls fails
     */
    public void test(RingbufferConfig ringbufferConfig) throws Exception {
        Config config = new Config();
        config.addRingBufferConfig(ringbufferConfig);
        ringbuffer = createHazelcastInstanceFactory(2)
                .newInstances(config)[0]
                .getRingbuffer(ringbufferConfig.getName());

        ConsumeThread firstConsumer = new ConsumeThread(1);
        firstConsumer.start();
        ConsumeThread secondConsumer = new ConsumeThread(2);
        secondConsumer.start();

        // Consumers are started ahead of the producer.
        sleepSeconds(CONSUMER_HEAD_START_SECONDS);

        ProduceThread producer = new ProduceThread();
        producer.start();

        // Presumably sleeps for the given number of seconds and then raises the stop flag — verify in HazelcastTestSupport.
        sleepAndStop(stop, RUN_DURATION_SECONDS);
        System.out.println("Waiting for completion");

        producer.assertSucceedsEventually();
        firstConsumer.assertSucceedsEventually();
        secondConsumer.assertSucceedsEventually();

        System.out.println("producer.produced:" + producer.produced);
        Assert.assertEquals(producer.produced, firstConsumer.seq);
        Assert.assertEquals(producer.produced, secondConsumer.seq);
    }

    /**
     * Reads items from the ringbuffer starting at its head sequence and asserts
     * that each item equals the sequence it was read at. Finishes when it reads
     * {@code Long.MIN_VALUE}.
     */
    public class ConsumeThread extends TestThread {
        /** Sequence of the next item to read; inspected by the test after the thread is done. */
        volatile long seq;

        ConsumeThread(int i) {
            super("ConsumeThread-" + i);
        }

        /** Raises the shared stop flag so that the producer loop ends as well. */
        @Override
        public void onError(Throwable th) {
            stop.set(true);
        }

        @Override
        public void doRun() throws Throwable {
            long lastReport = System.currentTimeMillis();
            seq = ringbuffer.headSequence();

            for (Long item = awaitItem(); !item.equals(Long.MIN_VALUE); item = awaitItem()) {
                Assert.assertEquals(Long.valueOf(seq), item);
                seq++;

                long now = System.currentTimeMillis();
                if (now > lastReport + PROGRESS_INTERVAL_MILLIS) {
                    lastReport = now;
                    System.out.println(getName() + " at " + seq);
                }
            }
        }

        /**
         * Reads the item at {@link #seq}, retrying until a non-null item is
         * obtained. On a {@link StaleSequenceException} the read position is
         * moved forward before the next attempt.
         *
         * @return the item read at the current value of {@link #seq}
         * @throws InterruptedException if the read is interrupted
         */
        private Long awaitItem() throws InterruptedException {
            while (true) {
                try {
                    Long item = ringbuffer.readOne(seq);
                    if (item != null) {
                        return item;
                    }
                } catch (StaleSequenceException ignored) {
                    System.out.println(getName() + " has fallen behind, catching up...");
                    long tail = ringbuffer.tailSequence();
                    long head = ringbuffer.headSequence();
                    if (tail >= head) {
                        // Resume from the middle of the range reported by the ringbuffer.
                        seq = (tail + head) / 2;
                    } else {
                        seq = head;
                    }
                }
            }
        }
    }

    /**
     * Appends an increasing counter to the ringbuffer until the stop flag is
     * raised, then appends {@code Long.MIN_VALUE}.
     */
    public class ProduceThread extends TestThread {
        /** Number of items successfully added so far; also the value of the next item. */
        private volatile long produced;

        ProduceThread() {
            super("ProduceThread");
        }

        /** Raises the shared stop flag. */
        @Override
        public void onError(Throwable th) {
            stop.set(true);
        }

        @Override
        public void doRun() throws Throwable {
            long lastReport = System.currentTimeMillis();

            while (!stop.get()) {
                // Retry the add while it yields -1, doubling the delay up to the cap.
                for (long backoffMillis = INITIAL_BACKOFF_MILLIS;
                     ringbuffer.addAsync(Long.valueOf(produced), OverflowPolicy.FAIL)
                             .toCompletableFuture().get().longValue() == -1;
                     backoffMillis = Math.min(backoffMillis * 2, MAX_BACKOFF_MILLIS)) {
                    TimeUnit.MILLISECONDS.sleep(backoffMillis);
                }
                produced++;

                long now = System.currentTimeMillis();
                if (now > lastReport + PROGRESS_INTERVAL_MILLIS) {
                    lastReport = now;
                    System.out.println(getName() + " at " + produced);
                }
            }

            // Marker item that makes the consumers return from their loop.
            ringbuffer.add(Long.MIN_VALUE);
        }
    }
}
