package com.gemstone.gemfire.cache.hdfs.internal;

import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.class */
/**
 * A {@link FlushObserver} that counts received and delivered events and lets
 * callers wait until every event received before a flush request has been
 * reported as delivered.
 *
 * <p>{@link #push()} increments the received counter and {@link #pop(int)}
 * adds to the delivered counter, releasing any pending flush whose sequence
 * number has been reached. The list of pending flushes is guarded by its own
 * monitor; the two counters are {@link AtomicLong}s.
 */
public class SignalledFlushObserver implements FlushObserver {
    /** Pending flush requests; every access is made while holding this list's monitor. */
    private final List<FlushLatch> signals = new ArrayList<FlushLatch>();

    /** Number of events recorded through {@link #push()} since the last {@link #clear()}. */
    private final AtomicLong eventsReceived = new AtomicLong(0);

    /** Number of events recorded through {@link #pop(int)} since the last {@link #clear()}. */
    private final AtomicLong eventsDelivered = new AtomicLong(0);

    /** A single-count latch tagged with the received-event count it is waiting for. */
    private static class FlushLatch extends CountDownLatch {
        private final long seqnum;

        /**
         * @param seqnum value the delivered counter must reach before this latch is released
         */
        public FlushLatch(long seqnum) {
            super(1);
            this.seqnum = seqnum;
        }

        /**
         * @return the delivered-event count at which this latch is released
         */
        public long getSequence() {
            return this.seqnum;
        }
    }

    /**
     * Reports whether any flush request is still pending.
     *
     * @return {@code true} if at least one flush latch has not been released yet
     */
    @Override
    public boolean shouldDrainImmediately() {
        synchronized (this.signals) {
            return !this.signals.isEmpty();
        }
    }

    /**
     * Registers a flush request covering every event received so far.
     *
     * <p>If all received events have already been delivered, the returned
     * result completes immediately; otherwise it completes once
     * {@link #pop(int)} has advanced the delivered counter to the snapshot
     * taken here, or once {@link #clear()} is called.
     *
     * @return a handle whose {@code waitForFlush} returns {@code true} when the
     *         flush completed, or {@code false} if the timeout elapsed first
     */
    @Override
    public FlushObserver.AsyncFlushResult flush() {
        final FlushLatch latch;
        synchronized (this.signals) {
            // Take the snapshot while holding the lock: clear() resets both
            // counters under the same lock, so reading eventsReceived earlier
            // could register a latch with a stale, pre-clear sequence number
            // that the restarted delivered counter might never reach.
            final long seqnum = this.eventsReceived.get();
            if (seqnum <= this.eventsDelivered.get()) {
                latch = null;
            } else {
                latch = new FlushLatch(seqnum);
                this.signals.add(latch);
            }
        }

        // The result object only captures the latch, so it is built outside the lock.
        return new FlushObserver.AsyncFlushResult() {
            @Override
            public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
                // A null latch means nothing was outstanding when flush() was called.
                return latch == null || latch.await(timeout, unit);
            }
        };
    }

    /** Records that one more event has been received. */
    public void push() {
        this.eventsReceived.incrementAndGet();
    }

    /**
     * Records that {@code i} more events have been delivered and releases every
     * pending flush whose sequence number is now covered.
     *
     * @param i number of events delivered in this batch
     */
    public void pop(int i) {
        final long delivered = this.eventsDelivered.addAndGet(i);
        synchronized (this.signals) {
            ListIterator<FlushLatch> pending = this.signals.listIterator();
            while (pending.hasNext()) {
                FlushLatch latch = pending.next();
                if (latch.getSequence() <= delivered) {
                    latch.countDown();
                    pending.remove();
                }
            }
        }
    }

    /**
     * Releases every pending flush, discards them, and resets both event
     * counters to zero.
     */
    public void clear() {
        synchronized (this.signals) {
            for (FlushLatch latch : this.signals) {
                latch.countDown();
            }
            this.signals.clear();
            this.eventsReceived.set(0L);
            this.eventsDelivered.set(0L);
        }
    }
}
