/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service.reads;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.service.reads.ShortReadRowsProtection;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.tracing.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShortReadPartitionsProtection
extends Transformation<UnfilteredRowIterator>
implements MorePartitions<UnfilteredPartitionIterator> {
    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
    private final ReadCommand command;
    private final Replica source;
    private final DataLimits.Counter singleResultCounter;
    private final DataLimits.Counter mergedResultCounter;
    private DecoratedKey lastPartitionKey;
    private boolean partitionsFetched;
    private final long queryStartNanoTime;

    public ShortReadPartitionsProtection(ReadCommand command, Replica source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter, long queryStartNanoTime) {
        this.command = command;
        this.source = source;
        this.singleResultCounter = singleResultCounter;
        this.mergedResultCounter = mergedResultCounter;
        this.queryStartNanoTime = queryStartNanoTime;
    }

    @Override
    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) {
        this.partitionsFetched = true;
        this.lastPartitionKey = partition.partitionKey();
        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(this.command.metadata().keyspace), partition.partitionKey().getToken(), this.source);
        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
        ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(), this.command, this.source, cmd -> this.executeReadCommand((ReadCommand)cmd, sharedReplicaPlan), this.singleResultCounter, this.mergedResultCounter);
        return Transformation.apply(MoreRows.extend(partition, protection), protection);
    }

    @Override
    public UnfilteredPartitionIterator moreContents() {
        assert (!this.mergedResultCounter.isDone());
        assert (!this.command.limits().isUnlimited());
        assert (!this.command.isLimitedToOnePartition());
        if (!this.singleResultCounter.isDone() && this.command.limits().perPartitionCount() == Integer.MAX_VALUE) {
            return null;
        }
        if (!this.partitionsFetched) {
            return null;
        }
        this.partitionsFetched = false;
        int toQuery = this.command.limits().count() != Integer.MAX_VALUE ? this.command.limits().count() - this.counted(this.mergedResultCounter) : this.command.limits().perPartitionCount();
        ColumnFamilyStore.metricsFor((TableId)this.command.metadata().id).shortReadProtectionRequests.mark();
        Tracing.trace("Requesting {} extra rows from {} for short read protection", (Object)toQuery, (Object)this.source);
        logger.info("Requesting {} extra rows from {} for short read protection", (Object)toQuery, (Object)this.source);
        return this.makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
    }

    private int counted(DataLimits.Counter counter) {
        return this.command.limits().isGroupByLimit() ? counter.rowCounted() : counter.counted();
    }

    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery) {
        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand)this.command;
        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
        Range<PartitionPosition> newBounds = bounds.inclusiveRight() ? new Range<DecoratedKey>(this.lastPartitionKey, (DecoratedKey)bounds.right) : new ExcludingBounds<DecoratedKey>(this.lastPartitionKey, (DecoratedKey)bounds.right);
        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(this.command.metadata().keyspace), cmd.dataRange().keyRange(), this.source);
        return this.executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
    }

    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan) {
        DataResolver<E, P> resolver = new DataResolver<E, P>(cmd, replicaPlan, NoopReadRepair.instance, this.queryStartNanoTime);
        ReadCallback<E, P> handler = new ReadCallback<E, P>(resolver, cmd, replicaPlan, this.queryStartNanoTime);
        if (this.source.isSelf()) {
            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
        } else {
            if (this.source.isTransient()) {
                cmd = cmd.copyAsTransientQuery(this.source);
            }
            MessagingService.instance().sendWithCallback(cmd.createMessage(false), this.source.endpoint(), handler);
        }
        handler.awaitResults();
        assert (resolver.getMessages().size() == 1);
        return ((ReadResponse)resolver.getMessages().get((int)0).payload).makeIterator(this.command);
    }
}

