package org.apache.geode.internal.cache.wan;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.util.Gateway;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.class */
public class GatewaySenderAdvisor extends DistributionAdvisor {
    private static final Logger logger = LogService.getLogger();
    private DistributedLockService lockService;
    private volatile boolean isPrimary;
    private final Object primaryLock;
    private final String lockToken;
    private Thread lockObtainingThread;
    private final AbstractGatewaySender sender;

    /* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile.class */
    public static class GatewaySenderProfile extends DistributionAdvisor.Profile {
        public String Id;
        public long startTime;
        public int remoteDSId;
        public boolean isRunning;
        public boolean isPrimary;
        public boolean isParallel;
        public boolean isBatchConflationEnabled;
        public boolean isPersistenceEnabled;
        public int alertThreshold;
        public boolean manualStart;
        public ArrayList<String> eventFiltersClassNames;
        public ArrayList<String> transFiltersClassNames;
        public ArrayList<String> senderEventListenerClassNames;
        public boolean isDiskSynchronous;
        public int dispatcherThreads;
        public GatewaySender.OrderPolicy orderPolicy;
        public ServerLocation serverLocation;
        public boolean enforceThreadsConnectSameReceiver;

        @Immutable
        private static final KnownVersion[] serializationVersions = {KnownVersion.GEODE_1_14_0};

        public GatewaySenderProfile(InternalDistributedMember internalDistributedMember, int i) {
            super(internalDistributedMember, i);
            this.eventFiltersClassNames = new ArrayList<>();
            this.transFiltersClassNames = new ArrayList<>();
            this.senderEventListenerClassNames = new ArrayList<>();
            this.enforceThreadsConnectSameReceiver = false;
        }

