package org.apache.geode.internal.cache.wan.parallel;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.class */
/**
 * Distribution message that asks the receiving member to remove a set of keys from the queues of
 * its parallel gateway senders.
 *
 * <p>The payload maps a region name to a map of bucket id to the list of keys to remove for that
 * bucket. For each key the receiver removes the entry from the sender's temp-queue events and then
 * from either the bucket region queue or the per-bucket temporary queue, depending on whether the
 * bucket region exists and is initialized.
 */
public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
    private static final Logger logger = LogService.getLogger();

    /** Region name -> bucket id -> keys to remove from that bucket's queue. */
    private Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap;

    /** No-arg constructor; the payload is expected to be filled in by {@link #fromData}. */
    public ParallelQueueRemovalMessage() {
    }

    /**
     * Creates a message carrying the given keys.
     *
     * @param map region name -> bucket id -> keys to remove; stored by reference, not copied
     */
    public ParallelQueueRemovalMessage(Map<String, Map<Integer, List<Object>>> map) {
        this.regionToDispatchedKeysMap = map;
    }

    /**
     * Returns the fixed serialization id of this message type.
     *
     * @return the DSFID of this message
     */
    public int getDSFID() {
        return 2161;
    }

    @Override // org.apache.geode.distributed.internal.DistributionMessage
    public String toString() {
        return getShortClassName() + "regionToDispatchedKeysMap=" + this.regionToDispatchedKeysMap + " sender=" + mo236getSender();
    }

    /**
     * Removes every key carried by this message from the local queues.
     *
     * <p>Does nothing when the member has no cache or when the message carries no payload. Regions
     * that cannot be found locally are skipped.
     *
     * @param clusterDistributionManager the distribution manager used to look up the cache
     */
    @Override // org.apache.geode.distributed.internal.DistributionMessage
    protected void process(ClusterDistributionManager clusterDistributionManager) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        InternalCache cache = clusterDistributionManager.getCache();
        // A message deserialized from a null map has nothing to remove.
        if (cache == null || this.regionToDispatchedKeysMap == null) {
            return;
        }
        // The previous thread init level is restored in the finally block below.
        LocalRegion.InitializationLevel previousLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE);
        try {
            for (Map.Entry<String, Map<Integer, List<Object>>> regionEntry : this.regionToDispatchedKeysMap.entrySet()) {
                PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionEntry.getKey());
                if (region == null) {
                    continue;
                }
                AbstractGatewaySender sender = region.getParallelGatewaySender();
                Map<Integer, List<Object>> keysByBucket = regionEntry.getValue();
                if (keysByBucket == null) {
                    continue;
                }
                for (Map.Entry<Integer, List<Object>> bucketEntry : keysByBucket.entrySet()) {
                    removeKeysForBucket(cache, region, sender, bucketEntry.getKey().intValue(), bucketEntry.getValue(), isDebugEnabled);
                }
            }
        } finally {
            LocalRegion.setThreadInitLevelRequirement(previousLevel);
        }
    }

    /** Removes the given keys of one bucket from the bucket region queue or the temporary queue. */
    private void removeKeysForBucket(InternalCache cache, PartitionedRegion region, AbstractGatewaySender sender, int bucketId, List<Object> keys, boolean isDebugEnabled) {
        String bucketRegionPath = "/__PR/" + region.getBucketName(bucketId);
        AbstractBucketRegionQueue bucketQueue = (AbstractBucketRegionQueue) cache.getInternalRegionByPath(bucketRegionPath);
        if (isDebugEnabled) {
            logger.debug("ParallelQueueRemovalMessage : The bucket in the cache is bucketRegionName : {} bucket: {}", bucketRegionPath, bucketQueue);
        }
        if (keys == null) {
            return;
        }
        for (Object key : keys) {
            sender.removeFromTempQueueEvents(key);
            if (bucketQueue == null) {
                // No bucket region locally: the event can only be in the temporary queue.
                destroyFromTempQueue(region, bucketId, key);
            } else if (bucketQueue.isInitialized()) {
                if (isDebugEnabled) {
                    logger.debug("ParallelQueueRemovalMessage : The bucket {} is initialized. Destroying the key {} from BucketRegionQueue.", bucketRegionPath, key);
                }
                afterAckForSecondary_EventInBucket(sender, bucketQueue, key);
                destroyKeyFromBucketQueue(bucketQueue, key, region);
            } else {
                if (isDebugEnabled) {
                    logger.debug("ParallelQueueRemovalMessage : The bucket {} is not yet initialized.", bucketRegionPath);
                }
                removeKeyFromUninitializedBucket(sender, bucketQueue, region, bucketId, key);
            }
        }
    }

    /**
     * Removes a key from a bucket that is not yet initialized, under the bucket's initialization
     * read lock, and always records the key as a failed batch removal key.
     */
    private void removeKeyFromUninitializedBucket(AbstractGatewaySender sender, AbstractBucketRegionQueue bucketQueue, PartitionedRegion region, int bucketId, Object key) {
        bucketQueue.getInitializationLock().readLock().lock();
        try {
            if (bucketQueue.containsKey(key)) {
                afterAckForSecondary_EventInBucket(sender, bucketQueue, key);
                destroyKeyFromBucketQueue(bucketQueue, key, region);
            }
            // The event may also sit in the temporary queue, so it is removed from there as well.
            destroyFromTempQueue(bucketQueue.getPartitionedRegion(), bucketId, key);
            bucketQueue.addToFailedBatchRemovalMessageKeys(key);
        } finally {
            bucketQueue.getInitializationLock().readLock().unlock();
        }
    }

    /**
     * Invokes {@code afterAcknowledgement} on every event filter of the sender for the event stored
     * under the given key in the bucket queue. Does nothing when the sender has no filters or the
     * bucket queue holds no value for the key. Exceptions thrown by a filter are logged and do not
     * stop the remaining filters.
     */
    private void afterAckForSecondary_EventInBucket(AbstractGatewaySender abstractGatewaySender, AbstractBucketRegionQueue abstractBucketRegionQueue, Object obj) {
        List<GatewayEventFilter> filters = abstractGatewaySender.getGatewayEventFilters();
        if (filters.isEmpty()) {
            return;
        }
        // Look the event up once rather than once per filter.
        GatewayQueueEvent event = (GatewayQueueEvent) abstractBucketRegionQueue.get(obj);
        if (event == null) {
            return;
        }
        for (GatewayEventFilter filter : filters) {
            try {
                filter.afterAcknowledgement(event);
            } catch (Exception e) {
                logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", filter.toString(), event), e);
            }
        }
    }

    /**
     * Destroys the given key in the bucket region queue and, when this member is not primary for
     * the bucket, updates the sender statistics.
     *
     * <p>No exception caught here is propagated: {@code CancelException} is ignored,
     * {@code EntryNotFoundException} records the key as a failed batch removal key unless the
     * bucket reports that those keys were already cleared, {@code CacheException} is logged as an
     * error, and {@code ForceReattemptException} is logged at debug level.
     *
     * @param abstractBucketRegionQueue the bucket queue to remove the key from
     * @param obj the key to destroy
     * @param partitionedRegion the region owning the bucket; used for statistics and log messages
     */
    void destroyKeyFromBucketQueue(AbstractBucketRegionQueue abstractBucketRegionQueue, Object obj, PartitionedRegion partitionedRegion) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        try {
            abstractBucketRegionQueue.destroyKey(obj);
            if (!abstractBucketRegionQueue.getBucketAdvisor().isPrimary()) {
                partitionedRegion.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
                partitionedRegion.getParallelGatewaySender().getStatistics().incEventsProcessedByPQRM(1);
            }
            if (isDebugEnabled) {
                logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", obj, partitionedRegion.getName(), Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
        } catch (CancelException ignored) {
            // Deliberately ignored. NOTE(review): presumably the cache is shutting down, so there
            // is nothing left to clean up -- confirm.
        } catch (EntryNotFoundException e) {
            if (isDebugEnabled) {
                logger.debug("Got EntryNotFoundException while destroying the key {} for bucket {}", obj, Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
            // Remember the key unless the bucket has already cleared its failed removal keys.
            if (!abstractBucketRegionQueue.isFailedBatchRemovalMessageKeysClearedFlag()) {
                abstractBucketRegionQueue.addToFailedBatchRemovalMessageKeys(obj);
            }
        } catch (CacheException e) {
            logger.error(String.format("ParallelQueueRemovalMessage::process:Exception in processing the last disptached key for a ParallelGatewaySenderQueue's shadowPR. The problem is with key,%s for shadowPR with name=%s", obj, partitionedRegion.getName()), e);
        } catch (ForceReattemptException e) {
            if (isDebugEnabled) {
                logger.debug("Got ForceReattemptException while getting bucket {} to destroyLocally the keys.", Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
        }
    }

    /**
     * Removes and releases every event whose shadow key equals the given key from the temporary
     * queue of the given bucket.
     *
     * @param partitionedRegion region whose parallel gateway sender owns the temporary queue
     * @param i the bucket id
     * @param obj the shadow key to remove
     * @return {@code true} if at least one event was removed; {@code false} otherwise, including
     *         when the sender has no queues or the bucket has no temporary queue
     */
    private boolean destroyFromTempQueue(PartitionedRegion partitionedRegion, int i, Object obj) {
        AbstractGatewaySender sender = partitionedRegion.getParallelGatewaySender();
        Set<RegionQueue> queues = sender.getQueues();
        if (queues == null) {
            return false;
        }
        Iterator<RegionQueue> queueIterator = queues.iterator();
        // An empty queue set has no temporary queue to clean; avoid indexing into an empty array.
        if (!queueIterator.hasNext()) {
            return false;
        }
        BlockingQueue<GatewaySenderEventImpl> bucketTmpQueue = ((ConcurrentParallelGatewaySenderQueue) queueIterator.next()).getBucketTmpQueue(i);
        if (bucketTmpQueue == null) {
            return false;
        }
        boolean destroyed = false;
        Iterator<GatewaySenderEventImpl> events = bucketTmpQueue.iterator();
        while (events.hasNext()) {
            GatewaySenderEventImpl event = events.next();
            // NOTE(review): the filters are invoked for every event in the temporary queue, not
            // only the one matching the key -- preserved as-is, confirm this is intended.
            afterAckForSecondary_EventInTempQueue(sender, event);
            if (event.getShadowKey().equals(obj)) {
                events.remove();
                event.release();
                destroyed = true;
            }
        }
        return destroyed;
    }

    /**
     * Invokes {@code afterAcknowledgement} on every event filter of the sender for the given
     * temporary-queue event. A {@code null} event is ignored. Exceptions thrown by a filter are
     * logged and do not stop the remaining filters.
     */
    private void afterAckForSecondary_EventInTempQueue(AbstractGatewaySender abstractGatewaySender, GatewaySenderEventImpl gatewaySenderEventImpl) {
        for (GatewayEventFilter filter : abstractGatewaySender.getGatewayEventFilters()) {
            if (gatewaySenderEventImpl == null) {
                continue;
            }
            try {
                filter.afterAcknowledgement(gatewaySenderEventImpl);
            } catch (Exception e) {
                logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", filter.toString(), gatewaySenderEventImpl), e);
            }
        }
    }

    /** Writes the superclass state followed by the region-to-keys map. */
    @Override // org.apache.geode.distributed.internal.DistributionMessage
    public void toData(DataOutput dataOutput, SerializationContext serializationContext) throws IOException {
        super.toData(dataOutput, serializationContext);
        DataSerializer.writeHashMap(this.regionToDispatchedKeysMap, dataOutput);
    }

    /** Reads the superclass state followed by the region-to-keys map. */
    @Override // org.apache.geode.distributed.internal.DistributionMessage
    public void fromData(DataInput dataInput, DeserializationContext deserializationContext) throws IOException, ClassNotFoundException {
        super.fromData(dataInput, deserializationContext);
        this.regionToDispatchedKeysMap = DataSerializer.readHashMap(dataInput);
    }
}
