package org.apache.geode.internal.cache.wan;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.class */
public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread implements GatewaySenderEventProcessor {
    protected RegionQueue queue;
    protected GatewaySenderEventDispatcher dispatcher;
    protected final AbstractGatewaySender sender;
    protected int batchId;
    private volatile boolean isStopped;
    protected volatile boolean isPaused;
    protected boolean isDispatcherWaiting;
    protected final Object pausedLock;
    private final Object runningStateLock;
    protected boolean eventQueueSizeWarning;
    private Exception exception;
    private final ThreadsMonitoring threadMonitoring;
    private Map<Integer, List<GatewaySenderEventImpl>[]> batchIdToEventsMap;
    private Map<Integer, List<GatewaySenderEventImpl>> batchIdToPDXEventsMap;
    private List<GatewaySenderEventImpl> pdxSenderEventsList;
    private Map<Object, GatewaySenderEventImpl> pdxEventsMap;
    private volatile boolean rebuildPdxList;
    private volatile boolean resetLastPeekedEvents;
    private long numEventsDispatched;
    private int batchSize;
    private final ConcurrentHashMap<Integer, long[]> failureLogInterval;
    private static final Logger logger = LogService.getLogger();
    protected static final int FAILURE_MAP_MAXSIZE = Integer.getInteger("gemfire.GatewaySender.FAILURE_MAP_MAXSIZE", 1000000).intValue();
    protected static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger("gemfire.GatewaySender.FAILURE_LOG_MAX_INTERVAL", 300000).intValue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor$ConflationKey.class */
    public static class ConflationKey {
        private Object key;
        private Operation operation;
        private String regionName;
        private EventID eventId;

        private ConflationKey(String str, Object obj, Operation operation) {
            this(str, obj, operation, (EventID) null);
        }

        private ConflationKey(String str, Object obj, Operation operation, EventID eventID) {
            this.key = obj;
            this.operation = operation;
            this.regionName = str;
            this.eventId = eventID;
        }

        public int hashCode() {
            return (31 * ((31 * ((31 * ((31 * 1) + this.key.hashCode())) + this.operation.hashCode())) + this.regionName.hashCode())) + (this.eventId == null ? 0 : this.eventId.hashCode());
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ConflationKey conflationKey = (ConflationKey) obj;
            return this.regionName.equals(conflationKey.regionName) && this.key.equals(conflationKey.key) && this.operation.equals(conflationKey.operation) && Objects.equals(this.eventId, conflationKey.eventId);
        }
    }

    /* loaded from: input_file:org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor$SenderStopperCallable.class */
    protected class SenderStopperCallable implements Callable<Boolean> {
        private final AbstractGatewaySenderEventProcessor p;

        public SenderStopperCallable(AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor) {
            this.p = abstractGatewaySenderEventProcessor;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            this.p.stopProcessing();
            return true;
        }
    }

    public AbstractGatewaySenderEventProcessor(String str, GatewaySender gatewaySender, ThreadsMonitoring threadsMonitoring) {
        super(str);
        this.batchId = 0;
        this.isStopped = true;
        this.isPaused = false;
        this.isDispatcherWaiting = false;
        this.pausedLock = new Object();
        this.runningStateLock = new Object();
        this.eventQueueSizeWarning = false;
        this.batchIdToEventsMap = Collections.synchronizedMap(new HashMap());
        this.batchIdToPDXEventsMap = Collections.synchronizedMap(new HashMap());
        this.pdxSenderEventsList = new ArrayList();
        this.pdxEventsMap = new HashMap();
        this.rebuildPdxList = false;
        this.failureLogInterval = new ConcurrentHashMap<>();
        this.sender = (AbstractGatewaySender) gatewaySender;
        this.batchSize = gatewaySender.getBatchSize();
        this.threadMonitoring = threadsMonitoring;
    }

    public Object getRunningStateLock() {
        return this.runningStateLock;
    }

    @Override // org.apache.geode.internal.cache.wan.GatewaySenderEventProcessor
    public int getTotalQueueSize() {
        return getQueue().size();
    }

    protected abstract void initializeMessageQueue(String str);

    public abstract void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj) throws IOException, CacheException;

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void rebalance();

    public boolean isStopped() {
        return this.isStopped;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setIsStopped(boolean z) {
        if (!z) {
            this.isStopped = z;
        } else {
            this.isStopped = true;
            this.failureLogInterval.clear();
        }
    }

    public boolean isPaused() {
        return this.isPaused;
    }

    public RegionQueue getQueue() {
        return this.queue;
    }

    public void incrementBatchId() {
        if (this.batchId + 1 == Integer.MAX_VALUE) {
            this.batchId = -1;
        }
        this.batchId++;
    }

    protected void resetBatchId() {
        this.batchId = 0;
        this.resetLastPeekedEvents = true;
    }

    protected int getBatchSize() {
        return this.batchSize;
    }

    protected void setBatchSize(int i) {
        int i2 = this.batchSize;
        if (i <= 0) {
            this.batchSize = 1;
            logger.warn("Attempting to set the batch size from {} to {} events failed. Instead it was set to 1.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i)});
        } else {
            this.batchSize = i;
            logger.info("Set the batch size from {} to {} events", new Object[]{Integer.valueOf(i2), Integer.valueOf(this.batchSize)});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getBatchId() {
        return this.batchId;
    }

    protected boolean isConnectionReset() {
        return this.resetLastPeekedEvents;
    }

    protected void eventQueueRemove(int i) throws CacheException {
        this.queue.remove(i);
    }

    protected Object eventQueueTake() throws CacheException, InterruptedException {
        throw new UnsupportedOperationException();
    }

    public int eventQueueSize() {
        if (this.queue == null) {
            return 0;
        }
        return this.queue instanceof ConcurrentParallelGatewaySenderQueue ? ((ConcurrentParallelGatewaySenderQueue) this.queue).localSize() : this.queue.size();
    }

    public int secondaryEventQueueSize() {
        if (this.queue == null) {
            return 0;
        }
        return this.queue instanceof ConcurrentParallelGatewaySenderQueue ? ((ConcurrentParallelGatewaySenderQueue) this.queue).localSize(true) - ((ConcurrentParallelGatewaySenderQueue) this.queue).localSize(false) : this.queue.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl entryEventImpl);

    public AbstractGatewaySender getSender() {
        return this.sender;
    }

    public void pauseDispatching() {
        if (this.isPaused) {
            return;
        }
        this.isPaused = true;
    }

    public void waitForDispatcherToPause() {
        if (!this.isPaused) {
            throw new IllegalStateException("Should be trying to pause!");
        }
        boolean z = false;
        synchronized (this.pausedLock) {
            while (!this.isDispatcherWaiting && !isStopped() && this.sender.getSenderAdvisor().isPrimary()) {
                try {
                    this.pausedLock.wait();
                } catch (InterruptedException e) {
                    z = true;
                }
            }
        }
        if (z) {
            Thread.currentThread().interrupt();
        }
    }

    public void resumeDispatching() {
        if (this.isPaused) {
            this.isPaused = false;
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Resumed dispatching", this);
            }
            synchronized (this.pausedLock) {
                this.pausedLock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean stopped() {
        return this.isStopped || this.sender.getStopper().isCancelInProgress();
    }

    public boolean skipFailureLogging(Integer num) {
        boolean z = false;
        if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
            long[] jArr = this.failureLogInterval.get(num);
            if (jArr == null) {
                jArr = this.failureLogInterval.putIfAbsent(num, new long[]{System.currentTimeMillis(), 1000});
            }
            if (jArr != null) {
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis - jArr[0] < jArr[1]) {
                    z = true;
                } else {
                    jArr[0] = currentTimeMillis;
                    if (jArr[1] <= FAILURE_LOG_MAX_INTERVAL / 4) {
                        long[] jArr2 = jArr;
                        jArr2[1] = jArr2[1] * 4;
                    }
                }
            }
        }
        return z;
    }

    public boolean removeEventFromFailureMap(Integer num) {
        return this.failureLogInterval.remove(num) != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v163, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r7v0, types: [org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor] */
    public void processQueue() {
        BucketRegion localBucketById;
        boolean isDebugEnabled = logger.isDebugEnabled();
        boolean isTraceEnabled = logger.isTraceEnabled();
        int batchTimeInterval = this.sender.getBatchTimeInterval();
        GatewaySenderStats statistics = this.sender.getStatistics();
        if (isDebugEnabled) {
            logger.debug("STARTED processQueue {}", Long.valueOf(getId()));
        }
        List<GatewaySenderEventImpl> list = null;
        new ArrayList();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        while (!stopped()) {
            try {
                if (this.isPaused) {
                    waitForResumption();
                }
                if (isDebugEnabled) {
                    logger.debug("Attempting to peek a batch of {} events", Integer.valueOf(this.batchSize));
                }
                while (true) {
                    if (!stopped()) {
                        if (this.isPaused) {
                            waitForResumption();
                        }
                        boolean shouldSendVersionEvents = shouldSendVersionEvents(this.dispatcher);
                        boolean interrupted = Thread.interrupted();
                        try {
                            try {
                                if (this.resetLastPeekedEvents) {
                                    resetLastPeekedEvents();
                                    this.resetLastPeekedEvents = false;
                                }
                                list = this.queue.peek(this.batchSize, batchTimeInterval);
                                if (interrupted) {
                                    Thread.currentThread().interrupt();
                                }
                                if (!list.isEmpty()) {
                                    beforeExecute();
                                    try {
                                        ArrayList<GatewaySenderEventImpl> arrayList3 = new ArrayList(list);
                                        if (this.exception != null && this.exception.getCause() != null && (this.exception.getCause() instanceof IllegalStateException)) {
                                            Iterator it = arrayList3.iterator();
                                            while (it.hasNext()) {
                                                if (((GatewaySenderEventImpl) it.next()).isSerializedValueNotAvailable()) {
                                                    it.remove();
                                                }
                                            }
                                            this.exception = null;
                                        }
                                        for (GatewayEventFilter gatewayEventFilter : this.sender.getGatewayEventFilters()) {
                                            Iterator it2 = arrayList3.iterator();
                                            while (it2.hasNext()) {
                                                GatewayQueueEvent gatewayQueueEvent = (GatewayQueueEvent) it2.next();
                                                if (!shouldSendVersionEvents && gatewayQueueEvent.getOperation() == Operation.UPDATE_VERSION_STAMP) {
                                                    if (isTraceEnabled) {
                                                        logger.trace("Update Event Version event: {} removed from Gateway Sender queue: {}", gatewayQueueEvent, this.sender);
                                                    }
                                                    it2.remove();
                                                    statistics.incEventsNotQueued();
                                                } else if (!gatewayEventFilter.beforeTransmit(gatewayQueueEvent)) {
                                                    if (isDebugEnabled) {
                                                        logger.debug("{}: Did not transmit event due to filtering: {}", this.sender.getId(), gatewayQueueEvent);
                                                    }
                                                    it2.remove();
                                                    statistics.incEventsFiltered();
                                                }
                                            }
                                        }
                                        Iterator it3 = arrayList3.iterator();
                                        while (it3.hasNext()) {
                                            GatewaySenderEventImpl gatewaySenderEventImpl = (GatewaySenderEventImpl) it3.next();
                                            if (gatewaySenderEventImpl.isConcurrencyConflict()) {
                                                it3.remove();
                                                logger.debug("The CME event: {} is removed from Gateway Sender queue: {}", gatewaySenderEventImpl, this.sender);
                                                statistics.incEventsNotQueued();
                                            }
                                        }
                                        if (getSender().isParallel() && (getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
                                            for (GatewaySenderEventImpl gatewaySenderEventImpl2 : arrayList3) {
                                                PartitionedRegion region = getQueue() instanceof ConcurrentParallelGatewaySenderQueue ? ((ConcurrentParallelGatewaySenderQueue) getQueue()).getRegion(gatewaySenderEventImpl2.getRegionPath()) : ((ParallelGatewaySenderQueue) getQueue()).getRegion(gatewaySenderEventImpl2.getRegionPath());
                                                int bucketId = gatewaySenderEventImpl2.getBucketId();
                                                if (region != null && ((localBucketById = region.getDataStore().getLocalBucketById(Integer.valueOf(bucketId))) == null || !localBucketById.getBucketAdvisor().isPrimary())) {
                                                    gatewaySenderEventImpl2.setPossibleDuplicate(true);
                                                    if (isDebugEnabled) {
                                                        logger.debug("Bucket id: {} is no longer primary on this node. The event: {} will be dispatched from this node with possibleDuplicate set to true.", Integer.valueOf(bucketId), gatewaySenderEventImpl2);
                                                    }
                                                }
                                            }
                                        }
                                        arrayList2.clear();
                                        if (!(this.dispatcher instanceof GatewaySenderEventCallbackDispatcher)) {
                                            this.batchIdToEventsMap.put(Integer.valueOf(getBatchId()), new List[]{list, arrayList3});
                                            arrayList = addPDXEvent();
                                            arrayList2.addAll(arrayList);
                                            if (!arrayList.isEmpty()) {
                                                this.batchIdToPDXEventsMap.put(Integer.valueOf(getBatchId()), arrayList);
                                            }
                                        }
                                        arrayList2.addAll(arrayList3);
                                        List conflate = conflate(arrayList2);
                                        if (isDebugEnabled) {
                                            logBatchFine("During normal processing, dispatching the following ", conflate);
                                        }
                                        boolean dispatchBatch = this.dispatcher.dispatchBatch(conflate, this.sender.isRemoveFromQueueOnException(), false);
                                        if (dispatchBatch) {
                                            if (isDebugEnabled) {
                                                logger.debug("During normal processing, successfully dispatched {} events (batch #{})", Integer.valueOf(conflate.size()), Integer.valueOf(getBatchId()));
                                            }
                                            removeEventFromFailureMap(Integer.valueOf(getBatchId()));
                                        } else if (!skipFailureLogging(Integer.valueOf(getBatchId()))) {
                                            logger.warn("During normal processing, unsuccessfully dispatched {} events (batch #{})", new Object[]{Integer.valueOf(arrayList3.size()), Integer.valueOf(getBatchId())});
                                        }
                                        if (stopped()) {
                                            afterExecute();
                                            break;
                                        }
                                        if (dispatchBatch) {
                                            if (this.dispatcher instanceof GatewaySenderEventCallbackDispatcher) {
                                                handleSuccessfulBatchDispatch(conflate, list);
                                            } else {
                                                incrementBatchId();
                                            }
                                            Iterator it4 = arrayList.iterator();
                                            while (it4.hasNext()) {
                                                ((GatewaySenderEventImpl) it4.next()).isDispatched = true;
                                            }
                                            increaseNumEventsDispatched(conflate.size());
                                        } else if (this.dispatcher instanceof GatewaySenderEventCallbackDispatcher) {
                                            handleUnSuccessfulBatchDispatch(list);
                                            this.resetLastPeekedEvents = true;
                                        } else {
                                            handleUnSuccessfulBatchDispatch(list);
                                            if (!this.resetLastPeekedEvents) {
                                                while (!this.dispatcher.dispatchBatch(conflate, this.sender.isRemoveFromQueueOnException(), true)) {
                                                    if (isDebugEnabled) {
                                                        logger.debug("During normal processing, unsuccessfully dispatched {} events (batch #{})", Integer.valueOf(conflate.size()), Integer.valueOf(getBatchId()));
                                                    }
                                                    if (stopped() || this.resetLastPeekedEvents) {
                                                        break;
                                                    }
                                                    try {
                                                        if (this.threadMonitoring != null) {
                                                            this.threadMonitoring.updateThreadStatus();
                                                        }
                                                        Thread.sleep(100L);
                                                    } catch (InterruptedException e) {
                                                        Thread.currentThread().interrupt();
                                                    }
                                                }
                                                incrementBatchId();
                                            }
                                        }
                                        if (logger.isDebugEnabled()) {
                                            logger.debug("Finished processing events (batch #{})", Integer.valueOf(getBatchId() - 1));
                                        }
                                        afterExecute();
                                    } catch (Throwable th) {
                                        afterExecute();
                                        throw th;
                                    }
                                }
                            } catch (InterruptedException e2) {
                                this.sender.getCancelCriterion().checkCancelInProgress(e2);
                                if (1 != 0) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        } catch (Throwable th2) {
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                            throw th2;
                        }
                    } else if (isDebugEnabled) {
                        logger.debug("GatewaySenderEventProcessor is stopped. Returning without peeking events.");
                    }
                }
            } catch (VirtualMachineError e3) {
                SystemFailure.initiateFailure(e3);
                throw e3;
            } catch (CancelException e4) {
                logger.debug("Caught cancel exception", e4);
                setIsStopped(true);
            } catch (RegionDestroyedException e5) {
                this.resetLastPeekedEvents = true;
                if (logger.isDebugEnabled()) {
                    logger.debug("Observed RegionDestroyedException on Queue's region.");
                }
            } catch (Throwable th3) {
                SystemFailure.checkFailure();
                if (stopped()) {
                    return;
                }
                if (list != null) {
                    handleUnSuccessfulBatchDispatch(list);
                }
                this.resetLastPeekedEvents = true;
                if (!(th3 instanceof GatewaySenderException) || (!(th3.getCause() instanceof IOException) && !(th3 instanceof GatewaySenderConfigurationException))) {
                    logger.warn("An Exception occurred. The dispatcher will continue.", th3);
                }
            }
        }
    }

    private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher gatewaySenderEventDispatcher) {
        return false;
    }

    public List conflate(List<GatewaySenderEventImpl> list) {
        List<GatewaySenderEventImpl> list2;
        if (!this.sender.isBatchConflationEnabled() || list.size() <= 1) {
            list2 = list;
        } else {
            if (logger.isDebugEnabled()) {
                logEvents("original", list);
            }
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            list2 = new ArrayList();
            for (GatewaySenderEventImpl gatewaySenderEventImpl : list) {
                if (gatewaySenderEventImpl.shouldBeConflated()) {
                    ConflationKey conflationKey = new ConflationKey(gatewaySenderEventImpl.getRegionPath(), gatewaySenderEventImpl.getKeyToConflate(), gatewaySenderEventImpl.getOperation());
                    if (!gatewaySenderEventImpl.equals((GatewaySenderEventImpl) linkedHashMap.get(conflationKey))) {
                        linkedHashMap.remove(conflationKey);
                        linkedHashMap.put(conflationKey, gatewaySenderEventImpl);
                    }
                } else {
                    linkedHashMap.put(new ConflationKey(gatewaySenderEventImpl.getRegionPath(), gatewaySenderEventImpl.getKeyToConflate(), gatewaySenderEventImpl.getOperation(), gatewaySenderEventImpl.getEventId()), gatewaySenderEventImpl);
                }
            }
            Iterator it = linkedHashMap.values().iterator();
            while (it.hasNext()) {
                list2.add((GatewaySenderEventImpl) it.next());
            }
            this.sender.getStatistics().incEventsConflatedFromBatches(list.size() - list2.size());
            if (logger.isDebugEnabled()) {
                logEvents("conflated", list2);
            }
        }
        return list2;
    }

    private void logEvents(String str, List<GatewaySenderEventImpl> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("The batch contains the following ").append(list.size()).append(" ").append(str).append(" events:");
        Iterator<GatewaySenderEventImpl> it = list.iterator();
        while (it.hasNext()) {
            sb.append("\t\n").append(it.next().toSmallString());
        }
        logger.debug(sb);
    }

    private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
        ArrayList arrayList = new ArrayList();
        InternalCache cache = this.sender.getCache();
        Region region = cache.getRegion(PeerTypeRegistration.REGION_NAME);
        if (this.rebuildPdxList) {
            this.pdxEventsMap.clear();
            this.pdxSenderEventsList.clear();
            this.rebuildPdxList = false;
        }
        if (region != null && region.size() != this.pdxEventsMap.size()) {
            for (Map.Entry entry : region.entrySet()) {
                if (!this.pdxEventsMap.containsKey(entry.getKey())) {
                    EntryEventImpl create = EntryEventImpl.create((InternalRegion) region, Operation.UPDATE, entry.getKey(), entry.getValue(), (Object) null, false, (DistributedMember) cache.getMyId());
                    create.disallowOffHeapValues();
                    create.setEventId(new EventID(cache.getInternalDistributedSystem()));
                    ArrayList arrayList2 = new ArrayList();
                    Iterator<GatewaySender> it = cache.getGatewaySenders().iterator();
                    while (it.hasNext()) {
                        arrayList2.add(Integer.valueOf(it.next().getRemoteDSId()));
                    }
                    create.setCallbackArgument(new GatewaySenderEventCallbackArgument(create.getRawCallbackArgument(), this.sender.getMyDSId(), arrayList2));
                    GatewaySenderEventImpl gatewaySenderEventImpl = new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, create, null);
                    this.pdxEventsMap.put(entry.getKey(), gatewaySenderEventImpl);
                    this.pdxSenderEventsList.add(gatewaySenderEventImpl);
                }
            }
        }
        Iterator<GatewaySenderEventImpl> it2 = this.pdxSenderEventsList.iterator();
        while (it2.hasNext()) {
            GatewaySenderEventImpl next = it2.next();
            if (next.isAcked) {
                it2.remove();
            } else if (!next.isDispatched) {
                arrayList.add(next);
            }
        }
        if (!arrayList.isEmpty() && logger.isDebugEnabled()) {
            logger.debug("List of PDX Event to be dispatched : {}", arrayList);
        }
        return arrayList;
    }

    public void checkIfPdxNeedsResend(int i) {
        Region region = this.sender.getCache().getRegion(PeerTypeRegistration.REGION_NAME);
        if (region == null || region.size() <= i) {
            return;
        }
        this.rebuildPdxList = true;
    }

    private void resetLastPeekedEvents() {
        this.batchIdToEventsMap.clear();
        Iterator<Map.Entry<Integer, List<GatewaySenderEventImpl>>> it = this.batchIdToPDXEventsMap.entrySet().iterator();
        while (it.hasNext()) {
            Iterator<GatewaySenderEventImpl> it2 = it.next().getValue().iterator();
            while (it2.hasNext()) {
                it2.next().isDispatched = false;
            }
        }
        this.batchIdToPDXEventsMap.clear();
        if (this.queue instanceof SerialGatewaySenderQueue) {
            ((SerialGatewaySenderQueue) this.queue).resetLastPeeked();
        } else {
            if (!(this.queue instanceof ParallelGatewaySenderQueue)) {
                throw new RuntimeException("resetLastPeekedEvents : no matching queue found " + this);
            }
            ((ParallelGatewaySenderQueue) this.queue).resetLastPeeked();
        }
    }

    private void handleSuccessfulBatchDispatch(List list, List list2) {
        if (list != null) {
            for (GatewayEventFilter gatewayEventFilter : this.sender.getGatewayEventFilters()) {
                for (Object obj : list) {
                    if (obj != null && (obj instanceof GatewaySenderEventImpl)) {
                        try {
                            gatewayEventFilter.afterAcknowledgement((GatewaySenderEventImpl) obj);
                        } catch (Exception e) {
                            logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", gatewayEventFilter.toString(), obj), e);
                        }
                    }
                }
            }
        }
        list.clear();
        eventQueueRemove(list2.size());
        this.sender.getStatistics();
        int eventQueueSize = eventQueueSize();
        if (this.eventQueueSizeWarning && eventQueueSize <= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) {
            logger.info("The event queue size has dropped below {} events.", Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD));
            this.eventQueueSizeWarning = false;
        }
        incrementBatchId();
    }

    private void handleUnSuccessfulBatchDispatch(List list) {
        this.sender.getStatistics().incBatchesRedistributed();
        Iterator it = list.iterator();
        while (it.hasNext() && !this.isStopped) {
            Object next = it.next();
            if (next != null && (next instanceof GatewaySenderEventImpl)) {
                ((GatewaySenderEventImpl) next).setPossibleDuplicate(true);
            }
        }
    }

    public void handleException() {
        this.sender.getStatistics().incBatchesRedistributed();
        this.resetLastPeekedEvents = true;
    }

    public void handleSuccessBatchAck(int i) {
        List<GatewaySenderEventImpl> remove = this.batchIdToPDXEventsMap.remove(Integer.valueOf(i));
        if (remove != null) {
            Iterator<GatewaySenderEventImpl> it = remove.iterator();
            while (it.hasNext()) {
                it.next().isAcked = true;
            }
        }
        List<GatewaySenderEventImpl>[] remove2 = this.batchIdToEventsMap.remove(Integer.valueOf(i));
        if (remove2 != null) {
            List<GatewaySenderEventImpl> list = remove2[1];
            for (GatewayEventFilter gatewayEventFilter : this.sender.getGatewayEventFilters()) {
                for (GatewaySenderEventImpl gatewaySenderEventImpl : list) {
                    try {
                        gatewayEventFilter.afterAcknowledgement(gatewaySenderEventImpl);
                    } catch (Exception e) {
                        logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", gatewayEventFilter.toString(), gatewaySenderEventImpl), e);
                    }
                }
            }
            List<GatewaySenderEventImpl> list2 = remove2[0];
            if (logger.isDebugEnabled()) {
                logger.debug("Removing events from the queue {}", Integer.valueOf(list2.size()));
            }
            eventQueueRemove(list2.size());
            GatewaySenderStats statistics = this.sender.getStatistics();
            if (this.sender.getAlertThreshold() > 0) {
                long currentTimeMillis = System.currentTimeMillis();
                for (GatewaySenderEventImpl gatewaySenderEventImpl2 : list2) {
                    if (gatewaySenderEventImpl2 != null && (gatewaySenderEventImpl2 instanceof GatewaySenderEventImpl)) {
                        GatewaySenderEventImpl gatewaySenderEventImpl3 = gatewaySenderEventImpl2;
                        if (gatewaySenderEventImpl3.getCreationTime() + this.sender.getAlertThreshold() < currentTimeMillis) {
                            logger.warn("{} event for region={} key={} value={} was in the queue for {} milliseconds", new Object[]{gatewaySenderEventImpl3.getOperation(), gatewaySenderEventImpl3.getRegionPath(), gatewaySenderEventImpl3.getKey(), gatewaySenderEventImpl3.getValueAsString(true), Long.valueOf(currentTimeMillis - gatewaySenderEventImpl3.getCreationTime())});
                            statistics.incEventsExceedingAlertThreshold();
                        }
                    }
                }
            }
        }
    }

    public void handleUnSuccessBatchAck(int i) {
        this.sender.getStatistics().incBatchesRedistributed();
        List<GatewaySenderEventImpl>[] listArr = this.batchIdToEventsMap.get(Integer.valueOf(i));
        if (listArr != null) {
            Iterator<GatewaySenderEventImpl> it = listArr[0].iterator();
            while (it.hasNext() && !this.isStopped) {
                GatewaySenderEventImpl next = it.next();
                if (next != null && (next instanceof GatewaySenderEventImpl)) {
                    next.setPossibleDuplicate(true);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForResumption() throws InterruptedException {
        synchronized (this.pausedLock) {
            if (this.isPaused) {
                if (logger.isDebugEnabled()) {
                    logger.debug("GatewaySenderEventProcessor is paused. Waiting for Resumption");
                }
                this.isDispatcherWaiting = true;
                this.pausedLock.notifyAll();
                while (this.isPaused) {
                    this.pausedLock.wait();
                }
                this.isDispatcherWaiting = false;
            }
        }
    }

    public abstract void initializeEventDispatcher();

    public GatewaySenderEventDispatcher getDispatcher() {
        return this.dispatcher;
    }

    public Map<Integer, List<GatewaySenderEventImpl>[]> getBatchIdToEventsMap() {
        return this.batchIdToEventsMap;
    }

    public Map<Integer, List<GatewaySenderEventImpl>> getBatchIdToPDXEventsMap() {
        return this.batchIdToPDXEventsMap;
    }

    public void run() {
        try {
            setRunningStatus();
            processQueue();
        } catch (VirtualMachineError e) {
            SystemFailure.initiateFailure(e);
            throw e;
        } catch (CancelException e2) {
            if (isStopped()) {
                return;
            }
            logger.info("A cancellation occurred. Stopping the dispatcher.");
            setIsStopped(true);
        } catch (Throwable th) {
            SystemFailure.checkFailure();
            logger.fatal("Message dispatch failed due to unexpected exception..", th);
        }
    }

    public void setRunningStatus() throws Exception {
        GemFireException gemFireException = null;
        try {
            initializeEventDispatcher();
        } catch (GemFireException e) {
            gemFireException = e;
        }
        synchronized (this.runningStateLock) {
            if (gemFireException != null) {
                setException(gemFireException);
                setIsStopped(true);
            } else {
                setIsStopped(false);
            }
            this.runningStateLock.notifyAll();
        }
        if (gemFireException != null) {
            throw gemFireException;
        }
    }

    public void setException(GemFireException gemFireException) {
        this.exception = gemFireException;
    }

    public Exception getException() {
        return this.exception;
    }

    public void stopProcessing() {
        if (isAlive()) {
            resumeDispatching();
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Notifying the dispatcher to terminate", this);
            }
            if (!this.sender.isPrimary()) {
                this.sender.getSenderAdvisor().notifyPrimaryLock();
            } else if (AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME == -1) {
                while (this.queue.size() != 0) {
                    try {
                        Thread.sleep(5000L);
                        if (logger.isDebugEnabled()) {
                            logger.debug("{}: Waiting for the queue to get empty.", this);
                        }
                    } catch (InterruptedException e) {
                    } catch (CancelException e2) {
                    }
                }
            } else {
                try {
                    Thread.sleep(AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME * 1000);
                } catch (InterruptedException e3) {
                }
            }
            setIsStopped(true);
            this.dispatcher.stop();
            if (isAlive()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Joining with the dispatcher thread upto limit of 5 seconds", this);
                }
                try {
                    join(5000L);
                    if (isAlive()) {
                        logger.warn("{}:Dispatcher still alive even after join of 5 seconds.", this);
                        this.dispatcher.stop();
                        this.batchIdToEventsMap.clear();
                    }
                } catch (InterruptedException e4) {
                    Thread.currentThread().interrupt();
                    logger.warn("{}: InterruptedException in joining with dispatcher thread.", this);
                }
            }
            closeProcessor();
            if (logger.isDebugEnabled()) {
                logger.debug("Stopped dispatching: {}", this);
            }
        }
    }

    public void closeProcessor() {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing dispatcher");
        }
        try {
            if (this.sender.isPrimary() && this.queue.size() > 0) {
                logger.warn("Destroying GatewayEventDispatcher with actively queued data.");
            }
            this.queue.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed dispatcher");
            }
        } catch (CancelException e) {
            this.queue.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed dispatcher");
            }
        } catch (CacheException e2) {
            this.queue.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed dispatcher");
            }
        } catch (RegionDestroyedException e3) {
            this.queue.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed dispatcher");
            }
        } catch (Throwable th) {
            this.queue.close();
            if (logger.isDebugEnabled()) {
                logger.debug("Closed dispatcher");
            }
            throw th;
        }
    }

    protected void destroyProcessor() {
        if (logger.isDebugEnabled()) {
            logger.debug("Destroying dispatcher");
        }
        try {
            if (this.queue.peek() != null) {
                logger.warn("Destroying GatewayEventDispatcher with actively queued data.");
            }
        } catch (InterruptedException e) {
        } catch (CacheException e2) {
            this.queue.getRegion().localDestroyRegion();
            if (logger.isDebugEnabled()) {
                logger.debug("Destroyed dispatcher");
                return;
            }
            return;
        } catch (Throwable th) {
            this.queue.getRegion().localDestroyRegion();
            if (logger.isDebugEnabled()) {
                logger.debug("Destroyed dispatcher");
            }
            throw th;
        }
        this.queue.getRegion().localDestroyRegion();
        if (logger.isDebugEnabled()) {
            logger.debug("Destroyed dispatcher");
        }
    }

    public void removeCacheListener() {
    }

    public void logBatchFine(String str, List<GatewaySenderEventImpl> list) {
        if (list != null) {
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(str);
            stringBuffer.append(list.size()).append(" events");
            stringBuffer.append(" (batch #" + getBatchId());
            stringBuffer.append("):\n");
            for (GatewaySenderEventImpl gatewaySenderEventImpl : list) {
                stringBuffer.append("\tEvent ").append(gatewaySenderEventImpl.getEventId()).append(":");
                stringBuffer.append(gatewaySenderEventImpl.getKey()).append("->");
                stringBuffer.append(gatewaySenderEventImpl.getValueAsString(true)).append(",");
                stringBuffer.append(gatewaySenderEventImpl.getShadowKey());
                stringBuffer.append("\n");
            }
            logger.debug(stringBuffer);
        }
    }

    public long getNumEventsDispatched() {
        return this.numEventsDispatched;
    }

    public void increaseNumEventsDispatched(long j) {
        this.numEventsDispatched += j;
    }

    public void clear(PartitionedRegion partitionedRegion, int i) {
        ((ParallelGatewaySenderQueue) this.queue).clear(partitionedRegion, i);
    }

    public void notifyEventProcessorIfRequired(int i) {
        ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();
    }

    public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int i) {
        return ((ParallelGatewaySenderQueue) this.queue).getBucketToTempQueueMap().get(Integer.valueOf(i));
    }

    public PartitionedRegion getRegion(String str) {
        return ((ParallelGatewaySenderQueue) this.queue).getRegion(str);
    }

    public void removeShadowPR(String str) {
        ((ParallelGatewaySenderQueue) this.queue).removeShadowPR(str);
    }

    public void conflateEvent(Conflatable conflatable, int i, Long l) {
        ((ParallelGatewaySenderQueue) this.queue).conflateEvent(conflatable, i, l);
    }

    public void addShadowPartitionedRegionForUserPR(PartitionedRegion partitionedRegion) {
        ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserPR(partitionedRegion);
    }

    public void addShadowPartitionedRegionForUserRR(DistributedRegion distributedRegion) {
        ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(distributedRegion);
    }

    protected void beforeExecute() {
        if (this.threadMonitoring != null) {
            this.threadMonitoring.startMonitor(ThreadsMonitoring.Mode.AGSExecutor);
        }
    }

    protected void afterExecute() {
        if (this.threadMonitoring != null) {
            this.threadMonitoring.endMonitor();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void enqueueEvent(GatewayQueueEvent gatewayQueueEvent);

    public String printUnprocessedEvents() {
        return null;
    }

    public String printUnprocessedTokens() {
        return null;
    }
}