        public GatewaySenderProfile() {
            this.eventFiltersClassNames = new ArrayList<>();
            this.transFiltersClassNames = new ArrayList<>();
            this.senderEventListenerClassNames = new ArrayList<>();
            this.enforceThreadsConnectSameReceiver = false;
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public void fromData(DataInput dataInput, DeserializationContext deserializationContext) throws IOException, ClassNotFoundException {
            fromDataPre_GEODE_1_14_0_0(dataInput, deserializationContext);
            this.enforceThreadsConnectSameReceiver = dataInput.readBoolean();
        }

        public void fromDataPre_GEODE_1_14_0_0(DataInput dataInput, DeserializationContext deserializationContext) throws IOException, ClassNotFoundException {
            super.fromData(dataInput, deserializationContext);
            this.Id = DataSerializer.readString(dataInput);
            this.startTime = dataInput.readLong();
            this.remoteDSId = dataInput.readInt();
            this.isRunning = dataInput.readBoolean();
            this.isPrimary = dataInput.readBoolean();
            this.isParallel = dataInput.readBoolean();
            this.isBatchConflationEnabled = dataInput.readBoolean();
            this.isPersistenceEnabled = dataInput.readBoolean();
            this.alertThreshold = dataInput.readInt();
            this.manualStart = dataInput.readBoolean();
            this.eventFiltersClassNames = DataSerializer.readArrayList(dataInput);
            this.transFiltersClassNames = DataSerializer.readArrayList(dataInput);
            this.senderEventListenerClassNames = DataSerializer.readArrayList(dataInput);
            this.isDiskSynchronous = dataInput.readBoolean();
            this.dispatcherThreads = dataInput.readInt();
            if (StaticSerialization.getVersionForDataStream(dataInput).isOlderThan(KnownVersion.GFE_90)) {
                Gateway.OrderPolicy orderPolicy = (Gateway.OrderPolicy) DataSerializer.readObject(dataInput);
                if (orderPolicy == null) {
                    this.orderPolicy = null;
                } else if (orderPolicy.name().equals(GatewaySender.OrderPolicy.KEY.name())) {
                    this.orderPolicy = GatewaySender.OrderPolicy.KEY;
                } else if (orderPolicy.name().equals(GatewaySender.OrderPolicy.THREAD.name())) {
                    this.orderPolicy = GatewaySender.OrderPolicy.THREAD;
                } else {
                    this.orderPolicy = GatewaySender.OrderPolicy.PARTITION;
                }
            } else {
                this.orderPolicy = (GatewaySender.OrderPolicy) DataSerializer.readObject(dataInput);
            }
            if (DataSerializer.readPrimitiveBoolean(dataInput)) {
                this.serverLocation = new ServerLocation();
                InternalDataSerializer.invokeFromData(this.serverLocation, dataInput);
            }
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public void toData(DataOutput dataOutput, SerializationContext serializationContext) throws IOException {
            toDataPre_GEODE_1_14_0_0(dataOutput, serializationContext);
            dataOutput.writeBoolean(this.enforceThreadsConnectSameReceiver);
        }

        public void toDataPre_GEODE_1_14_0_0(DataOutput dataOutput, SerializationContext serializationContext) throws IOException {
            super.toData(dataOutput, serializationContext);
            DataSerializer.writeString(this.Id, dataOutput);
            dataOutput.writeLong(this.startTime);
            dataOutput.writeInt(this.remoteDSId);
            dataOutput.writeBoolean(this.isRunning);
            dataOutput.writeBoolean(this.isPrimary);
            dataOutput.writeBoolean(this.isParallel);
            dataOutput.writeBoolean(this.isBatchConflationEnabled);
            dataOutput.writeBoolean(this.isPersistenceEnabled);
            dataOutput.writeInt(this.alertThreshold);
            dataOutput.writeBoolean(this.manualStart);
            DataSerializer.writeArrayList(this.eventFiltersClassNames, dataOutput);
            DataSerializer.writeArrayList(this.transFiltersClassNames, dataOutput);
            DataSerializer.writeArrayList(this.senderEventListenerClassNames, dataOutput);
            dataOutput.writeBoolean(this.isDiskSynchronous);
            dataOutput.writeInt(this.dispatcherThreads);
            if (!StaticSerialization.getVersionForDataStream(dataOutput).isOlderThan(KnownVersion.GFE_90) || this.orderPolicy == null) {
                DataSerializer.writeObject(this.orderPolicy, dataOutput);
            } else {
                String name = this.orderPolicy.name();
                if (name.equals(Gateway.OrderPolicy.KEY.name())) {
                    DataSerializer.writeObject(Gateway.OrderPolicy.KEY, dataOutput);
                } else if (name.equals(Gateway.OrderPolicy.THREAD.name())) {
                    DataSerializer.writeObject(Gateway.OrderPolicy.THREAD, dataOutput);
                } else {
                    DataSerializer.writeObject(Gateway.OrderPolicy.PARTITION, dataOutput);
                }
            }
            boolean z = this.serverLocation != null;
            DataSerializer.writePrimitiveBoolean(z, dataOutput);
            if (z) {
                InternalDataSerializer.invokeToData(this.serverLocation, dataOutput);
            }
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public KnownVersion[] getSerializationVersions() {
            return serializationVersions;
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public int getDSFID() {
            return 2144;
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public void processIncoming(ClusterDistributionManager clusterDistributionManager, String str, boolean z, boolean z2, List<DistributionAdvisor.Profile> list) {
            InternalCache cache = clusterDistributionManager.getCache();
            if (cache != null) {
                handleDistributionAdvisee((AbstractGatewaySender) cache.getGatewaySender(str), z, z2, list);
            }
        }

        @Override // org.apache.geode.distributed.internal.DistributionAdvisor.Profile
        public void fillInToString(StringBuilder sb) {
            super.fillInToString(sb);
            sb.append("; id=").append(this.Id);
            sb.append("; remoteDSName=").append(this.remoteDSId);
            sb.append("; isRunning=").append(this.isRunning);
            sb.append("; isPrimary=").append(this.isPrimary);
        }
    }

    private GatewaySenderAdvisor(DistributionAdvisee distributionAdvisee) {
        super(distributionAdvisee);
        this.primaryLock = new Object();
        this.sender = (AbstractGatewaySender) distributionAdvisee;
        this.lockToken = getDLockServiceName() + "-token";
    }

    public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee distributionAdvisee) {
        GatewaySenderAdvisor gatewaySenderAdvisor = new GatewaySenderAdvisor(distributionAdvisee);
        gatewaySenderAdvisor.initialize();
        return gatewaySenderAdvisor;
    }

    public String getDLockServiceName() {
        return getClass().getName() + PartitionedRegion.BUCKET_NAME_SEPARATOR + this.sender.getId();
    }

    public Thread getLockObtainingThread() {
        return this.lockObtainingThread;
    }

    @Override // org.apache.geode.distributed.internal.DistributionAdvisor
    protected DistributionAdvisor.Profile instantiateProfile(InternalDistributedMember internalDistributedMember, int i) {
        return new GatewaySenderProfile(internalDistributedMember, i);
    }

    @Override // org.apache.geode.distributed.internal.DistributionAdvisor
    public void profileCreated(DistributionAdvisor.Profile profile) {
        if (profile instanceof GatewaySenderProfile) {
            checkCompatibility((GatewaySenderProfile) profile);
        }
    }

    private void checkCompatibility(GatewaySenderProfile gatewaySenderProfile) {
        if (gatewaySenderProfile.remoteDSId != this.sender.getRemoteDSId()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with remote ds id %s because another cache has the same Gateway Sender defined with remote ds id %s.", gatewaySenderProfile.Id, Integer.valueOf(gatewaySenderProfile.remoteDSId), Integer.valueOf(this.sender.remoteDSId)));
        }
        if (gatewaySenderProfile.isParallel && !this.sender.isParallel()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s as parallel gateway sender because another cache has the same sender as serial gateway sender", gatewaySenderProfile.Id));
        }
        if (!gatewaySenderProfile.isParallel && this.sender.isParallel()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s as serial gateway sender because another cache has the same sender as parallel gateway sender", gatewaySenderProfile.Id));
        }
        if (gatewaySenderProfile.isBatchConflationEnabled != this.sender.isBatchConflationEnabled()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with isBatchConflationEnabled %s because another cache has the same Gateway Sender defined with isBatchConflationEnabled %s", gatewaySenderProfile.Id, Boolean.valueOf(gatewaySenderProfile.isBatchConflationEnabled), Boolean.valueOf(this.sender.isBatchConflationEnabled())));
        }
        if (gatewaySenderProfile.isPersistenceEnabled != this.sender.isPersistenceEnabled()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with isPersistentEnabled %s because another cache has the same Gateway Sender defined with isPersistentEnabled %s", gatewaySenderProfile.Id, Boolean.valueOf(gatewaySenderProfile.isPersistenceEnabled), Boolean.valueOf(this.sender.isPersistenceEnabled())));
        }
        if (gatewaySenderProfile.alertThreshold != this.sender.getAlertThreshold()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with alertThreshold %s because another cache has the same Gateway Sender defined with alertThreshold %s", gatewaySenderProfile.Id, Integer.valueOf(gatewaySenderProfile.alertThreshold), Integer.valueOf(this.sender.getAlertThreshold())));
        }
        if (!this.sender.isParallel() && gatewaySenderProfile.manualStart != this.sender.isManualStart()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with manual start %s because another cache has the same Gateway Sender defined with manual start %s", gatewaySenderProfile.Id, Boolean.valueOf(gatewaySenderProfile.manualStart), Boolean.valueOf(this.sender.isManualStart())));
        }
        if (!gatewaySenderProfile.isParallel && gatewaySenderProfile.orderPolicy != this.sender.getOrderPolicy()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with orderPolicy %s because another cache has the same Gateway Sender defined with orderPolicy %s", gatewaySenderProfile.Id, gatewaySenderProfile.orderPolicy, this.sender.getOrderPolicy()));
        }
        ArrayList arrayList = new ArrayList();
        Iterator<GatewayEventFilter> it = this.sender.getGatewayEventFilters().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getClass().getName());
        }
        if (gatewaySenderProfile.eventFiltersClassNames.size() != arrayList.size()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with GatewayEventFilters %s because another cache has the same Gateway Sender defined with GatewayEventFilters %s", gatewaySenderProfile.Id, gatewaySenderProfile.eventFiltersClassNames, arrayList));
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            if (!gatewaySenderProfile.eventFiltersClassNames.contains((String) it2.next())) {
                throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with GatewayEventFilters %s because another cache has the same Gateway Sender defined with GatewayEventFilters %s", gatewaySenderProfile.Id, gatewaySenderProfile.eventFiltersClassNames, arrayList));
            }
        }
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        Iterator<GatewayTransportFilter> it3 = this.sender.getGatewayTransportFilters().iterator();
        while (it3.hasNext()) {
            linkedHashSet.add(it3.next().getClass().getName());
        }
        if (gatewaySenderProfile.transFiltersClassNames.size() != linkedHashSet.size()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with GatewayTransportFilters %s because another cache has the same Gateway Sender defined with GatewayTransportFilters %s", gatewaySenderProfile.Id, gatewaySenderProfile.transFiltersClassNames, linkedHashSet));
        }
        Iterator<String> it4 = gatewaySenderProfile.transFiltersClassNames.iterator();
        Iterator it5 = linkedHashSet.iterator();
        while (it4.hasNext() && it5.hasNext()) {
            if (!it4.next().equals(it5.next())) {
                throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with GatewayTransportFilters %s because another cache has the same Gateway Sender defined with GatewayTransportFilters %s", gatewaySenderProfile.Id, gatewaySenderProfile.transFiltersClassNames, linkedHashSet));
            }
        }
        ArrayList arrayList2 = new ArrayList();
        Iterator<AsyncEventListener> it6 = this.sender.getAsyncEventListeners().iterator();
        while (it6.hasNext()) {
            arrayList2.add(it6.next().getClass().getName());
        }
        if (gatewaySenderProfile.senderEventListenerClassNames.size() != arrayList2.size()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with AsyncEventListeners %s because another cache has the same Gateway Sender defined with AsyncEventListener %s", gatewaySenderProfile.Id, gatewaySenderProfile.senderEventListenerClassNames, arrayList2));
        }
        Iterator it7 = arrayList2.iterator();
        while (it7.hasNext()) {
            if (!gatewaySenderProfile.senderEventListenerClassNames.contains((String) it7.next())) {
                throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with AsyncEventListeners %s because another cache has the same Gateway Sender defined with AsyncEventListener %s", gatewaySenderProfile.Id, gatewaySenderProfile.senderEventListenerClassNames, arrayList2));
            }
        }
        if (gatewaySenderProfile.isDiskSynchronous != this.sender.isDiskSynchronous()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s", gatewaySenderProfile.Id, Boolean.valueOf(gatewaySenderProfile.isDiskSynchronous), Boolean.valueOf(this.sender.isDiskSynchronous())));
        }
        if (gatewaySenderProfile.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0) && gatewaySenderProfile.enforceThreadsConnectSameReceiver != this.sender.getEnforceThreadsConnectSameReceiver()) {
            throw new IllegalStateException(String.format("Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s", gatewaySenderProfile.Id, Boolean.valueOf(gatewaySenderProfile.enforceThreadsConnectSameReceiver), Boolean.valueOf(this.sender.getEnforceThreadsConnectSameReceiver())));
        }
    }

