package org.apache.geode.internal.cache;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/StateFlushOperation.class */
public class StateFlushOperation {
    /** Logger shared by this class and its nested message/processor classes. */
    private static final Logger logger = LogService.getLogger();
    /** Region given to the region-based constructor; the DistributionManager-only constructor leaves it unset. */
    private DistributedRegion region;
    /** Distribution manager; assigned by both constructors. */
    private DistributionManager dm;

    /* loaded from: input_file:org/apache/geode/internal/cache/StateFlushOperation$StateFlushReplyProcessor.class */
    /**
     * Reply processor that, in addition to the normal reply bookkeeping of
     * {@link ReplyProcessor21}, keeps track of one specific target member and
     * stops waiting as soon as that member is no longer a current member.
     */
    public static class StateFlushReplyProcessor extends ReplyProcessor21 {
        /** Member whose absence ends the wait. */
        InternalDistributedMember targetMember;
        /** Size of the member set passed to the constructor; reported by {@link #toString()}. */
        int originalCount;
        /** Latched to {@code true} once the target is seen to be missing; never reset in this class. */
        boolean targetMemberHasLeft;

        /**
         * @param distributionManager manager used for membership checks
         * @param set members a reply is expected from
         * @param distributedMember the target member; must be an {@link InternalDistributedMember}
         */
        public StateFlushReplyProcessor(DistributionManager distributionManager, Set set, DistributedMember distributedMember) {
            super(distributionManager, set);
            InternalDistributedMember target = (InternalDistributedMember) distributedMember;
            this.targetMember = target;
            this.originalCount = set.size();
            // Initial membership check: only consult the manager if the flag is not already set.
            if (!this.targetMemberHasLeft) {
                this.targetMemberHasLeft = !distributionManager.isCurrentMember(target);
            }
        }

        /**
         * Treats every member in {@code set} as departed (reported as crashed, with a
         * {@code null} distribution manager).
         */
        public void messageNotSentTo(Set set) {
            for (Object unreached : set) {
                memberDeparted(null, (InternalDistributedMember) unreached, true);
            }
        }

        /** Delegates unchanged to the superclass implementation. */
        @Override
        public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember internalDistributedMember, boolean z) {
            super.memberDeparted(distributionManager, internalDistributedMember, z);
        }

        /**
         * Runs the superclass processing, then records the target as gone when it is not
         * part of the supplied active-member set.
         */
        @Override
        public void processActiveMembers(Set set) {
            super.processActiveMembers(set);
            if (!set.contains(this.targetMember)) {
                this.targetMemberHasLeft = true;
            }
        }

