package org.apache.geode.distributed.internal.streaming;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireRethrowable;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.PRQueryTraceInfo;
import org.apache.geode.cache.query.internal.QueryMonitor;
import org.apache.geode.cache.query.internal.StructImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.PartitionedRegionQueryEvaluator;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/distributed/internal/streaming/StreamingOperation.class */
public abstract class StreamingOperation {
    public static final int MSG_OVERHEAD = 200;
    public final InternalDistributedSystem sys;
    private static final Logger logger = LogService.getLogger();
    public static final GemFireRethrowable CHUNK_FULL = new GemFireRethrowable();

    /* loaded from: input_file:org/apache/geode/distributed/internal/streaming/StreamingOperation$RequestStreamingMessage.class */
    public static abstract class RequestStreamingMessage extends PooledDistributionMessage implements MessageWithReply {
        protected int processorId;

        @Override // org.apache.geode.distributed.internal.DistributionMessage, org.apache.geode.distributed.internal.MessageWithReply
        public int getProcessorId() {
            return this.processorId;
        }

        @Override // org.apache.geode.distributed.internal.DistributionMessage
        protected void process(DistributionManager distributionManager) {
            Object nextReplyObject;
            Throwable th = null;
            Object obj = null;
            HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(distributionManager.getSystem().getConfig().getSocketBufferSize() - 200, getSender().getVersionObject());
            boolean z = false;
            boolean z2 = false;
            int i = 0;
            do {
                int i2 = 0;
                if (obj == null) {
                    try {
                        nextReplyObject = getNextReplyObject();
                    } catch (VirtualMachineError e) {
                        SystemFailure.initiateFailure(e);
                        throw e;
                    } catch (Throwable th2) {
                        SystemFailure.checkFailure();
                        th = th2;
                    }
                } else {
                    nextReplyObject = obj;
                    obj = null;
                }
                if (nextReplyObject != Token.END_OF_STREAM) {
                    i2 = 1;
                    BlobHelper.serializeTo(nextReplyObject, heapDataOutputStream);
                    do {
                        heapDataOutputStream.disallowExpansion(StreamingOperation.CHUNK_FULL);
                        nextReplyObject = getNextReplyObject();
                        if (nextReplyObject != Token.END_OF_STREAM) {
                            try {
                                BlobHelper.serializeTo(nextReplyObject, heapDataOutputStream);
                                i2++;
                            } catch (GemFireRethrowable e2) {
                                obj = nextReplyObject;
                            }
                        }
                    } while (nextReplyObject != Token.END_OF_STREAM);
                }
                try {
                    int i3 = i;
                    i++;
                    replyWithData(distributionManager, heapDataOutputStream, i2, i3, nextReplyObject == Token.END_OF_STREAM);
                    if (nextReplyObject == Token.END_OF_STREAM) {
                        z = true;
                    }
                    heapDataOutputStream.reset();
                } catch (CancelException e3) {
                    z2 = true;
                }
            } while (nextReplyObject != Token.END_OF_STREAM);
            if (th != null) {
                replyWithException(distributionManager, new ReplyException(th));
            } else if (!z && !z2) {
                throw new InternalGemFireError(LocalizedStrings.StreamingOperation_THIS_SHOULDNT_HAPPEN.toLocalizedString());
            }
        }

        protected abstract Object getNextReplyObject() throws InterruptedException;

        protected void replyWithData(DistributionManager distributionManager, HeapDataOutputStream heapDataOutputStream, int i, int i2, boolean z) {
            StreamingReplyMessage.send(getSender(), this.processorId, (ReplyException) null, distributionManager, heapDataOutputStream, i, i2, z);
        }

        protected void replyWithException(DistributionManager distributionManager, ReplyException replyException) {
            StreamingReplyMessage.send(getSender(), this.processorId, replyException, (DM) distributionManager, (HeapDataOutputStream) null, 0, 0, true);
        }

        @Override // org.apache.geode.distributed.internal.DistributionMessage, org.apache.geode.internal.DataSerializableFixedID
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            this.processorId = dataInput.readInt();
        }