    @Override // org.apache.geode.distributed.internal.DistributionAdvisor
    public void profileUpdated(DistributionAdvisor.Profile profile) {
        if (profile instanceof GatewaySenderProfile) {
            GatewaySenderProfile gatewaySenderProfile = (GatewaySenderProfile) profile;
            if (gatewaySenderProfile.isParallel) {
                return;
            }
            if (gatewaySenderProfile.isRunning) {
                if (gatewaySenderProfile.serverLocation != null) {
                    this.sender.setServerLocation(gatewaySenderProfile.serverLocation);
                }
            } else if (advisePrimaryGatewaySender() == null && !this.sender.isPrimary()) {
                if (adviseEldestGatewaySender()) {
                    launchLockObtainingVolunteerThread();
                } else if (logger.isDebugEnabled()) {
                    logger.debug("Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", this.sender);
                }
            }
        }
    }

    @Override // org.apache.geode.distributed.internal.DistributionAdvisor
    protected void profileRemoved(DistributionAdvisor.Profile profile) {
        if (!(profile instanceof GatewaySenderProfile) || ((GatewaySenderProfile) profile).isParallel || advisePrimaryGatewaySender() != null || this.sender.isPrimary()) {
            return;
        }
        if (adviseEldestGatewaySender()) {
            launchLockObtainingVolunteerThread();
        } else if (logger.isDebugEnabled()) {
            logger.debug("Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", this.sender);
        }
    }