        /**
         * Refreshes the "target has left" flag and reports whether replies are still awaited.
         *
         * @return {@code true} only while the superclass is still waiting and the target is present
         */
        @Override
        public boolean stillWaiting() {
            if (!this.targetMemberHasLeft && !getDistributionManager().isCurrentMember(this.targetMember)) {
                this.targetMemberHasLeft = true;
            }
            return super.stillWaiting() && !this.targetMemberHasLeft;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("<");
            text.append(shortName()).append(" ").append(getProcessorId());
            text.append(" targeting ").append(this.targetMember);
            text.append(" waiting for ").append(numMembers());
            text.append(" replies out of ").append(this.originalCount).append(" ");
            if (this.exception != null) {
                text.append(" exception: ").append(this.exception);
            }
            text.append(" from ").append(membersToString()).append(">");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/geode/internal/cache/StateFlushOperation$StateMarkerMessage.class */
    /**
     * Marker message processed by a member taking part in a state flush.
     *
     * <p>When the processing member is itself the {@link #relayRecipient}, it waits for
     * current operations on the affected region(s) and answers the sender directly with a
     * {@link StateStabilizedMessage}. Otherwise it waits for current operations, collects
     * the channel state towards the relay recipient and forwards a
     * {@link StateStabilizationMessage} to that recipient.
     */
    public static class StateMarkerMessage extends DistributionMessage implements MessageWithReply {
        /**
         * When {@code true}, a new membership version is forced before waiting for current
         * operations. NOTE(review): this field is neither written by {@link #toData} nor read
         * by {@link #fromData}, so a deserialized message sees the default {@code false} -
         * confirm that this is intended.
         */
        public boolean flushNewOps;
        /** Member the stabilization message is relayed to. */
        protected DistributedMember relayRecipient;
        /** Reply-processor id copied into every message produced while processing. */
        protected int processorId;
        protected int processorType;
        /** Path used to look the region up when {@link #allRegions} is {@code false}. */
        protected String regionPath;
        /** Lazily resolved region for {@link #regionPath}; not serialized. */
        protected DistributedRegion region;
        /** Value reported by {@link #isSevereAlertCompatible()}; not serialized. */
        protected transient boolean severeAlertEnabled;
        /** When {@code true}, every non-destroyed {@link DistributedRegion} in the cache is flushed. */
        protected boolean allRegions;

        @Override
        public int getProcessorId() {
            return this.processorId;
        }

        @Override
        public int getProcessorType() {
            return this.processorType;
        }

        /**
         * Resolves and caches the region named by {@link #regionPath}.
         *
         * @return the region, or {@code null} when the path does not resolve to a
         *         {@link DistributedRegion}
         */
        private DistributedRegion getRegion(ClusterDistributionManager clusterDistributionManager) {
            if (this.region != null) {
                return this.region;
            }
            // The thread's init-level requirement is changed for the lookup and always restored.
            int previousLevel = LocalRegion.setThreadInitLevelRequirement(1);
            try {
                InternalRegion candidate = clusterDistributionManager.getExistingCache().getRegionByPathForProcessing(this.regionPath);
                if (candidate instanceof DistributedRegion) {
                    this.region = (DistributedRegion) candidate;
                }
                return this.region;
            } finally {
                LocalRegion.setThreadInitLevelRequirement(previousLevel);
            }
        }

        /**
         * Collects every region of the existing cache that is a {@link DistributedRegion}
         * and is not flagged as destroyed.
         */
        private Set<DistributedRegion> getAllRegions(ClusterDistributionManager clusterDistributionManager) {
            int previousLevel = LocalRegion.setThreadInitLevelRequirement(1);
            try {
                InternalCache cache = clusterDistributionManager.getExistingCache();
                Set<DistributedRegion> result = new HashSet<>();
                for (InternalRegion candidate : cache.getAllRegions()) {
                    if ((candidate instanceof DistributedRegion) && !((LocalRegion) candidate).isDestroyed) {
                        result.add((DistributedRegion) candidate);
                    }
                }
                return result;
            } finally {
                LocalRegion.setThreadInitLevelRequirement(previousLevel);
            }
        }

        /**
         * Dispatches to the direct-acknowledge path when this member is the relay recipient,
         * and to the relay path otherwise. Each path sends exactly one message, from a
         * {@code finally} block, no matter how the waiting phase ends.
         */
        @Override
        protected void process(ClusterDistributionManager clusterDistributionManager) {
            StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}", this);
            if (clusterDistributionManager.getDistributionManagerId().equals(this.relayRecipient)) {
                acknowledgeDirectly(clusterDistributionManager);
            } else {
                relayChannelState(clusterDistributionManager);
            }
        }

        /**
         * This member is the relay recipient: wait for current operations, then reply to the
         * sender with a {@link StateStabilizedMessage}.
         */
        private void acknowledgeDirectly(ClusterDistributionManager clusterDistributionManager) {
            try {
                for (DistributedRegion candidate : getRegions(clusterDistributionManager)) {
                    if (candidate == null) {
                        continue;
                    }
                    if (this.allRegions && candidate.doesNotDistribute()) {
                        // Skipped only in all-regions mode: such a region has nothing to flush.
                        continue;
                    }
                    waitForCurrentOperations(candidate, candidate.isInitialized());
                }
            } catch (CancelException ignored) {
                // Cancellation is not an error here; the reply is still sent below.
            } catch (Exception e) {
                StateFlushOperation.logger.fatal(String.format("%s Exception caught while determining channel state", this), e);
            } finally {
                // Single send point: a failure of the send itself propagates instead of being
                // misreported as a channel-state failure and retried.
                StateStabilizedMessage reply = new StateStabilizedMessage();
                reply.sendingMember = this.relayRecipient;
                reply.setRecipient(getSender());
                reply.setProcessorId(this.processorId);
                if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                    StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", reply);
                }
                clusterDistributionManager.putOutgoing(reply);
            }
        }