        @Override // org.apache.geode.distributed.internal.DistributionMessage, org.apache.geode.internal.DataSerializableFixedID
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            dataOutput.writeInt(this.processorId);
        }

        @Override // org.apache.geode.distributed.internal.DistributionMessage
        public String toString() {
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(getClass().getName());
            stringBuffer.append("'; sender=");
            stringBuffer.append(getSender());
            stringBuffer.append("; processorId=");
            stringBuffer.append(this.processorId);
            stringBuffer.append(")");
            return stringBuffer.toString();
        }
    }

    /* loaded from: input_file:org/apache/geode/distributed/internal/streaming/StreamingOperation$StreamingProcessor.class */
    public class StreamingProcessor extends ReplyProcessor21 {
        protected volatile boolean abort;
        private final Map statusMap;
        protected final AtomicInteger msgsBeingProcessed;
        private volatile boolean finishedWaiting;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/geode/distributed/internal/streaming/StreamingOperation$StreamingProcessor$Status.class */
        public class Status {
            int msgsProcessed = 0;
            int numMsgs = 0;

            Status() {
            }

            protected synchronized boolean trackMessage(StreamingReplyMessage streamingReplyMessage) {
                this.msgsProcessed++;
                if (streamingReplyMessage.lastMsg) {
                    this.numMsgs = streamingReplyMessage.msgNum + 1;
                }
                if (StreamingOperation.logger.isDebugEnabled()) {
                    StreamingOperation.logger.debug("Streaming Message Tracking Status: Processor id: {}; Sender: {}; Messages Processed: {}; NumMsgs: {}", Integer.valueOf(StreamingProcessor.this.getProcessorId()), streamingReplyMessage.getSender(), Integer.valueOf(this.msgsProcessed), Integer.valueOf(this.numMsgs));
                }
                return this.msgsProcessed == this.numMsgs;
            }
        }

        public StreamingProcessor(InternalDistributedSystem internalDistributedSystem, InternalDistributedMember internalDistributedMember) {
            super(internalDistributedSystem, internalDistributedMember);
            this.abort = false;
            this.statusMap = new HashMap();
            this.msgsBeingProcessed = new AtomicInteger();
            this.finishedWaiting = false;
        }

        public StreamingProcessor(InternalDistributedSystem internalDistributedSystem, Set set) {
            super(internalDistributedSystem, set);
            this.abort = false;
            this.statusMap = new HashMap();
            this.msgsBeingProcessed = new AtomicInteger();
            this.finishedWaiting = false;
        }

        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public void process(DistributionMessage distributionMessage) {
            boolean z;
            if (waitingOnMember(distributionMessage.getSender())) {
                this.msgsBeingProcessed.incrementAndGet();
                try {
                    StreamingReplyMessage streamingReplyMessage = (StreamingReplyMessage) distributionMessage;
                    List objects = streamingReplyMessage.getObjects();
                    if (objects != null) {
                        boolean z2 = this.abort;
                        if (!z2) {
                            z2 = !StreamingOperation.this.processChunk(objects, streamingReplyMessage.getSender(), streamingReplyMessage.msgNum, streamingReplyMessage.lastMsg);
                            if (z2) {
                                this.abort = true;
                            }
                        }
                        z = z2 || trackMessage(streamingReplyMessage);
                    } else {
                        z = true;
                    }
                    if (z) {
                        super.process(distributionMessage, false);
                    }
                } finally {
                    this.msgsBeingProcessed.decrementAndGet();
                    checkIfDone();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public boolean stillWaiting() {
            if (this.finishedWaiting) {
                return false;
            }
            if (this.msgsBeingProcessed.get() > 0) {
                return true;
            }
            this.finishedWaiting = this.finishedWaiting || this.abort || !super.stillWaiting();
            return !this.finishedWaiting;
        }

        @Override // org.apache.geode.distributed.internal.ReplyProcessor21
        public String toString() {
            return "<" + getClass().getName() + " " + getProcessorId() + " waiting for " + numMembers() + " replies" + (this.exception == null ? "" : " exception: " + this.exception) + " from " + membersToString() + "; waiting for " + this.msgsBeingProcessed.get() + " messages in the process of being processed>";
        }

        protected boolean trackMessage(StreamingReplyMessage streamingReplyMessage) {
            Status status;
            synchronized (this) {
                status = (Status) this.statusMap.get(streamingReplyMessage.getSender());
                if (status == null) {
                    status = new Status();
                    this.statusMap.put(streamingReplyMessage.getSender(), status);
                }
            }
            return status.trackMessage(streamingReplyMessage);
        }
    }

    /* loaded from: input_file:org/apache/geode/distributed/internal/streaming/StreamingOperation$StreamingReplyMessage.class */
    public static final class StreamingReplyMessage extends ReplyMessage {
        protected int msgNum;
        protected boolean lastMsg;
        private transient HeapDataOutputStream chunkStream;
        private transient int numObjects;
        private transient List objectList = null;
        private boolean pdxReadSerialized = false;
        private transient boolean isCanceled = false;

        public static void send(InternalDistributedMember internalDistributedMember, int i, ReplyException replyException, DM dm, HeapDataOutputStream heapDataOutputStream, int i2, int i3, boolean z) {
            send(internalDistributedMember, i, replyException, dm, heapDataOutputStream, i2, i3, z, false);
        }

        public static void send(InternalDistributedMember internalDistributedMember, int i, ReplyException replyException, DM dm, HeapDataOutputStream heapDataOutputStream, int i2, int i3, boolean z, boolean z2) {
            StreamingReplyMessage streamingReplyMessage = new StreamingReplyMessage();
            streamingReplyMessage.processorId = i;
            if (replyException != null) {
                streamingReplyMessage.setException(replyException);
                StreamingOperation.logger.debug("Replying with exception: {}", streamingReplyMessage, replyException);
            }
            streamingReplyMessage.chunkStream = heapDataOutputStream;
            streamingReplyMessage.numObjects = i2;
            streamingReplyMessage.setRecipient(internalDistributedMember);
            streamingReplyMessage.msgNum = i3;
            streamingReplyMessage.lastMsg = z;
            streamingReplyMessage.pdxReadSerialized = z2;
            dm.putOutgoing(streamingReplyMessage);
        }

        public int getMessageNumber() {
            return this.msgNum;
        }

        public boolean isLastMessage() {
            return this.lastMsg;
        }

        public boolean isCanceled() {
            return this.isCanceled;
        }

        public List getObjects() {
            return this.objectList;
        }

        @Override // org.apache.geode.distributed.internal.ReplyMessage, org.apache.geode.internal.DataSerializableFixedID
        public int getDSFID() {
            return -66;
        }

        @Override // org.apache.geode.distributed.internal.ReplyMessage, org.apache.geode.distributed.internal.DistributionMessage, org.apache.geode.internal.DataSerializableFixedID
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            int readInt = dataInput.readInt();
            this.msgNum = dataInput.readInt();
            this.lastMsg = dataInput.readBoolean();
            this.pdxReadSerialized = dataInput.readBoolean();
            boolean z = InternalDataSerializer.getVersionForDataStream(dataInput).compareTo(Version.GFE_81) > 0;
            if (readInt == -1) {
                this.objectList = null;
                return;
            }
            this.numObjects = readInt;
            this.objectList = new ArrayList(readInt);
            if (this.pdxReadSerialized) {
                DefaultQuery.setPdxReadSerialized(true);
            }
            try {
                ReplyProcessor21 processor = ReplyProcessor21.getProcessor(this.processorId);
                boolean z2 = processor instanceof PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse;
                ObjectType objectType = null;
                if (z2) {
                    objectType = ((PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse) processor).getResultType();
                }
                boolean z3 = false;
                int i = 0;
                while (true) {
                    if (i < readInt) {
                        if (DefaultQuery.testHook != null) {
                            DefaultQuery.testHook.doTestHook(3);
                        }
                        if (z2 && QueryMonitor.isLowMemory()) {
                            z3 = true;
                            break;
                        }
                        Object readObject = DataSerializer.readObject(dataInput);
                        if (z2 && objectType != null && objectType.isStructType()) {
                            boolean z4 = z;
                            if (z4 && i == 0) {
                                z4 = !(readObject instanceof PRQueryTraceInfo);
                            }
                            if (z4) {
                                readObject = new StructImpl((StructTypeImpl) objectType, (Object[]) readObject);
                            }
                        }
                        this.objectList.add(readObject);
                        i++;
                    } else {
                        break;
                    }
                }
                if (z3) {
                    this.isCanceled = true;
                    if (DefaultQuery.testHook != null) {
                        DefaultQuery.testHook.doTestHook(2);
                    }
                }
            } finally {
                if (this.pdxReadSerialized) {
                    DefaultQuery.setPdxReadSerialized(false);
                }
            }
        }

        @Override // org.apache.geode.distributed.internal.ReplyMessage, org.apache.geode.distributed.internal.DistributionMessage, org.apache.geode.internal.DataSerializableFixedID
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            if (this.chunkStream == null) {
                dataOutput.writeInt(-1);
            } else {
                dataOutput.writeInt(this.numObjects);
            }
            dataOutput.writeInt(this.msgNum);
            dataOutput.writeBoolean(this.lastMsg);
            dataOutput.writeBoolean(this.pdxReadSerialized);
            if (this.chunkStream == null || this.numObjects <= 0) {
                return;
            }
            this.chunkStream.sendTo(dataOutput);
        }

        @Override // org.apache.geode.distributed.internal.ReplyMessage, org.apache.geode.distributed.internal.DistributionMessage
        public String toString() {
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(getClass().getName());
            stringBuffer.append("(processorId=");
            stringBuffer.append(this.processorId);
            stringBuffer.append(" from ");
            stringBuffer.append(getSender());
            ReplyException exception = getException();
            if (exception != null) {
                stringBuffer.append(" with exception ");
                stringBuffer.append(exception);
            }
            stringBuffer.append(";numObjects=");
            stringBuffer.append(this.numObjects);
            stringBuffer.append(";msgNum ");
            stringBuffer.append(this.msgNum);
            stringBuffer.append(";lastMsg=");
            stringBuffer.append(this.lastMsg);
            if (this.objectList != null) {
                stringBuffer.append(";objectList(size=");
                stringBuffer.append(this.objectList.size());
                stringBuffer.append(")");
            } else {
                stringBuffer.append(";chunkStream=");
                stringBuffer.append(this.chunkStream);
            }
            stringBuffer.append(")");
            return stringBuffer.toString();
        }
    }

    public StreamingOperation(InternalDistributedSystem internalDistributedSystem) {
        this.sys = internalDistributedSystem;
    }

    public void getDataFromAll(Set set) throws TimeoutException, InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (set.isEmpty()) {
            return;
        }
        StreamingProcessor streamingProcessor = new StreamingProcessor(this.sys, set);
        this.sys.getDistributionManager().putOutgoing(createRequestMessage(set, streamingProcessor));
        try {
            streamingProcessor.waitForRepliesUninterruptibly();
        } catch (InternalGemFireException e) {
            Throwable cause = e.getCause();
            if (!(cause instanceof TimeoutException)) {
                throw e;
            }
            throw ((TimeoutException) cause);
        } catch (ReplyException e2) {
            e2.handleAsUnexpected();
        }
    }

    protected abstract DistributionMessage createRequestMessage(Set set, ReplyProcessor21 replyProcessor21);

    public boolean processChunk(List list, InternalDistributedMember internalDistributedMember, int i, boolean z) {
        return processData(list, internalDistributedMember, i, z);
    }

    protected abstract boolean processData(List list, InternalDistributedMember internalDistributedMember, int i, boolean z);
}
