package com.gemstone.gemfire.cache.hdfs.internal;

import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/* loaded from: input_file:com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.class */
public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
    /** Round-robin cursor handed out (and advanced) by {@code getCurrentBucketIndex(int)}. */
    private int currentBucketIndex;
    /** Running total of batch sizes returned by {@code peek(int, int)}; read and zeroed by {@code getAndresetElementsPeekedAcrossBuckets()}. */
    private int elementsPeekedAcrossBuckets;
    /** Timer driving {@code RollSortedListsTimerTask}; stays {@code null} unless the constructor starts it, and is nulled again on cancel. */
    private SystemTimer rollListTimer;
    /** Name of the system property from which the constructor reads the roll-over interval. */
    public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
    /** Initial delay and period passed to the roll-over timer; 3000 when the system property is not set. */
    private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS;

    /* loaded from: input_file:com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue$RollSortedListsTimerTask.class */
    /**
     * Timer task that visits every region returned by the enclosing queue's {@code getRegions()}
     * and asks each locally hosted primary bucket queue to roll over its skip list.
     */
    class RollSortedListsTimerTask extends SystemTimer.SystemTimerTask {
        RollSortedListsTimerTask() {
        }

        /**
         * Rolls over the skip list of every local primary bucket that can still be resolved
         * to a local bucket queue.
         */
        @Override // com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask
        public void run2() {
            for (PartitionedRegion shadowRegion : HDFSParallelGatewaySenderQueue.this.getRegions()) {
                // Walk a private copy of the ids rather than the collection handed back by the data store.
                final List<Object> primaryIds = new ArrayList<Object>(shadowRegion.getDataStore().getAllLocalPrimaryBucketIds());
                for (Object rawId : primaryIds) {
                    final Integer bucketId = (Integer) rawId;
                    final HDFSBucketRegionQueue bucketQueue = (HDFSBucketRegionQueue) shadowRegion.getDataStore().getLocalBucketById(bucketId);
                    if (bucketQueue == null) {
                        // The id no longer resolves to a local bucket; move on to the next one.
                        continue;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Rolling over the list for bucket id: " + bucketId);
                    }
                    bucketQueue.rolloverSkipList();
                }
            }
        }
    }

    public HDFSParallelGatewaySenderQueue(AbstractGatewaySender abstractGatewaySender, Set<Region> set, int i, int i2) {
        super(abstractGatewaySender, set, i, i2);
        this.currentBucketIndex = 0;
        this.elementsPeekedAcrossBuckets = 0;
        this.rollListTimer = null;
        this.ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000).intValue();
        if (abstractGatewaySender.getBucketSorted() && this.index == 0) {
            this.rollListTimer = new SystemTimer(abstractGatewaySender.getCache().getDistributedSystem(), true);
            this.rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(), this.ROLL_SORTED_LIST_TIME_INTERVAL_MS, this.ROLL_SORTED_LIST_TIME_INTERVAL_MS);
        }
    }

    /**
     * Single-element peek is not supported by this queue; batches are peeked through
     * {@link #peek(int, int)}.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public Object peek() throws InterruptedException, CacheException {
        // Carry a message, like the other unsupported operations of this class do.
        throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#peek() is not supported");
    }

    /**
     * Runs the superclass clean-up and then cancels the roll-over timer.
     * The timer is cancelled even when the superclass clean-up throws, so the timer
     * is not left running; any such exception still propagates to the caller.
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue
    public void cleanUp() {
        try {
            super.cleanUp();
        } finally {
            cancelRollListTimer();
        }
    }

    /**
     * Cancels the roll-over timer if one was started and forgets the reference,
     * so that a second call does nothing.
     */
    private void cancelRollListTimer() {
        if (this.rollListTimer == null) {
            return;
        }
        this.rollListTimer.cancel();
        this.rollListTimer = null;
    }

    /**
     * Peeks a batch of events from one local primary bucket served by this dispatcher.
     *
     * <p>Each call looks at a single bucket, chosen round-robin through
     * {@code getCurrentBucketIndex(int)}. The bucket is drained only when the time since its last
     * peek exceeds {@code i2}, its size in bytes exceeds the batch size, or it reports
     * {@code shouldDrainImmediately()}. When {@code resetLastPeeked} is set, events that are still
     * in {@code peekedEvents} and still belong to a bucket served here are returned first.
     *
     * @param i batch size in megabytes
     * @param i2 time in milliseconds after which a bucket is peeked regardless of its size
     * @return the peeked events; possibly empty, never {@code null}
     * @throws InterruptedException declared by the overridden method
     * @throws CacheException declared by the overridden method
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public List peek(int i, int i2) throws InterruptedException, CacheException {
        final List<Object> batch = new ArrayList<Object>();
        // Widen to long before multiplying: in int arithmetic a batch size of 2048 MB or more
        // overflows and makes the size comparison below meaningless.
        final long batchSizeInBytes = i * 1024L * 1024L;

        final PartitionedRegion shadowRegion = getRandomShadowPR();
        if (shadowRegion == null || shadowRegion.getLocalMaxMemory() == 0) {
            pauseHdfsPeek(50L);
            blockProcesorThreadIfRequired();
            return batch;
        }

        final List<Integer> ownedBucketIds = ownedPrimaryBucketIds(shadowRegion);
        if (this.resetLastPeeked) {
            restorePeekedEvents(ownedBucketIds, batch);
            this.resetLastPeeked = false;
        }
        if (ownedBucketIds.isEmpty()) {
            pauseHdfsPeek(50L);
            return batch;
        }

        final int bucketIndex = getCurrentBucketIndex(ownedBucketIds.size());
        if (bucketIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) {
            // Back at the first bucket and nothing was peeked since the previous visit to it: back off.
            pauseHdfsPeek(100L);
        }

        final Integer bucketId = ownedBucketIds.get(bucketIndex);
        final HDFSBucketRegionQueue bucketQueue = (HDFSBucketRegionQueue) shadowRegion.getDataStore().getLocalBucketById(bucketId);
        if (bucketQueue == null) {
            return batch;
        }
        if (bucketQueue.totalEntries() == 0) {
            blockProcesorThreadIfRequired();
            return batch;
        }

        final long now = System.currentTimeMillis();
        final long bucketSizeInBytes = bucketQueue.getQueueSizeInBytes();
        final long sinceLastPeek = now - bucketQueue.getLastPeekTimeInMillis();
        if (sinceLastPeek > i2 || bucketSizeInBytes > batchSizeInBytes || bucketQueue.shouldDrainImmediately()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Peeking queue " + bucketQueue.getId() + ": bucketSizeInBytes " + bucketSizeInBytes + ":  batchSizeInBytes" + batchSizeInBytes + ":  timeToWait" + i2 + ":  (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + sinceLastPeek);
            }
            final List<?> peeked = peekAhead(bucketId.intValue(), bucketQueue);
            if (peeked != null) {
                for (Object event : peeked) {
                    batch.add(event);
                    // Remember what was handed out so that remove(int) can destroy it later.
                    this.peekedEvents.add((HDFSGatewayEventImpl) event);
                }
            }
        } else {
            blockProcesorThreadIfRequired();
        }

        if (logger.isDebugEnabled() && batch.size() > 0) {
            logger.debug(this + ":  Peeked a batch of " + batch.size() + " entries");
        }
        setElementsPeekedAcrossBuckets(batch.size());
        return batch;
    }

    /** Sleeps for {@code millis}; an interrupt ends the sleep early and is re-asserted on the current thread. */
    private static void pauseHdfsPeek(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the local primary bucket ids of {@code shadowRegion} for which {@code id % nDispatcher == index}. */
    private List<Integer> ownedPrimaryBucketIds(PartitionedRegion shadowRegion) {
        final List<Integer> owned = new ArrayList<Integer>();
        // Iterate over a private copy of the id collection, as the original code did.
        for (Object rawId : new ArrayList<Object>(shadowRegion.getDataStore().getAllLocalPrimaryBucketIds())) {
            final Integer bucketId = (Integer) rawId;
            if (bucketId.intValue() % this.nDispatcher == this.index) {
                owned.add(bucketId);
            }
        }
        return owned;
    }

    /** Appends to {@code batch} the events in {@code peekedEvents} whose bucket is in {@code ownedBucketIds} and removes the others from {@code peekedEvents}. */
    private void restorePeekedEvents(List<Integer> ownedBucketIds, List<Object> batch) {
        int lastBucketId = -1;
        boolean stillOwned = true;
        for (Iterator<?> it = this.peekedEvents.iterator(); it.hasNext();) {
            final HDFSGatewayEventImpl event = (HDFSGatewayEventImpl) it.next();
            final int bucketId = event.getBucketId();
            if (bucketId != lastBucketId) {
                // Ownership is only re-checked when the bucket id differs from the previous event's.
                stillOwned = ownedBucketIds.contains(Integer.valueOf(bucketId));
                lastBucketId = bucketId;
            }
            if (stillOwned) {
                batch.add(event);
            } else {
                it.remove();
            }
        }
    }

    /**
     * Hands out the next round-robin position and advances the cursor.
     *
     * @param i number of buckets to cycle over
     * @return the current cursor, or 0 when the cursor has reached or passed {@code i}
     */
    private int getCurrentBucketIndex(int i) {
        final int selected = (this.currentBucketIndex < i) ? this.currentBucketIndex : 0;
        // The cursor always ends up one past the position that is returned.
        this.currentBucketIndex = selected + 1;
        return selected;
    }

    /**
     * Removes up to {@code i} previously peeked events, in the order they were peeked.
     *
     * <p>Consecutive events with the same bucket id are grouped and destroyed with one
     * {@code destroyKeys} call on that bucket's queue; their shadow keys are then passed to
     * {@code addRemovedEvents}. If the sender stops running while a group is being collected,
     * the method returns without destroying the events it has already dequeued.
     *
     * @param i maximum number of events to remove; values of 0 or less remove nothing
     * @throws CacheException declared by the overridden method
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public void remove(int i) throws CacheException {
        if (i <= 0) {
            // Return before touching peekedEvents: otherwise the head event would be dequeued
            // and then dropped without ever being destroyed in its bucket.
            return;
        }
        int destroyed = 0;
        HDFSGatewayEventImpl event = this.peekedEvents.size() > 0 ? (HDFSGatewayEventImpl) this.peekedEvents.remove() : null;
        while (event != null && destroyed < i) {
            final Region<?, ?> region = event.getRegion();
            final int currentBucketId = event.getBucketId();
            int nextBucketId = currentBucketId;
            final ArrayList<HDFSGatewayEventImpl> eventsToDestroy = new ArrayList<>();
            // Left raw: the element type is dictated by the inherited addRemovedEvents.
            final ArrayList shadowKeys = new ArrayList();

            // Collect the run of consecutive events that share the current bucket id.
            while (nextBucketId == currentBucketId) {
                eventsToDestroy.add(event);
                shadowKeys.add(event.getShadowKey());
                destroyed++;
                if (this.peekedEvents.size() == 0 || destroyed >= i) {
                    event = null;
                    break;
                }
                event = (HDFSGatewayEventImpl) this.peekedEvents.remove();
                nextBucketId = event.getBucketId();
                if (!this.sender.isRunning()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
                    }
                    return;
                }
            }

            try {
                final HDFSBucketRegionQueue bucketQueue = getBucketRegionQueue((PartitionedRegion) region, currentBucketId);
                if (bucketQueue != null) {
                    bucketQueue.destroyKeys(eventsToDestroy);
                    addRemovedEvents(bucketQueue.getPartitionedRegion(), currentBucketId, shadowKeys);
                }
            } catch (EntryNotFoundException e) {
                // Report the bucket that was being destroyed, not the bucket of the next event.
                if (logger.isDebugEnabled()) {
                    logger.debug("ParallelGatewaySenderQueue#remove: Got EntryNotFoundException for " + this + " for bucket = " + currentBucketId);
                }
            } catch (ForceReattemptException e2) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ParallelGatewaySenderQueue#remove: Got ForceReattemptException for " + this + " for bucket = " + currentBucketId);
                }
            }
        }
    }

    /**
     * Adds {@code i} to the running count of peeked elements. Despite the "set" in its name,
     * this accumulates rather than overwrites.
     *
     * @param i number of elements to add to the count
     */
    private void setElementsPeekedAcrossBuckets(int i) {
        final int updatedTotal = this.elementsPeekedAcrossBuckets + i;
        this.elementsPeekedAcrossBuckets = updatedTotal;
    }

    /**
     * Returns the running count of peeked elements and resets it to zero.
     *
     * @return the count accumulated since the previous reset
     */
    private int getAndresetElementsPeekedAcrossBuckets() {
        try {
            // The return value is captured here, before the finally block zeroes the field.
            return this.elementsPeekedAcrossBuckets;
        } finally {
            this.elementsPeekedAcrossBuckets = 0;
        }
    }

    /**
     * Single-element removal is not supported by this queue; this class implements
     * batch removal in {@link #remove(int)}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public void remove() throws CacheException {
        throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
    }

    /**
     * Enqueues {@code obj} by delegating, unchanged, to the superclass implementation.
     *
     * @param obj the object to enqueue
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public void put(Object obj) throws InterruptedException, CacheException {
        super.put(obj);
    }

    /**
     * Asks the given bucket queue for a batch, logging at debug level before and after.
     *
     * @param i bucket id; used only in the log messages
     * @param hDFSBucketRegionQueue the bucket queue to peek from
     * @return whatever {@code peekABatch()} returned, which may be {@code null}
     */
    protected ArrayList peekAhead(int i, HDFSBucketRegionQueue hDFSBucketRegionQueue) throws CacheException {
        if (logger.isDebugEnabled()) {
            logger.debug(this + ": Peekahead for the bucket " + i);
        }
        final ArrayList batch = hDFSBucketRegionQueue.peekABatch();
        if (batch == null) {
            // Nothing to report and nothing to return.
            return null;
        }
        if (logger.isDebugEnabled()) {
            logger.debug(this + ": Peeked" + batch.size() + "objects from bucket " + i);
        }
        return batch;
    }

    /**
     * Not supported by this queue.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue, com.gemstone.gemfire.internal.cache.RegionQueue
    public Object take() {
        throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
    }

    /**
     * Marks this queue as the HDFS variant.
     *
     * @return always {@code true}
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue
    protected boolean isUsedForHDFS() {
        return true;
    }

    /**
     * No-op override: this queue takes no action in this hook.
     * NOTE(review): the name suggests the superclass calls this after a region is added — confirm there.
     *
     * @param partitionedRegion unused
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue
    protected void afterRegionAdd(PartitionedRegion partitionedRegion) {
    }

    /**
     * Looks up the queued event for a region key in the given bucket.
     *
     * @param partitionedRegion region used to locate the bucket queue and passed on to the lookup
     * @param bArr the region key as bytes
     * @param i bucket id
     * @return the event, or {@code null} when no bucket queue is available or the entry is not found
     * @throws ForceReattemptException if locating the bucket queue throws it
     */
    public HDFSGatewayEventImpl get(PartitionedRegion partitionedRegion, byte[] bArr, int i) throws ForceReattemptException {
        HDFSGatewayEventImpl found = null;
        try {
            final HDFSBucketRegionQueue bucketQueue = getBucketRegionQueue(partitionedRegion, i);
            if (bucketQueue != null) {
                found = bucketQueue.getObjectForRegionKey(partitionedRegion, bArr);
            }
        } catch (EntryNotFoundException notFound) {
            // A missing entry is reported as "no event" rather than as an error.
            if (logger.isDebugEnabled()) {
                logger.debug("HDFSParallelGatewaySenderQueue#get: Got EntryNotFoundException for " + this + " for bucket = " + i);
            }
        }
        return found;
    }

    /**
     * Clears the queue of the given bucket, if that bucket queue is available.
     * This is best effort: a {@code ForceReattemptException} raised while locating the bucket
     * queue is logged at debug level and not propagated.
     *
     * @param partitionedRegion region used to locate the bucket queue
     * @param i bucket id
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue
    public void clear(PartitionedRegion partitionedRegion, int i) {
        try {
            final HDFSBucketRegionQueue bucketQueue = getBucketRegionQueue(partitionedRegion, i);
            if (bucketQueue != null) {
                bucketQueue.clear();
            }
        } catch (ForceReattemptException e) {
            // Still not propagated, but no longer swallowed without a trace.
            if (logger.isDebugEnabled()) {
                logger.debug("HDFSParallelGatewaySenderQueue#clear: Got ForceReattemptException for " + this + " for bucket = " + i);
            }
        }
    }

    /**
     * Returns the number of entries queued for the given bucket.
     *
     * @param partitionedRegion region used to locate the bucket queue
     * @param i bucket id
     * @return the bucket queue's size, or 0 when no bucket queue is available
     * @throws ForceReattemptException if locating the bucket queue throws it
     */
    @Override // com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue
    public int size(PartitionedRegion partitionedRegion, int i) throws ForceReattemptException {
        final HDFSBucketRegionQueue bucketQueue = getBucketRegionQueue(partitionedRegion, i);
        // getBucketRegionQueue returns null when there is no leader region or no shadow region;
        // report that as an empty queue instead of failing with a NullPointerException.
        return bucketQueue == null ? 0 : bucketQueue.size();
    }

    /**
     * Resolves the bucket queue for a bucket of the given region.
     *
     * <p>The region is mapped to its leader region, the leader's full path is looked up in
     * {@code userRegionNameToshadowPRMap}, and the bucket is fetched from that region's data
     * store. When the bucket is not present locally, the method first waits on the region
     * advisor for local bucket storage.
     *
     * @param partitionedRegion the region whose bucket queue is wanted
     * @param i bucket id
     * @return the bucket queue, or {@code null} when there is no leader region or no mapped region
     * @throws ForceReattemptException declared by this method; presumably raised by the data store lookup
     */
    public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion partitionedRegion, int i) throws ForceReattemptException {
        final PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
        final PartitionedRegion shadowRegion = (leader == null) ? null : this.userRegionNameToshadowPRMap.get(leader.getFullPath());
        if (shadowRegion == null) {
            return null;
        }
        final Integer bucketKey = Integer.valueOf(i);
        final HDFSBucketRegionQueue localBucket = (HDFSBucketRegionQueue) shadowRegion.getDataStore().getLocalBucketById(bucketKey);
        if (localBucket == null) {
            shadowRegion.getRegionAdvisor().waitForLocalBucketStorage(i);
        }
        // Always return the result of the initialized-bucket lookup, not the first lookup above.
        return (HDFSBucketRegionQueue) shadowRegion.getDataStore().getInitializedBucketForId(null, bucketKey);
    }
}
