package org.apache.geode.internal.cache.wan.serial;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.class */
public class ConcurrentSerialGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
    private static final Logger logger = LogService.getLogger();
    protected final List<SerialGatewaySenderEventProcessor> processors;
    protected final AbstractGatewaySender sender;
    private GemFireException ex;
    private final Set<RegionQueue> queues;

    public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender abstractGatewaySender, ThreadsMonitoring threadsMonitoring) {
        super("Event Processor for GatewaySender_" + abstractGatewaySender.getId(), abstractGatewaySender, threadsMonitoring);
        this.processors = new ArrayList();
        this.ex = null;
        this.sender = abstractGatewaySender;
        initializeMessageQueue(abstractGatewaySender.getId());
        this.queues = new HashSet();
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            this.queues.add(it.next().getQueue());
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor, org.apache.geode.internal.cache.wan.GatewaySenderEventProcessor
    public int getTotalQueueSize() {
        int i = 0;
        Iterator<RegionQueue> it = this.queues.iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    protected void initializeMessageQueue(String str) {
        for (int i = 0; i < this.sender.getDispatcherThreads(); i++) {
            this.processors.add(new SerialGatewaySenderEventProcessor(this.sender, str + ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY + i, getThreadMonitorObj()));
            if (logger.isDebugEnabled()) {
                logger.debug("Created the SerialGatewayEventProcessor_{}->{}", Integer.valueOf(i), this.processors.get(i));
            }
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public int eventQueueSize() {
        int i = 0;
        Iterator<RegionQueue> it = this.queues.iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj) throws IOException, CacheException {
        enqueueEvent(enumListenerEvent, entryEvent, obj, Math.abs(getHashCode((EntryEventImpl) entryEvent) % this.processors.size()));
    }

    public void setModifiedEventId(EntryEventImpl entryEventImpl, int i) {
        EventID eventId = entryEventImpl.getEventId();
        if (logger.isDebugEnabled()) {
            logger.debug("The original EventId is {}", eventId);
        }
        long createFakeThreadIDForParallelGateway = ThreadIdentifier.createFakeThreadIDForParallelGateway(i, eventId.getThreadID(), 0);
        EventID eventID = new EventID(eventId.getMembershipID(), createFakeThreadIDForParallelGateway, eventId.getSequenceID());
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}:index=" + this.sender.getEventIdIndex(), this, entryEventImpl.getKey(), Integer.valueOf(i), eventId, ThreadIdentifier.toDisplayString(eventId.getThreadID()), eventID, ThreadIdentifier.toDisplayString(createFakeThreadIDForParallelGateway));
        }
        entryEventImpl.setEventId(eventID);
    }

    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj, int i) throws CacheException, IOException {
        SerialGatewaySenderEventProcessor serialGatewaySenderEventProcessor = this.processors.get(i);
        if (this.sender.getOrderPolicy() != GatewaySender.OrderPolicy.KEY && this.sender.getOrderPolicy() != GatewaySender.OrderPolicy.PARTITION) {
            serialGatewaySenderEventProcessor.enqueueEvent(enumListenerEvent, entryEvent, obj);
            return;
        }
        EntryEventImpl entryEventImpl = new EntryEventImpl((EntryEventImpl) entryEvent);
        try {
            setModifiedEventId(entryEventImpl, i);
            serialGatewaySenderEventProcessor.enqueueEvent(enumListenerEvent, entryEventImpl, obj);
            entryEventImpl.release();
        } catch (Throwable th) {
            entryEventImpl.release();
            throw th;
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void run() {
        for (int i = 0; i < this.processors.size(); i++) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting the serialProcessor {}", Integer.valueOf(i));
            }
            this.processors.get(i).start();
        }
        try {
            waitForRunningStatus();
        } catch (GatewaySenderException e) {
            this.ex = e;
        }
        synchronized (getRunningStateLock()) {
            if (this.ex != null) {
                setException(this.ex);
                setIsStopped(true);
            } else {
                setIsStopped(false);
            }
            getRunningStateLock().notifyAll();
        }
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            try {
                it.next().join();
            } catch (InterruptedException e2) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Got InterruptedException while waiting for child threads to finish.");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void rebalance() {
        throw new UnsupportedOperationException();
    }

    private void waitForRunningStatus() {
        for (SerialGatewaySenderEventProcessor serialGatewaySenderEventProcessor : this.processors) {
            synchronized (serialGatewaySenderEventProcessor.getRunningStateLock()) {
                while (serialGatewaySenderEventProcessor.getException() == null && serialGatewaySenderEventProcessor.isStopped()) {
                    try {
                        serialGatewaySenderEventProcessor.getRunningStateLock().wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                Exception exception = serialGatewaySenderEventProcessor.getException();
                if (exception != null) {
                    throw new GatewaySenderException(String.format("Could not start a gateway sender %s because of exception %s", this.sender.getId(), exception.getMessage()), exception.getCause());
                }
            }
        }
    }

    private int getHashCode(EntryEventImpl entryEventImpl) {
        int i = 0;
        switch (this.sender.getOrderPolicy()) {
            case KEY:
                i = entryEventImpl.getKey().hashCode();
                break;
            case THREAD:
                EventID eventId = entryEventImpl.getEventId();
                byte[] membershipID = eventId.getMembershipID();
                long threadID = eventId.getThreadID();
                i = Arrays.hashCode(membershipID) + ((int) (threadID ^ (threadID >>> 32)));
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Generated hashcode for event with key={}, memberId={}, threadId={}: {}", this, entryEventImpl.getKey(), Arrays.toString(membershipID), Long.valueOf(threadID), Integer.valueOf(i));
                    break;
                }
                break;
            case PARTITION:
                i = PartitionRegionHelper.isPartitionedRegion(entryEventImpl.getRegion()) ? PartitionedRegionHelper.getHashKey(entryEventImpl) : entryEventImpl.getKey().hashCode();
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Generated partition hashcode for event with key={}: {}", this, entryEventImpl.getKey(), Integer.valueOf(i));
                    break;
                }
                break;
        }
        return i;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void stopProcessing() {
        if (isAlive()) {
            setIsStopped(true);
            ArrayList arrayList = new ArrayList();
            Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
            while (it.hasNext()) {
                arrayList.add(new AbstractGatewaySenderEventProcessor.SenderStopperCallable(it.next()));
            }
            ExecutorService newFixedThreadPool = LoggingExecutors.newFixedThreadPool("ConcurrentSerialGatewaySenderEventProcessor Stopper Thread", true, this.processors.size());
            try {
                Iterator it2 = newFixedThreadPool.invokeAll(arrayList).iterator();
                while (it2.hasNext()) {
                    try {
                        boolean booleanValue = ((Boolean) ((Future) it2.next()).get()).booleanValue();
                        if (logger.isDebugEnabled()) {
                            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: {} stopped dispatching: {}", booleanValue ? "Successfully" : "Unsuccesfully", this);
                        }
                    } catch (ExecutionException e) {
                        logger.warn("GatewaySender {} caught exception while stopping: {}", new Object[]{this.sender, e.getCause()});
                    }
                }
                newFixedThreadPool.shutdown();
                closeProcessor();
                if (logger.isDebugEnabled()) {
                    logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: {}", this);
                }
            } catch (InterruptedException e2) {
                throw new InternalGemFireException(e2.getMessage());
            }
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void closeProcessor() {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().closeProcessor();
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void pauseDispatching() {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().pauseDispatching();
        }
        super.pauseDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: {}", this);
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void resumeDispatching() {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().resumeDispatching();
        }
        super.resumeDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: {}", this);
        }
    }

    public List<SerialGatewaySenderEventProcessor> getProcessors() {
        return new LinkedList(this.processors);
    }

    public Set<RegionQueue> getQueues() {
        return this.queues;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void removeCacheListener() {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().removeCacheListener();
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void waitForDispatcherToPause() {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().waitForDispatcherToPause();
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public GatewaySenderEventDispatcher getDispatcher() {
        return this.processors.get(0).getDispatcher();
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void initializeEventDispatcher() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void registerEventDroppedInPrimaryQueue(EntryEventImpl entryEventImpl) {
        getSender().setModifiedEventId(entryEventImpl);
        int abs = Math.abs(getHashCode(entryEventImpl) % this.processors.size());
        setModifiedEventId(entryEventImpl, abs);
        this.processors.get(abs).sendBatchDestroyOperationForDroppedEvent(entryEventImpl, abs);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) {
        Iterator<SerialGatewaySenderEventProcessor> it = this.processors.iterator();
        while (it.hasNext()) {
            it.next().enqueueEvent(gatewayQueueEvent);
        }
    }

    protected ThreadsMonitoring getThreadMonitorObj() {
        DistributionManager distributionManager = this.sender.getDistributionManager();
        if (distributionManager != null) {
            return distributionManager.getThreadMonitoring();
        }
        return null;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public String printUnprocessedEvents() {
        return (String) this.processors.stream().map(serialGatewaySenderEventProcessor -> {
            return serialGatewaySenderEventProcessor.printUnprocessedEvents();
        }).collect(Collectors.joining(", "));
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public String printUnprocessedTokens() {
        return (String) this.processors.stream().map(serialGatewaySenderEventProcessor -> {
            return serialGatewaySenderEventProcessor.printUnprocessedTokens();
        }).collect(Collectors.joining(", "));
    }
}
