package org.apache.geode.internal.cache.wan.parallel;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.class */
public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
    protected static final Logger logger = LogService.getLogger();
    protected ParallelGatewaySenderEventProcessor[] processors;
    private GemFireException ex;
    final int nDispatcher;

    public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender abstractGatewaySender) {
        super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_" + abstractGatewaySender.getId()), "Event Processor for GatewaySender_" + abstractGatewaySender.getId(), abstractGatewaySender);
        this.ex = null;
        logger.info("ConcurrentParallelGatewaySenderEventProcessor: dispatcher threads {}", Integer.valueOf(abstractGatewaySender.getDispatcherThreads()));
        this.nDispatcher = abstractGatewaySender.getDispatcherThreads();
        Set<Region> hashSet = new HashSet<>();
        for (LocalRegion localRegion : ((GemFireCacheImpl) abstractGatewaySender.getCache()).getApplicationRegions()) {
            if (localRegion.getAllGatewaySenderIds().contains(abstractGatewaySender.getId())) {
                hashSet.add(localRegion);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("The target PRs are {} Dispatchers: {}", hashSet, Integer.valueOf(this.nDispatcher));
        }
        createProcessors(abstractGatewaySender.getDispatcherThreads(), hashSet);
        this.queue = new ConcurrentParallelGatewaySenderQueue(abstractGatewaySender, this.processors);
        setDaemon(true);
    }

    protected void createProcessors(int i, Set<Region> set) {
        this.processors = new ParallelGatewaySenderEventProcessor[this.sender.getDispatcherThreads()];
        if (logger.isDebugEnabled()) {
            logger.debug("Creating AsyncEventProcessor");
        }
        for (int i2 = 0; i2 < this.sender.getDispatcherThreads(); i2++) {
            this.processors[i2] = new ParallelGatewaySenderEventProcessor(this.sender, set, i2, this.sender.getDispatcherThreads());
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    protected void initializeMessageQueue(String str) {
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj) throws IOException, CacheException {
        entryEvent.getRegion();
        int bucketID = ((EntryEventImpl) entryEvent).getEventId().getBucketID();
        if (bucketID < 0) {
            return;
        }
        this.processors[bucketID % this.nDispatcher].enqueueEvent(enumListenerEvent, entryEvent, obj);
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor, java.lang.Thread, java.lang.Runnable
    public void run() {
        boolean isDebugEnabled = logger.isDebugEnabled();
        for (int i = 0; i < this.processors.length; i++) {
            if (isDebugEnabled) {
                logger.debug("Starting the ParallelProcessors {}", Integer.valueOf(i));
            }
            this.processors[i].start();
        }
        try {
            waitForRunningStatus();
        } catch (GatewaySenderException e) {
            this.ex = e;
        }
        synchronized (this.runningStateLock) {
            if (this.ex != null) {
                setException(this.ex);
                setIsStopped(true);
            } else {
                setIsStopped(false);
            }
            this.runningStateLock.notifyAll();
        }
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            try {
                parallelGatewaySenderEventProcessor.join();
            } catch (InterruptedException e2) {
                if (isDebugEnabled) {
                    logger.debug("Got InterruptedException while waiting for child threads to finish.");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void waitForRunningStatus() {
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            synchronized (parallelGatewaySenderEventProcessor.runningStateLock) {
                while (parallelGatewaySenderEventProcessor.getException() == null && parallelGatewaySenderEventProcessor.isStopped()) {
                    try {
                        parallelGatewaySenderEventProcessor.runningStateLock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                Exception exception = parallelGatewaySenderEventProcessor.getException();
                if (exception != null) {
                    throw new GatewaySenderException(LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1.toLocalizedString(Long.valueOf(getId()), exception.getMessage()), exception.getCause());
                }
            }
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void stopProcessing() {
        if (isAlive()) {
            setIsStopped(true);
            final LoggingThreadGroup createThreadGroup = LoggingThreadGroup.createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger);
            ThreadFactory threadFactory = new ThreadFactory() { // from class: org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor.1
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(createThreadGroup, runnable, "ConcurrentParallelGatewaySenderEventProcessor Stopper Thread");
                    thread.setDaemon(true);
                    return thread;
                }
            };
            ArrayList arrayList = new ArrayList();
            for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
                arrayList.add(new AbstractGatewaySenderEventProcessor.SenderStopperCallable(parallelGatewaySenderEventProcessor));
            }
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.processors.length, threadFactory);
            try {
                Iterator it = newFixedThreadPool.invokeAll(arrayList).iterator();
                while (it.hasNext()) {
                    try {
                        Boolean bool = (Boolean) ((Future) it.next()).get();
                        if (logger.isDebugEnabled()) {
                            logger.debug("ConcurrentParallelGatewaySenderEventProcessor: {} stopped dispatching: {}", bool.booleanValue() ? "Successfully" : "Unsuccesfully", this);
                        }
                    } catch (ExecutionException e) {
                        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_0_CAUGHT_EXCEPTION_WHILE_STOPPING_1, this.sender), e.getCause());
                    }
                }
                newFixedThreadPool.shutdown();
                closeProcessor();
                if (logger.isDebugEnabled()) {
                    logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Stopped dispatching: {}", this);
                }
            } catch (InterruptedException e2) {
                throw new InternalGemFireException(e2);
            } catch (RejectedExecutionException e3) {
                throw e3;
            }
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void closeProcessor() {
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            parallelGatewaySenderEventProcessor.closeProcessor();
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void pauseDispatching() {
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            parallelGatewaySenderEventProcessor.pauseDispatching();
        }
        super.pauseDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Paused dispatching: {}", this);
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void waitForDispatcherToPause() {
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            parallelGatewaySenderEventProcessor.waitForDispatcherToPause();
        }
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void resumeDispatching() {
        for (ParallelGatewaySenderEventProcessor parallelGatewaySenderEventProcessor : this.processors) {
            parallelGatewaySenderEventProcessor.resumeDispatching();
        }
        super.resumeDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentParallelGatewaySenderEventProcessor: Resumed dispatching: {}", this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void waitForResumption() throws InterruptedException {
        super.waitForResumption();
    }

    public List<ParallelGatewaySenderEventProcessor> getProcessors() {
        LinkedList linkedList = new LinkedList();
        for (int i = 0; i < this.processors.length; i++) {
            linkedList.add(this.processors[i]);
        }
        return linkedList;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public RegionQueue getQueue() {
        return this.queue;
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public GatewaySenderEventDispatcher getDispatcher() {
        return this.processors[0].getDispatcher();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void rebalance() {
    }

    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void initializeEventDispatcher() {
    }
}
