/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service.reads.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.AbstractReplicaCollection;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InOurDcTester;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.reads.repair.BlockingReadRepairs;
import org.apache.cassandra.service.reads.repair.ReadRepairDiagnostics;
import org.apache.cassandra.tracing.Tracing;

public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
extends AbstractFuture<Object>
implements RequestCallback<Object> {
    private final DecoratedKey key;
    private final P replicaPlan;
    private final Map<Replica, Mutation> pendingRepairs;
    private final CountDownLatch latch;
    private final Predicate<InetAddressAndPort> shouldBlockOn;
    private volatile long mutationsSentTime;

    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) {
        this(key, repairs, maxBlockFor, replicaPlan, (Predicate<InetAddressAndPort>)(((ReplicaPlan)replicaPlan).consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue()));
    }

    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn) {
        this.key = key;
        this.pendingRepairs = new ConcurrentHashMap<Replica, Mutation>(repairs);
        this.replicaPlan = replicaPlan;
        this.shouldBlockOn = shouldBlockOn;
        int blockFor = maxBlockFor;
        for (Replica participant : ((ReplicaPlan)replicaPlan).contacts()) {
            if (repairs.containsKey(participant) || !shouldBlockOn.test(participant.endpoint())) continue;
            --blockFor;
        }
        this.latch = new CountDownLatch(Math.max(blockFor, 0));
    }

    @VisibleForTesting
    long waitingOn() {
        return this.latch.getCount();
    }

    @VisibleForTesting
    void ack(InetAddressAndPort from) {
        if (this.shouldBlockOn.test(from)) {
            this.pendingRepairs.remove(((ReplicaPlan.ForRead)this.replicaPlan).getReplicaFor(from));
            this.latch.countDown();
        }
    }

    @Override
    public void onResponse(Message<Object> msg) {
        this.ack(msg.from());
    }

    private static PartitionUpdate extractUpdate(Mutation mutation) {
        return (PartitionUpdate)Iterables.getOnlyElement(mutation.getPartitionUpdates());
    }

    private PartitionUpdate mergeUnackedUpdates() {
        ArrayList updates = Lists.newArrayList((Iterable)Iterables.transform(this.pendingRepairs.values(), BlockingPartitionRepair::extractUpdate));
        return updates.isEmpty() ? null : PartitionUpdate.merge(updates);
    }

    @VisibleForTesting
    protected void sendRR(Message<Mutation> message, InetAddressAndPort endpoint) {
        MessagingService.instance().sendWithCallback(message, endpoint, this);
    }

    public void sendInitialRepairs() {
        this.mutationsSentTime = System.nanoTime();
        Replicas.assertFull(this.pendingRepairs.keySet());
        for (Map.Entry<Replica, Mutation> entry : this.pendingRepairs.entrySet()) {
            Replica destination = entry.getKey();
            Preconditions.checkArgument((boolean)destination.isFull(), (String)"Can't send repairs to transient replicas: %s", (Object)destination);
            Mutation mutation = entry.getValue();
            TableId tableId = BlockingPartitionRepair.extractUpdate((Mutation)mutation).metadata().id;
            Tracing.trace("Sending read-repair-mutation to {}", (Object)destination);
            this.sendRR(Message.out(Verb.READ_REPAIR_REQ, mutation), destination.endpoint());
            ColumnFamilyStore.metricsFor((TableId)tableId).readRepairRequests.mark();
            if (!this.shouldBlockOn.test(destination.endpoint())) {
                this.pendingRepairs.remove(destination);
            }
            ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation);
        }
    }

    public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit) {
        long elapsed = System.nanoTime() - this.mutationsSentTime;
        long remaining = timeoutUnit.toNanos(timeout) - elapsed;
        try {
            return this.latch.await(remaining, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
    }

    private static int msgVersionIdx(int version) {
        return version - 10;
    }

    public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit) {
        if (this.awaitRepairs(timeout, timeoutUnit)) {
            return;
        }
        Object newCandidates = ((ReplicaPlan.ForRead)this.replicaPlan).uncontactedCandidates();
        if (((AbstractReplicaCollection)newCandidates).isEmpty()) {
            return;
        }
        PartitionUpdate update = this.mergeUnackedUpdates();
        if (update == null) {
            return;
        }
        ReadRepairMetrics.speculatedWrite.mark();
        Mutation[] versionedMutations = new Mutation[BlockingPartitionRepair.msgVersionIdx(12) + 1];
        for (Replica replica : newCandidates) {
            int versionIdx = BlockingPartitionRepair.msgVersionIdx(MessagingService.instance().versions.get(replica.endpoint()));
            Mutation mutation = versionedMutations[versionIdx];
            if (mutation == null) {
                versionedMutations[versionIdx] = mutation = BlockingReadRepairs.createRepairMutation(update, ((ReplicaPlan)this.replicaPlan).consistencyLevel(), replica.endpoint(), true);
            }
            if (mutation == null) {
                ReadRepairDiagnostics.speculatedWriteOversized(this, replica.endpoint());
                continue;
            }
            Tracing.trace("Sending speculative read-repair-mutation to {}", (Object)replica);
            this.sendRR(Message.out(Verb.READ_REPAIR_REQ, mutation), replica.endpoint());
            ReadRepairDiagnostics.speculatedWrite(this, replica.endpoint(), mutation);
        }
    }

    Keyspace getKeyspace() {
        return ((ReplicaPlan)this.replicaPlan).keyspace();
    }

    DecoratedKey getKey() {
        return this.key;
    }

    ConsistencyLevel getConsistency() {
        return ((ReplicaPlan)this.replicaPlan).consistencyLevel();
    }
}

