package com.gemstone.gemfire.internal.cache.wan.parallel;

import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.class */
public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
    private static final Logger logger = LogService.getLogger();
    final int index;
    final int nDispatcher;

    protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender abstractGatewaySender) {
        super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_" + abstractGatewaySender.getId(), logger), "Event Processor for GatewaySender_" + abstractGatewaySender.getId(), abstractGatewaySender);
        this.index = 0;
        this.nDispatcher = 1;
        initializeMessageQueue(abstractGatewaySender.getId());
        setDaemon(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ParallelGatewaySenderEventProcessor(AbstractGatewaySender abstractGatewaySender, Set<Region> set, int i, int i2) {
        super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_" + abstractGatewaySender.getId(), logger), "Event Processor for GatewaySender_" + abstractGatewaySender.getId() + PartitionedRegion.BUCKET_NAME_SEPARATOR + i, abstractGatewaySender);
        this.index = i;
        this.nDispatcher = i2;
        initializeMessageQueue(abstractGatewaySender.getId());
        setDaemon(true);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    protected void initializeMessageQueue(String str) {
        HashSet hashSet = new HashSet();
        for (LocalRegion localRegion : ((GemFireCacheImpl) this.sender.getCache()).getApplicationRegions()) {
            if (localRegion.getAllGatewaySenderIds().contains(str)) {
                hashSet.add(localRegion);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("The target Regions are(PGSEP) {}", new Object[]{hashSet});
        }
        ParallelGatewaySenderQueue hDFSParallelGatewaySenderQueue = this.sender.getIsHDFSQueue() ? new HDFSParallelGatewaySenderQueue(this.sender, hashSet, this.index, this.nDispatcher) : new ParallelGatewaySenderQueue(this.sender, hashSet, this.index, this.nDispatcher);
        hDFSParallelGatewaySenderQueue.start();
        this.queue = hDFSParallelGatewaySenderQueue;
        if (hDFSParallelGatewaySenderQueue.localSize() > 0) {
            hDFSParallelGatewaySenderQueue.notifyEventProcessorIfRequired();
        }
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj) throws IOException, CacheException {
        GatewaySenderEventImpl gatewaySenderEventImpl = null;
        if (!(entryEvent.getRegion() instanceof DistributedRegion) && ((EntryEventImpl) entryEvent).getTailKey().longValue() == -1) {
            if (logger.isDebugEnabled()) {
                logger.debug("ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}", new Object[]{entryEvent});
                return;
            }
            return;
        }
        try {
            EventID eventId = ((EntryEventImpl) entryEvent).getEventId();
            gatewaySenderEventImpl = !this.sender.getIsHDFSQueue() ? new GatewaySenderEventImpl(enumListenerEvent, entryEvent, obj, true, eventId.getBucketID()) : new HDFSGatewayEventImpl(enumListenerEvent, entryEvent, obj, true, eventId.getBucketID());
            if (getSender().beforeEnqueue(gatewaySenderEventImpl)) {
                long startTime = getSender().getStatistics().startTime();
                try {
                    this.queue.put(gatewaySenderEventImpl);
                    gatewaySenderEventImpl = null;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                getSender().getStatistics().endPut(startTime);
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("The Event {} is filtered.", new Object[]{gatewaySenderEventImpl});
                }
                getSender().getStatistics().incEventsFiltered();
            }
        } finally {
            if (gatewaySenderEventImpl != null) {
                gatewaySenderEventImpl.release();
            }
        }
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void clear(PartitionedRegion partitionedRegion, int i) {
        ((ParallelGatewaySenderQueue) this.queue).clear(partitionedRegion, i);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void notifyEventProcessorIfRequired(int i) {
        ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int i) {
        return ((ParallelGatewaySenderQueue) this.queue).getBucketToTempQueueMap().get(Integer.valueOf(i));
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public PartitionedRegion getRegion(String str) {
        return ((ParallelGatewaySenderQueue) this.queue).getRegion(str);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void removeShadowPR(String str) {
        ((ParallelGatewaySenderQueue) this.queue).removeShadowPR(str);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void conflateEvent(Conflatable conflatable, int i, Long l) {
        ((ParallelGatewaySenderQueue) this.queue).conflateEvent(conflatable, i, l);
    }

    public HDFSGatewayEventImpl get(PartitionedRegion partitionedRegion, byte[] bArr, int i) throws ForceReattemptException {
        return ((HDFSParallelGatewaySenderQueue) this.queue).get(partitionedRegion, bArr, i);
    }

    public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion partitionedRegion, int i) throws ForceReattemptException {
        return ((HDFSParallelGatewaySenderQueue) this.queue).getBucketRegionQueue(partitionedRegion, i);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void addShadowPartitionedRegionForUserPR(PartitionedRegion partitionedRegion) {
        ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserPR(partitionedRegion);
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void addShadowPartitionedRegionForUserRR(DistributedRegion distributedRegion) {
        ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(distributedRegion);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void rebalance() {
    }

    @Override // com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void initializeEventDispatcher() {
        if (logger.isDebugEnabled()) {
            logger.debug(" Creating the GatewayEventCallbackDispatcher");
        }
        this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
    }
}