        /**
         * This member is not the relay recipient: wait for current operations, gather the
         * channel state towards the relay recipient for every initialized region, and forward
         * a {@link StateStabilizationMessage} to the relay recipient.
         */
        private void relayChannelState(ClusterDistributionManager clusterDistributionManager) {
            StateStabilizationMessage relay = new StateStabilizationMessage();
            relay.setRecipient((InternalDistributedMember) this.relayRecipient);
            relay.requestingMember = getSender();
            relay.processorId = this.processorId;
            try {
                for (DistributedRegion candidate : getRegions(clusterDistributionManager)) {
                    if (candidate == null) {
                        if (StateFlushOperation.logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
                            StateFlushOperation.logger.trace(LogMarker.DM_VERBOSE, "Region not found - skipping channel state assessment");
                        }
                        continue;
                    }
                    if (this.allRegions && candidate.doesNotDistribute()) {
                        continue;
                    }
                    boolean initialized = candidate.isInitialized();
                    waitForCurrentOperations(candidate, initialized);
                    boolean useMulticast = candidate.getMulticastEnabled() && candidate.getSystem().getConfig().getMcastPort() != 0;
                    if (!initialized) {
                        continue;
                    }
                    Map<String, Long> messageState = clusterDistributionManager.getMembershipManager().getMessageState(this.relayRecipient, useMulticast);
                    if (relay.channelState == null) {
                        relay.channelState = messageState;
                    } else if (messageState != null) {
                        // Merge only real results; a null state must not abort the remaining regions.
                        relay.channelState.putAll(messageState);
                    }
                    if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE) && relay.channelState != null && relay.channelState.size() > 0) {
                        StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "channel states: {}", relay.channelStateDescription(relay.channelState));
                    }
                }
            } catch (CancelException ignored) {
                // Cancellation is not an error here; the relay message is still sent below.
            } catch (Exception e) {
                StateFlushOperation.logger.fatal(String.format("%s Exception caught while determining channel state", this), e);
            } finally {
                // Only the relay message is sent from this path; the final acknowledgement to
                // the requester is produced when the relay recipient processes it.
                if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                    StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", relay);
                }
                clusterDistributionManager.putOutgoing(relay);
            }
        }

        /**
         * For an initialized region, optionally forces a new membership version (when
         * {@link #flushNewOps} is set) and waits for current operations. Does nothing for an
         * uninitialized region.
         */
        private void waitForCurrentOperations(DistributedRegion distributedRegion, boolean z) {
            if (!z) {
                return;
            }
            if (this.flushNewOps) {
                distributedRegion.getDistributionAdvisor().forceNewMembershipVersion();
            }
            try {
                distributedRegion.getDistributionAdvisor().waitForCurrentOperations();
            } catch (RegionDestroyedException ignored) {
                // Deliberately ignored: a destroyed region has no operations left to wait for.
            }
        }

        /**
         * @return all distributed regions when {@link #allRegions} is set, otherwise a singleton
         *         set holding the region for {@link #regionPath} (the element may be {@code null})
         */
        private Set<DistributedRegion> getRegions(ClusterDistributionManager clusterDistributionManager) {
            return this.allRegions ? getAllRegions(clusterDistributionManager) : Collections.singleton(getRegion(clusterDistributionManager));
        }

        /**
         * Writes relay recipient, processor id, processor type and the all-regions flag; the
         * region path follows only when {@link #allRegions} is {@code false}.
         */
        @Override
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            DataSerializer.writeObject(this.relayRecipient, dataOutput);
            dataOutput.writeInt(this.processorId);
            dataOutput.writeInt(this.processorType);
            dataOutput.writeBoolean(this.allRegions);
            if (!this.allRegions) {
                DataSerializer.writeString(this.regionPath, dataOutput);
            }
        }

        @Override
        public int getDSFID() {
            return -80;
        }

        /** Reads the fields in the exact order {@link #toData} writes them. */
        @Override
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            this.relayRecipient = (DistributedMember) DataSerializer.readObject(dataInput);
            this.processorId = dataInput.readInt();
            this.processorType = dataInput.readInt();
            this.allRegions = dataInput.readBoolean();
            if (!this.allRegions) {
                this.regionPath = DataSerializer.readString(dataInput);
            }
        }

        @Override
        public String toString() {
            return "StateMarkerMessage(requestingMember=" + getSender() + ",processorId=" + this.processorId + ",target=" + this.relayRecipient + ",region=" + this.regionPath + ")";
        }

        @Override
        public boolean isSevereAlertCompatible() {
            return this.severeAlertEnabled;
        }
    }

    /* loaded from: input_file:org/apache/geode/internal/cache/StateFlushOperation$StateStabilizationMessage.class */
    /**
     * Message carrying a snapshot of channel state. The receiver waits, on the waiting thread
     * pool, until it has caught up with that state and then acknowledges to the requesting
     * member with a {@link StateStabilizedMessage}.
     */
    public static class StateStabilizationMessage extends SerialDistributionMessage {
        /** Member the final {@link StateStabilizedMessage} is addressed to. */
        protected DistributedMember requestingMember;
        /** Reply-processor id copied into the acknowledgement. */
        protected int processorId;
        /** Channel state to wait for; {@code null} means there is nothing to wait for. */
        protected Map channelState;
        /**
         * Selects who the acknowledgement names as its sending member: this member when
         * {@code true}, the sender of this message otherwise.
         */
        protected boolean isSingleFlushTo;

        /**
         * Renders a channel-state map as {@code key=value} pairs separated by {@code ", "}.
         *
         * @param obj the channel state; anything that is not a {@link Map} yields a fixed
         *        placeholder text
         * @return the textual description (empty for an empty map)
         */
        public String channelStateDescription(Object obj) {
            if (!(obj instanceof Map)) {
                return "unknown channelState content";
            }
            StringBuilder description = new StringBuilder(200);
            String separator = "";
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
                description.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
                separator = ", ";
            }
            return description.toString();
        }

        /**
         * Schedules the wait-then-acknowledge work on the manager's waiting thread pool. The
         * acknowledgement is sent exactly once, from a {@code finally} block, whether the wait
         * succeeds or fails.
         */
        @Override
        protected void process(final ClusterDistributionManager clusterDistributionManager) {
            clusterDistributionManager.getWaitingThreadPool().execute(new Runnable() {
                @Override
                public void run() {
                    if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                        // Log the message itself; a bare "this" here would be the Runnable.
                        StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}", StateStabilizationMessage.this);
                    }
                    try {
                        if (StateStabilizationMessage.this.channelState != null) {
                            if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE) && StateStabilizationMessage.this.channelState.size() > 0) {
                                StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Waiting for channel states:  {}", StateStabilizationMessage.this.channelStateDescription(StateStabilizationMessage.this.channelState));
                            }
                            // Retry the wait after an interrupt; the cancel criterion is checked
                            // before every attempt and the interrupt status is restored afterwards.
                            while (true) {
                                clusterDistributionManager.getCancelCriterion().checkCancelInProgress(null);
                                boolean interrupted = Thread.interrupted();
                                try {
                                    clusterDistributionManager.getMembershipManager().waitForMessageState(StateStabilizationMessage.this.getSender(), StateStabilizationMessage.this.channelState);
                                    break;
                                } catch (InterruptedException e) {
                                    interrupted = true;
                                } finally {
                                    if (interrupted) {
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            }
                        }
                    } catch (ThreadDeath td) {
                        // Must keep propagating; the sibling catch-all below cannot intercept it.
                        throw td;
                    } catch (VirtualMachineError err) {
                        SystemFailure.initiateFailure(err);
                        throw err;
                    } catch (Throwable t) {
                        SystemFailure.checkFailure();
                        StateFlushOperation.logger.fatal("Exception caught while waiting for channel state", t);
                    } finally {
                        sendStabilizedReply(clusterDistributionManager);
                    }
                }
            });
        }

        /**
         * Builds the {@link StateStabilizedMessage} for {@link #requestingMember} and delivers
         * it: processed in place when the requester is this member, sent out otherwise.
         */
        private void sendStabilizedReply(ClusterDistributionManager clusterDistributionManager) {
            StateStabilizedMessage reply = new StateStabilizedMessage();
            reply.setRecipient((InternalDistributedMember) this.requestingMember);
            if (this.isSingleFlushTo) {
                reply.sendingMember = clusterDistributionManager.getDistributionManagerId();
            } else {
                reply.sendingMember = getSender();
            }
            reply.setProcessorId(this.processorId);
            if (StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", reply);
            }
            if (this.requestingMember.equals(clusterDistributionManager.getDistributionManagerId())) {
                reply.dmProcess(clusterDistributionManager);
            } else {
                clusterDistributionManager.putOutgoing(reply);
            }
        }

        /** Writes processor id, channel state, requesting member and the single-flush flag, in that order. */
        @Override
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            dataOutput.writeInt(this.processorId);
            DataSerializer.writeHashMap(this.channelState, dataOutput);
            DataSerializer.writeObject(this.requestingMember, dataOutput);
            dataOutput.writeBoolean(this.isSingleFlushTo);
        }

        @Override
        public int getDSFID() {
            return -79;
        }

        /** Reads the fields in the exact order {@link #toData} writes them. */
        @Override
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            this.processorId = dataInput.readInt();
            this.channelState = DataSerializer.readHashMap(dataInput);
            this.requestingMember = (DistributedMember) DataSerializer.readObject(dataInput);
            this.isSingleFlushTo = dataInput.readBoolean();
        }

        @Override
        public String toString() {
            return "StateStabilizationMessage(recipients=" + getRecipientsDescription() + ",requestingMember=" + this.requestingMember + ",processorId=" + this.processorId + ")";
        }
    }

    /* loaded from: input_file:org/apache/geode/internal/cache/StateFlushOperation$StateStabilizedMessage.class */
    /**
     * Reply message that closes a state flush. It carries, in addition to the usual reply
     * data, the member on whose behalf the reply is made, and reports that member as its
     * sender.
     */
    public static class StateStabilizedMessage extends ReplyMessage {
        /** Member this reply is made on behalf of; serialized after the superclass data. */
        protected DistributedMember sendingMember;

        /** @return {@link #sendingMember}, cast to {@link InternalDistributedMember} */
        @Override
        public InternalDistributedMember getSender() {
            return (InternalDistributedMember) this.sendingMember;
        }

        /** Traces the message when verbose state-flush logging is on, then defers to the superclass. */
        @Override
        public void process(DistributionManager distributionManager, ReplyProcessor21 replyProcessor21) {
            boolean traceOn = StateFlushOperation.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
            if (traceOn) {
                StateFlushOperation.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Processing {}", this);
            }
            super.process(distributionManager, replyProcessor21);
        }

        @Override
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            DataSerializer.writeObject(this.sendingMember, dataOutput);
        }

        @Override
        public int getDSFID() {
            return -78;
        }

        @Override
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            Object decoded = DataSerializer.readObject(dataInput);
            this.sendingMember = (DistributedMember) decoded;
        }

        /**
         * Describes the message: processor id, the superclass-level sender (if any), the
         * recipients (if any), the member it is sent on behalf of and any attached exception.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder().append("StateStabilizedMessage ").append(this.processorId);
            InternalDistributedMember transportSender = super.getSender();
            if (transportSender != null) {
                text.append(" from ").append(transportSender);
            }
            if (getRecipients().length > 0) {
                text.append(" to ").append(getRecipientsDescription());
            }
            text.append(" on behalf of ").append(this.sendingMember);
            ReplyException failure = getException();
            if (failure != null) {
                text.append(" with exception ").append(failure);
            }
            return text.toString();
        }
    }

    /**
     * Sends a {@link StateStabilizationMessage} to every member in {@code set} and then waits
     * for the reply to each one.
     *
     * <p>For an initialized region a new membership version is forced and current operations
     * are awaited first; if the region turns out to be destroyed during that wait the method
     * returns without sending anything. An interrupt while waiting for replies restores the
     * thread's interrupt status and abandons the remaining waits.
     *
     * @param set members to flush to
     * @param distributedRegion region whose distribution manager and advisor are used
     */
    public static void flushTo(Set<InternalDistributedMember> set, DistributedRegion distributedRegion) {
        DistributionManager manager = distributedRegion.getDistributionManager();
        if (distributedRegion.isInitialized()) {
            distributedRegion.getDistributionAdvisor().forceNewMembershipVersion();
            try {
                distributedRegion.getDistributionAdvisor().waitForCurrentOperations();
            } catch (RegionDestroyedException destroyed) {
                return;
            }
        }

        // Phase 1: send one message per target, remembering the processor that awaits its reply.
        Set<ReplyProcessor21> pendingReplies = new HashSet<>();
        for (InternalDistributedMember target : set) {
            StateStabilizationMessage request = new StateStabilizationMessage();
            request.isSingleFlushTo = true;
            request.requestingMember = manager.getDistributionManagerId();
            request.setRecipient(target);
            ReplyProcessor21 processor = new ReplyProcessor21(manager, target);
            request.processorId = processor.getProcessorId();
            request.channelState = manager.getMembershipManager().getMessageState(target, false);
            if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                if (request.channelState != null && request.channelState.size() > 0) {
                    logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "channel states: {}", request.channelStateDescription(request.channelState));
                }
                logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {}", request);
            }
            manager.putOutgoing(request);
            pendingReplies.add(processor);
        }

        // Test hook, invoked (when installed) between sending and waiting.
        if (distributedRegion.getRegionMap().getARMLockTestHook() != null) {
            distributedRegion.getRegionMap().getARMLockTestHook().beforeStateFlushWait();
        }

        // Phase 2: wait for every reply.
        for (ReplyProcessor21 processor : pendingReplies) {
            try {
                processor.waitForReplies();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Creates a flush operation bound to a single region; the distribution
     * manager is taken from that region.
     *
     * @param distributedRegion the region whose state is to be flushed
     */
    public StateFlushOperation(DistributedRegion distributedRegion) {
        this(distributedRegion.getDistributionManager());
        this.region = distributedRegion;
    }

    /**
     * Creates a flush operation that is not bound to a region. With no region
     * set, {@code flush} marks its message as applying to all regions.
     *
     * @param distributionManager the distribution manager used to send messages
     */
    public StateFlushOperation(DistributionManager distributionManager) {
        dm = distributionManager;
    }

    /**
     * Sends a {@code StateMarkerMessage} to the given recipients and waits for
     * the replies.
     *
     * <p>The target member is added to the recipients (on a copy of the set)
     * unless it is already present or is this member itself. When no region is
     * bound to this operation the message is marked as applying to all regions;
     * otherwise it carries the region's full path.
     *
     * @param set the members that should receive the marker message
     * @param distributedMember the target member, stored as the message's relay recipient
     * @param i the processor type stored on the message
     * @param z stored on the message as its {@code flushNewOps} flag
     * @return {@code false} if the message could not be sent to the target or if
     *         waiting for replies ended with a {@link ReplyException};
     *         {@code true} otherwise
     * @throws InterruptedException if the calling thread is already interrupted
     *         on entry, or is interrupted while waiting for replies
     */
    public boolean flush(Set set, DistributedMember distributedMember, int i, boolean z) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        final InternalDistributedMember localId = this.dm.getDistributionManagerId();
        Set recipients = set;
        final boolean targetCovered = set.contains(distributedMember) || localId.equals(distributedMember);
        if (!targetCovered) {
            // Work on a copy so the caller's set is never modified.
            recipients = new HashSet(set);
            recipients.add(distributedMember);
        }

        final StateMarkerMessage marker = new StateMarkerMessage();
        marker.relayRecipient = distributedMember;
        marker.processorType = i;
        marker.flushNewOps = z;
        if (this.region != null) {
            marker.regionPath = this.region.getFullPath();
        } else {
            marker.allRegions = true;
        }
        marker.setRecipients(recipients);

        final StateFlushReplyProcessor replyWaiter = new StateFlushReplyProcessor(this.dm, recipients, distributedMember);
        marker.processorId = replyWaiter.getProcessorId();

        final boolean severeAlerts = this.region != null
                && this.region.isUsedForPartitionedRegionBucket()
                && this.region.getDistributionConfig().getAckSevereAlertThreshold() > 0;
        if (severeAlerts) {
            marker.severeAlertEnabled = true;
            replyWaiter.enableSevereAlertProcessing();
        }

        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
            logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Sending {} with processor {}", marker, replyWaiter);
        }

        final Set<InternalDistributedMember> unsent = this.dm.putOutgoing(marker);
        if (unsent != null) {
            if (unsent.contains(distributedMember)) {
                // The target never got the marker, so waiting for replies is pointless.
                if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                    logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "failed to send StateMarkerMessage to target {}; returning from flush without waiting for replies", distributedMember);
                }
                return false;
            }
            replyWaiter.messageNotSentTo(unsent);
        }

        try {
            replyWaiter.waitForReplies();
            if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
                logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "Finished processing {}", marker);
            }
        } catch (ReplyException replyFailure) {
            logger.warn("state flush terminated with exception", replyFailure);
            return false;
        }
        return true;
    }
}
