package org.apache.geode.internal.cache.partitioned;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireRethrowable;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.streaming.StreamingOperation;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.Versioning;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/partitioned/StreamingPartitionOperation.class */
public abstract class StreamingPartitionOperation extends StreamingOperation {
    private static final Logger logger = LogService.getLogger();
    protected final int regionId;

    /* loaded from: input_file:org/apache/geode/internal/cache/partitioned/StreamingPartitionOperation$StreamingPartitionMessage.class */
    public static abstract class StreamingPartitionMessage extends PartitionMessage {
        transient HeapDataOutputStream outStream;
        transient int replyMsgNum;
        transient boolean replyLastMsg;
        transient int numObjectsInChunk;

        public StreamingPartitionMessage() {
            this.outStream = null;
            this.replyMsgNum = 0;
            this.replyLastMsg = true;
            this.numObjectsInChunk = 0;
        }

        public StreamingPartitionMessage(Set set, int i, ReplyProcessor21 replyProcessor21) {
            super(set, i, replyProcessor21);
            this.outStream = null;
            this.replyMsgNum = 0;
            this.replyLastMsg = true;
            this.numObjectsInChunk = 0;
        }

        public StreamingPartitionMessage(InternalDistributedMember internalDistributedMember, int i, ReplyProcessor21 replyProcessor21) {
            super(internalDistributedMember, i, replyProcessor21);
            this.outStream = null;
            this.replyMsgNum = 0;
            this.replyLastMsg = true;
            this.numObjectsInChunk = 0;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.internal.cache.partitioned.PartitionMessage
        public void sendReply(InternalDistributedMember internalDistributedMember, int i, DistributionManager distributionManager, ReplyException replyException, PartitionedRegion partitionedRegion, long j) {
            if (replyException != null) {
                this.outStream = null;
                this.replyMsgNum = 0;
                this.replyLastMsg = true;
            }
            if (this.replyLastMsg && partitionedRegion != null && j > 0) {
                partitionedRegion.getPrStats().endPartitionMessagesProcessing(j);
            }
            StreamingOperation.StreamingReplyMessage.send(internalDistributedMember, i, replyException, distributionManager, this.outStream, this.numObjectsInChunk, this.replyMsgNum, this.replyLastMsg);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.internal.cache.partitioned.PartitionMessage
        public boolean operateOnPartitionedRegion(ClusterDistributionManager clusterDistributionManager, PartitionedRegion partitionedRegion, long j) throws CacheException, QueryException, ForceReattemptException, InterruptedException {
            Object nextReplyObject;
            Object nextReplyObject2;
            boolean isTraceEnabled = StreamingPartitionOperation.logger.isTraceEnabled();
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Object obj = null;
            boolean z = false;
            boolean z2 = false;
            this.outStream = new HeapDataOutputStream(clusterDistributionManager.getSystem().getConfig().getSocketBufferSize() - 200, Versioning.getKnownVersionOrDefault(mo236getSender().getVersion(), KnownVersion.CURRENT));
            do {
                if (obj == null) {
                    try {
                        nextReplyObject = getNextReplyObject(partitionedRegion);
                        this.replyLastMsg = nextReplyObject == Token.END_OF_STREAM;
                    } catch (IOException e) {
                        throw new InternalGemFireException(e);
                    }
                } else {
                    nextReplyObject = obj;
                    obj = null;
                }
                if (!this.replyLastMsg) {
                    this.numObjectsInChunk = 1;
                    if (isTraceEnabled) {
                        StreamingPartitionOperation.logger.trace("Writing this object to StreamingPartitionMessage outStream: '{}'", nextReplyObject);
                    }
                    BlobHelper.serializeTo(nextReplyObject, this.outStream);
                    do {
                        this.outStream.disallowExpansion(StreamingOperation.CHUNK_FULL);
                        nextReplyObject2 = getNextReplyObject(partitionedRegion);
                        this.replyLastMsg = nextReplyObject2 == Token.END_OF_STREAM;
                        if (!this.replyLastMsg) {
                            if (isTraceEnabled) {
                                try {
                                    StreamingPartitionOperation.logger.trace("Writing this object to StreamingPartitionMessage outStream: '{}'", nextReplyObject2);
                                } catch (GemFireRethrowable e2) {
                                    obj = nextReplyObject2;
                                }
                            }
                            BlobHelper.serializeTo(nextReplyObject2, this.outStream);
                            this.numObjectsInChunk++;
                        }
                    } while (nextReplyObject2 != Token.END_OF_STREAM);
                }
                try {
                    sendReply(mo236getSender(), this.processorId, clusterDistributionManager, null, partitionedRegion, j);
                    this.replyMsgNum++;
                    if (this.replyLastMsg) {
                        z = true;
                    }
                    this.outStream.reset();
                    this.numObjectsInChunk = 0;
                } catch (CancelException e3) {
                    z2 = true;
                }
            } while (!this.replyLastMsg);
            if (z || z2) {
                return false;
            }
            throw new InternalGemFireError("unexpected condition");
        }

        protected abstract Object getNextReplyObject(PartitionedRegion partitionedRegion) throws CacheException, ForceReattemptException, InterruptedException;

        protected Object getNextReplyObject() {
            throw new UnsupportedOperationException("use getNextReplyObject(PartitionedRegion) instead");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/geode/internal/cache/partitioned/StreamingPartitionOperation$StreamingPartitionResponse.class */
    public class StreamingPartitionResponse extends ReplyProcessor21 {
        protected volatile boolean abort;
        protected final Map statusMap;
        protected final AtomicInteger msgsBeingProcessed;
        private volatile String memberDepartedMessage;
        private final Set<InternalDistributedMember> failedMembers;
        private volatile boolean finishedWaiting;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/geode/internal/cache/partitioned/StreamingPartitionOperation$StreamingPartitionResponse$Status.class */
        public class Status {
            int msgsProcessed = 0;
            int numMsgs = 0;

            Status() {
            }

            protected synchronized boolean trackMessage(StreamingOperation.StreamingReplyMessage streamingReplyMessage) {
                this.msgsProcessed++;
                if (streamingReplyMessage.isLastMessage()) {
                    this.numMsgs = streamingReplyMessage.getMessageNumber() + 1;
                }
                if (StreamingPartitionOperation.logger.isDebugEnabled()) {
                    StreamingPartitionOperation.logger.debug("Streaming Message Tracking Status: Processor id: {}; Sender: {}; Messages Processed: {}; NumMsgs: {}", Integer.valueOf(StreamingPartitionResponse.this.getProcessorId()), streamingReplyMessage.mo236getSender(), Integer.valueOf(this.msgsProcessed), Integer.valueOf(this.numMsgs));
                }
                return this.msgsProcessed == this.numMsgs;
            }
        }

        public StreamingPartitionResponse(InternalDistributedSystem internalDistributedSystem, Set set) {
            super(internalDistributedSystem, set);
            this.abort = false;
            this.statusMap = new HashMap();
            this.msgsBeingProcessed = new AtomicInteger();
            this.memberDepartedMessage = null;
            this.failedMembers = new CopyOnWriteHashSet();
            this.finishedWaiting = false;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public boolean stopBecauseOfExceptions() {
            return false;
        }

        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public void process(DistributionMessage distributionMessage) {
            boolean z;
            if (waitingOnMember(distributionMessage.mo236getSender())) {
                this.msgsBeingProcessed.incrementAndGet();
                try {
                    StreamingOperation.StreamingReplyMessage streamingReplyMessage = (StreamingOperation.StreamingReplyMessage) distributionMessage;
                    List objects = streamingReplyMessage.getObjects();
                    if (objects != null) {
                        boolean z2 = this.abort;
                        if (!z2) {
                            z2 = !StreamingPartitionOperation.this.processChunk(objects, streamingReplyMessage.mo236getSender(), streamingReplyMessage.getMessageNumber(), streamingReplyMessage.isLastMessage());
                            if (z2) {
                                this.abort = true;
                            }
                        }
                        z = z2 || trackMessage(streamingReplyMessage);
                    } else {
                        z = true;
                    }
                    if (z) {
                        super.process(distributionMessage, false);
                    }
                } finally {
                    this.msgsBeingProcessed.decrementAndGet();
                    checkIfDone();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public synchronized void processException(DistributionMessage distributionMessage, ReplyException replyException) {
            Throwable cause = replyException.getCause();
            if (!(cause instanceof ForceReattemptException) && !(cause instanceof CacheClosedException)) {
                super.processException(distributionMessage, replyException);
                return;
            }
            if (StreamingPartitionOperation.logger.isDebugEnabled()) {
                StreamingPartitionOperation.logger.debug("StreamingPartitionResponse received exception {} for member {} query retry required.", cause, distributionMessage.mo236getSender());
            }
            this.failedMembers.add(distributionMessage.mo236getSender());
        }

        @Override // org.apache.geode.distributed.internal.ReplyProcessor21, org.apache.geode.distributed.internal.MembershipListener
        public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember internalDistributedMember, boolean z) {
            if (internalDistributedMember != null && waitingOnMember(internalDistributedMember)) {
                this.failedMembers.add(internalDistributedMember);
                this.memberDepartedMessage = String.format("Streaming reply processor got memberDeparted event for < %s > crashed, %s", internalDistributedMember, Boolean.valueOf(z));
            }
            super.memberDeparted(distributionManager, internalDistributedMember, z);
        }

        public Set<InternalDistributedMember> waitForCacheOrQueryException() throws CacheException, QueryException {
            try {
                waitForRepliesUninterruptibly();
                return this.failedMembers;
            } catch (ReplyException e) {
                Throwable cause = e.getCause();
                if (cause instanceof CacheException) {
                    throw ((CacheException) cause);
                }
                if (cause instanceof RegionDestroyedException) {
                    throw ((RegionDestroyedException) cause);
                }
                if (cause instanceof QueryException) {
                    throw ((QueryException) cause);
                }
                if (cause instanceof PrimaryBucketException) {
                    throw new PrimaryBucketException("Peer failed primary test", cause);
                }
                e.handleCause();
                throw e;
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public boolean stillWaiting() {
            if (this.finishedWaiting) {
                return false;
            }
            if (this.msgsBeingProcessed.get() > 0 && numMembers() > 0) {
                return true;
            }
            this.finishedWaiting = this.finishedWaiting || this.abort || !super.stillWaiting();
            return !this.finishedWaiting;
        }

        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("<");
            sb.append(getClass().getName());
            sb.append(" ");
            sb.append(getProcessorId());
            if (this.members == null) {
                sb.append(" (null memebrs)");
            } else {
                sb.append(" waiting for ");
                sb.append(numMembers());
                sb.append(" replies");
                sb.append(this.exception == null ? "" : " exception: " + this.exception);
                sb.append(" from ");
                sb.append(membersToString());
            }
            sb.append("; waiting for ");
            sb.append(this.msgsBeingProcessed.get());
            sb.append(" messages in the process of being processed>");
            return sb.toString();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean trackMessage(StreamingOperation.StreamingReplyMessage streamingReplyMessage) {
            Status status;
            synchronized (this) {
                status = (Status) this.statusMap.get(streamingReplyMessage.mo236getSender());
                if (status == null) {
                    status = new Status();
                    this.statusMap.put(streamingReplyMessage.mo236getSender(), status);
                }
            }
            return status.trackMessage(streamingReplyMessage);
        }

        public void removeFailedSenders(Set set) {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                removeMember((InternalDistributedMember) it.next(), true);
            }
        }
    }

    public StreamingPartitionOperation(InternalDistributedSystem internalDistributedSystem, int i) {
        super(internalDistributedSystem);
        this.regionId = i;
    }

    @Override // org.apache.geode.distributed.internal.streaming.StreamingOperation
    public void getDataFromAll(Set set) {
        throw new UnsupportedOperationException("call getPartitionedDataFrom instead");
    }

    public Set<InternalDistributedMember> getPartitionedDataFrom(Set set) throws TimeoutException, InterruptedException, QueryException, ForceReattemptException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        StreamingPartitionResponse streamingPartitionResponse = new StreamingPartitionResponse(this.sys, set);
        this.sys.getDistributionManager().putOutgoing(createRequestMessage(set, streamingPartitionResponse));
        return streamingPartitionResponse.waitForCacheOrQueryException();
    }

    @Override // org.apache.geode.distributed.internal.streaming.StreamingOperation
    protected abstract DistributionMessage createRequestMessage(Set set, ReplyProcessor21 replyProcessor21);
}
