package com.hazelcast.ringbuffer.impl;

import com.hazelcast.config.Config;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.IFunction;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.EntryProcessorOffloadableBouncingNodesTest;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Stress test that exercises {@code addAllAsync} and {@code readManyAsync} on a ringbuffer
 * concurrently.
 * <p>
 * A single {@link ProduceThread} appends batches of consecutive {@code long} values (starting at 0)
 * using {@link OverflowPolicy#FAIL}, while two {@link ConsumeThread}s read batches and assert that
 * every item they read equals the sequence they read it from. When the test is stopped the producer
 * appends {@link Long#MIN_VALUE} as an end-of-stream marker, which makes the consumers terminate.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
public class RingbufferAddAllReadManyStressTest extends HazelcastTestSupport {
    /** Exclusive upper bound for the random batch size used by producer and consumers. */
    private static final int MAX_BATCH = 100;
    /** Minimum time between two progress log lines of the same thread. */
    private static final long LOG_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
    /** First sleep interval used by the producer when the ringbuffer rejects a batch. */
    private static final long INITIAL_BACKOFF_MS = 100;
    /** Upper bound for the producer's back-off sleep interval. */
    private static final long MAX_BACKOFF_MS = 1000;

    private Ringbuffer<Long> ringbuffer;
    private final ILogger logger = Logger.getLogger(RingbufferAddAllReadManyStressTest.class);
    /** Set when the test duration elapses or when any worker thread fails. */
    private final AtomicBoolean stop = new AtomicBoolean();

    /**
     * Reads batches from the ringbuffer and verifies that each item equals the sequence it was
     * read from. Terminates when it encounters {@link Long#MIN_VALUE}.
     */
    public class ConsumeThread extends TestThread {
        private final ILogger logger = Logger.getLogger(ConsumeThread.class);
        // Written only by this thread; volatile so the test thread can read the final value.
        volatile long seq;
        long lastLogMs;

        public ConsumeThread(int i) {
            super("ConsumeThread-" + i);
        }

        /** Stops the whole test as soon as this consumer fails. */
        @Override
        public void onError(Throwable th) {
            stop.set(true);
        }

        /**
         * Consumes until the end-of-stream marker is read.
         *
         * @throws Throwable any failure other than a {@link StaleSequenceException}, which is
         *                   handled by jumping forward to a sequence that is still available
         */
        @Override
        public void doRun() throws Throwable {
            seq = ringbuffer.headSequence();
            Random random = new Random();
            while (true) {
                int maxCount = Math.max(1, random.nextInt(MAX_BATCH));
                for (Long item : readBatch(maxCount)) {
                    if (item.equals(Long.MIN_VALUE)) {
                        // End-of-stream marker written by the producer.
                        return;
                    }
                    Assert.assertEquals(seq, item.longValue());
                    logProgress();
                    seq++;
                }
            }
        }

        /** Reads between 1 and {@code maxCount} items at {@link #seq}, retrying after a stale sequence. */
        private ReadResultSet<Long> readBatch(int maxCount) throws Exception {
            while (true) {
                try {
                    return ringbuffer.readManyAsync(seq, 1, maxCount, null).get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof StaleSequenceException)) {
                        throw e;
                    }
                    logger.info(getName() + " has fallen behind, catching up...");
                    long tail = ringbuffer.tailSequence();
                    long head = ringbuffer.headSequence();
                    // Resume half-way between head and tail; written so the sum cannot overflow.
                    seq = tail >= head ? head + (tail - head) / 2 : head;
                }
            }
        }

        /** Logs the current sequence at most once per {@link #LOG_INTERVAL_MS}. */
        private void logProgress() {
            long now = System.currentTimeMillis();
            if (lastLogMs + LOG_INTERVAL_MS < now) {
                lastLogMs = now;
                logger.info(getName() + " at " + seq);
            }
        }
    }

    /**
     * Appends random-sized batches of consecutive values until {@link #stop} is set, then appends
     * {@link Long#MIN_VALUE} as the end-of-stream marker.
     */
    public class ProduceThread extends TestThread {
        private final ILogger logger = Logger.getLogger(ProduceThread.class);
        // Written only by this thread; volatile so the test thread can read the final value.
        private volatile long produced;
        Random random = new Random();
        long lastLogMs;

        public ProduceThread() {
            super("ProduceThread");
        }

        /** Stops the whole test as soon as the producer fails. */
        @Override
        public void onError(Throwable th) {
            stop.set(true);
        }

        @Override
        public void doRun() throws Throwable {
            while (!stop.get()) {
                addAll(makeBatch());
            }
            ringbuffer.add(Long.MIN_VALUE);
        }

        /** Builds a batch of 1 to {@code MAX_BATCH - 1} consecutive values, advancing {@link #produced}. */
        private LinkedList<Long> makeBatch() {
            int size = Math.max(1, random.nextInt(MAX_BATCH));
            LinkedList<Long> batch = new LinkedList<>();
            for (int k = 0; k < size; k++) {
                batch.add(produced);
                produced++;
                long now = System.currentTimeMillis();
                if (lastLogMs + LOG_INTERVAL_MS < now) {
                    lastLogMs = now;
                    logger.info(getName() + " at " + produced);
                }
            }
            return batch;
        }

        /**
         * Adds the batch with {@link OverflowPolicy#FAIL}, retrying for as long as the call
         * returns -1. The sleep between attempts doubles each time, capped at
         * {@link #MAX_BACKOFF_MS}.
         */
        private void addAll(LinkedList<Long> items) throws InterruptedException, ExecutionException {
            long sleepMs = INITIAL_BACKOFF_MS;
            while (ringbuffer.addAllAsync(items, OverflowPolicy.FAIL).get() == -1) {
                logger.info("Backoff");
                TimeUnit.MILLISECONDS.sleep(sleepMs);
                sleepMs = Math.min(sleepMs * 2, MAX_BACKOFF_MS);
            }
        }
    }

    /** Destroys the ringbuffer created by the test, if any. */
    @After
    public void tearDown() {
        if (ringbuffer != null) {
            ringbuffer.destroy();
        }
    }

    /** Capacity 20,000,000, OBJECT in-memory format, TTL of 0 seconds. */
    @Test
    public void whenNoTTL() throws Exception {
        RingbufferConfig rbConfig = new RingbufferConfig("rb")
                .setCapacity(20000000)
                .setInMemoryFormat(InMemoryFormat.OBJECT)
                .setTimeToLiveSeconds(0);
        test(rbConfig);
    }

    /** Capacity 200,000 with a TTL of 2 seconds. */
    @Test
    public void whenTTLEnabled() throws Exception {
        RingbufferConfig rbConfig = new RingbufferConfig("rb")
                .setCapacity(200000)
                .setTimeToLiveSeconds(2);
        test(rbConfig);
    }

    /** Small capacity with a TTL of 30 seconds. */
    @Test
    public void whenLongTTLAndSmallBuffer() throws Exception {
        // NOTE(review): the capacity comes from an unrelated map test's constant, which looks like
        // a decompiler artifact of an inlined literal - confirm the value and inline it here.
        RingbufferConfig rbConfig = new RingbufferConfig("rb")
                .setCapacity(EntryProcessorOffloadableBouncingNodesTest.COUNT_ENTRIES)
                .setTimeToLiveSeconds(30);
        test(rbConfig);
    }

    /** Capacity 20,000,000, OBJECT in-memory format, TTL of 2 seconds. */
    @Test
    public void whenShortTTLAndBigBuffer() throws Exception {
        RingbufferConfig rbConfig = new RingbufferConfig("rb")
                .setInMemoryFormat(InMemoryFormat.OBJECT)
                .setCapacity(20000000)
                .setTimeToLiveSeconds(2);
        test(rbConfig);
    }

    /**
     * Runs the producer/consumer scenario against a ringbuffer with the given configuration.
     * <p>
     * Two consumers are started first, the producer two seconds later. After
     * {@code sleepAndStop(stop, 60)} returns, all threads must finish successfully and each
     * consumer's final sequence must equal the number of items produced.
     *
     * @param ringbufferConfig configuration of the ringbuffer under test
     * @throws Exception if the scenario cannot be set up or run
     */
    public void test(RingbufferConfig ringbufferConfig) throws Exception {
        Config config = new Config();
        config.addRingBufferConfig(ringbufferConfig);
        ringbuffer = createHazelcastInstanceFactory(2)
                .newInstances(config)[0]
                .getRingbuffer(ringbufferConfig.getName());

        ConsumeThread consumer1 = new ConsumeThread(1);
        consumer1.start();
        ConsumeThread consumer2 = new ConsumeThread(2);
        consumer2.start();

        sleepSeconds(2);

        ProduceThread producer = new ProduceThread();
        producer.start();

        sleepAndStop(stop, 60L);
        logger.info("Waiting for completion");

        producer.assertSucceedsEventually();
        consumer1.assertSucceedsEventually();
        consumer2.assertSucceedsEventually();

        logger.info(producer.getName() + " produced:" + producer.produced);
        Assert.assertEquals(producer.produced, consumer1.seq);
        Assert.assertEquals(producer.produced, consumer2.seq);
    }
}