    public boolean isPrimary() {
        return this.sender.isParallel() || this.isPrimary;
    }

    public void initDLockService() {
        InternalDistributedSystem internalDistributedSystem = this.sender.getCache().getInternalDistributedSystem();
        String dLockServiceName = getDLockServiceName();
        this.lockService = DistributedLockService.getServiceNamed(dLockServiceName);
        if (this.lockService == null) {
            this.lockService = DLockService.create(dLockServiceName, internalDistributedSystem, true, true);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService);
        }
    }

    public boolean volunteerForPrimary() {
        if (logger.isDebugEnabled()) {
            logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId());
        }
        if (advisePrimaryGatewaySender() != null) {
            return false;
        }
        if (adviseEldestGatewaySender()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Sender : {} no Primary available. So going to acquire distributed lock", this.sender);
            }
            this.lockService.lock(this.lockToken, 10000L, -1L);
            return this.lockService.isHeldByCurrentThread(this.lockToken);
        }
        if (!logger.isDebugEnabled()) {
            return false;
        }
        logger.debug("Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", this.sender);
        return false;
    }

    private boolean adviseEldestGatewaySender() {
        DistributionAdvisor.Profile[] profileArr = this.profiles;
        TreeSet treeSet = new TreeSet();
        for (DistributionAdvisor.Profile profile : profileArr) {
            GatewaySenderProfile gatewaySenderProfile = (GatewaySenderProfile) profile;
            if (!gatewaySenderProfile.isParallel && gatewaySenderProfile.isRunning) {
                treeSet.add(Long.valueOf(gatewaySenderProfile.startTime));
            }
        }
        return treeSet.isEmpty() || (this.sender.isRunning() && this.sender.startTime <= ((Long) treeSet.first()).longValue());
    }

    public void makePrimary() {
        logger.info("{} : Starting as primary", this.sender);
        AbstractGatewaySenderEventProcessor eventProcessor = this.sender.getEventProcessor();
        if (eventProcessor != null) {
            eventProcessor.removeCacheListener();
        }
        logger.info("{} : Becoming primary gateway sender", this.sender);
        notifyAndBecomePrimary();
        new UpdateAttributesProcessor(this.sender).distribute(false);
    }

    public void notifyAndBecomePrimary() {
        synchronized (this.primaryLock) {
            setIsPrimary(true);
            notifyPrimaryLock();
        }
    }

    public void notifyPrimaryLock() {
        synchronized (this.primaryLock) {
            this.primaryLock.notifyAll();
        }
    }

    public void makeSecondary() {
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.", this.sender, this.lockToken);
        }
        logger.info("{} starting as secondary because primary gateway sender is available on member :{}", new Object[]{this.sender.getId(), advisePrimaryGatewaySender()});
        this.isPrimary = false;
        new UpdateAttributesProcessor(this.sender).distribute(false);
    }

    public void launchLockObtainingVolunteerThread() {
        this.lockObtainingThread = new LoggingThread("Gateway Sender Primary Lock Acquisition Thread Volunteer", () -> {
            this.sender.getLifeCycleLock().readLock().lock();
            try {
                if (this.sender.isRunning()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: Obtaining the lock on {}", this, this.lockToken);
                    }
                    if (volunteerForPrimary()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("{}: Obtained the lock on {}", this, this.lockToken);
                        }
                        logger.info("{} is becoming primary gateway Sender.", this);
                        makePrimary();
                    }
                }
            } catch (CancelException e) {
            } catch (Exception e2) {
                if (!this.sender.getStopper().isCancelInProgress()) {
                    logger.fatal(String.format("%s: The thread to obtain the failover lock was interrupted. This gateway sender will never become the primary.", this), e2);
                }
            } finally {
                this.sender.getLifeCycleLock().readLock().unlock();
            }
        });
        this.lockObtainingThread.start();
    }

    public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor) throws InterruptedException {
        if (isPrimary()) {
            return;
        }
        synchronized (this.primaryLock) {
            logger.info("{} : Waiting to become primary gateway", this.sender.getId());
            while (!isPrimary()) {
                this.primaryLock.wait(1000L);
                if (this.sender.getEventProcessor() != null && abstractGatewaySenderEventProcessor.isStopped()) {
                    logger.info("The event processor is stopped, not to wait for being primary any more.");
                    return;
                }
            }
        }
    }

    public InternalDistributedMember advisePrimaryGatewaySender() {
        for (DistributionAdvisor.Profile profile : this.profiles) {
            GatewaySenderProfile gatewaySenderProfile = (GatewaySenderProfile) profile;
            if (!gatewaySenderProfile.isParallel && gatewaySenderProfile.isPrimary) {
                return gatewaySenderProfile.getDistributedMember();
            }
        }
        return null;
    }

    public void setIsPrimary(boolean z) {
        this.isPrimary = z;
    }

    @Override // org.apache.geode.distributed.internal.DistributionAdvisor
    public void close() {
        new UpdateAttributesProcessor(getAdvisee(), true).distribute(false);
        super.close();
    }
}
