package org.apache.cassandra.db;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/BatchlogManager.class */
public class BatchlogManager implements BatchlogManagerMBean {
    private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
    private static final long REPLAY_INTERVAL = 60000;
    private static final int PAGE_SIZE = 128;
    private final AtomicLong totalBatchesReplayed = new AtomicLong();
    private final AtomicBoolean isReplaying = new AtomicBoolean();
    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
    public static final BatchlogManager instance = new BatchlogManager();
    public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");

    public void start() {
        try {
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(MBEAN_NAME));
            batchlogTasks.scheduleWithFixedDelay(new WrappedRunnable() { // from class: org.apache.cassandra.db.BatchlogManager.1
                @Override // org.apache.cassandra.utils.WrappedRunnable
                public void runMayThrow() throws ExecutionException, InterruptedException {
                    BatchlogManager.this.replayAllFailedBatches();
                }
            }, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.cassandra.db.BatchlogManagerMBean
    public int countAllBatches() {
        return (int) process("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF).one().getLong("count");
    }

    @Override // org.apache.cassandra.db.BatchlogManagerMBean
    public long getTotalBatchesReplayed() {
        return this.totalBatchesReplayed.longValue();
    }

    @Override // org.apache.cassandra.db.BatchlogManagerMBean
    public void forceBatchlogReplay() {
        batchlogTasks.execute(new WrappedRunnable() { // from class: org.apache.cassandra.db.BatchlogManager.2
            @Override // org.apache.cassandra.utils.WrappedRunnable
            public void runMayThrow() throws ExecutionException, InterruptedException {
                BatchlogManager.this.replayAllFailedBatches();
            }
        });
    }

    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> collection, UUID uuid) {
        return getBatchlogMutationFor(collection, uuid, FBUtilities.timestampMicros());
    }

    @VisibleForTesting
    static RowMutation getBatchlogMutationFor(Collection<RowMutation> collection, UUID uuid, long j) {
        ByteBuffer decompose = LongType.instance.decompose(Long.valueOf(j / 1000));
        ByteBuffer serializeRowMutations = serializeRowMutations(collection);
        ArrayBackedSortedColumns create = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
        create.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, j));
        create.addColumn(new Column(columnName("data"), serializeRowMutations, j));
        create.addColumn(new Column(columnName("written_at"), decompose, j));
        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), create);
    }

    private static ByteBuffer serializeRowMutations(Collection<RowMutation> collection) {
        FastByteArrayOutputStream fastByteArrayOutputStream = new FastByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(fastByteArrayOutputStream);
        try {
            dataOutputStream.writeInt(collection.size());
            Iterator<RowMutation> it = collection.iterator();
            while (it.hasNext()) {
                RowMutation.serializer.serialize(it.next(), (DataOutput) dataOutputStream, 6);
            }
            return ByteBuffer.wrap(fastByteArrayOutputStream.toByteArray());
        } catch (IOException e) {
            throw new AssertionError();
        }
    }

    @VisibleForTesting
    void replayAllFailedBatches() throws ExecutionException, InterruptedException {
        if (this.isReplaying.compareAndSet(false, true)) {
            logger.debug("Started replayAllFailedBatches");
            RateLimiter create = RateLimiter.create(DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size() == 0 ? Double.MAX_VALUE : r0 * 1024);
            try {
                UntypedResultSet process = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, 128);
                while (!process.isEmpty()) {
                    UUID processBatchlogPage = processBatchlogPage(process, create);
                    if (process.size() < 128) {
                        break;
                    } else {
                        process = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, processBatchlogPage, 128);
                    }
                }
                cleanup();
                this.isReplaying.set(false);
                logger.debug("Finished replayAllFailedBatches");
            } catch (Throwable th) {
                this.isReplaying.set(false);
                throw th;
            }
        }
    }

    private UUID processBatchlogPage(UntypedResultSet untypedResultSet, RateLimiter rateLimiter) {
        UUID uuid = null;
        Iterator<UntypedResultSet.Row> it = untypedResultSet.iterator();
        while (it.hasNext()) {
            UntypedResultSet.Row next = it.next();
            uuid = next.getUUID("id");
            long j = next.getLong("written_at");
            int i = next.has("version") ? next.getInt("version") : 6;
            if (System.currentTimeMillis() >= j + (DatabaseDescriptor.getWriteRpcTimeout() * 2)) {
                replayBatch(uuid, next.getBytes("data"), j, i, rateLimiter);
            }
        }
        return uuid;
    }

    private void replayBatch(UUID uuid, ByteBuffer byteBuffer, long j, int i, RateLimiter rateLimiter) {
        logger.debug("Replaying batch {}", uuid);
        try {
            replaySerializedMutations(byteBuffer, j, i, rateLimiter);
        } catch (IOException e) {
            logger.warn("Skipped batch replay of {} due to {}", uuid, e);
        }
        deleteBatch(uuid);
        this.totalBatchesReplayed.incrementAndGet();
    }

    private void deleteBatch(UUID uuid) {
        RowMutation rowMutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid));
        rowMutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
        rowMutation.apply();
    }

    private void replaySerializedMutations(ByteBuffer byteBuffer, long j, int i, RateLimiter rateLimiter) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(ByteBufferUtil.inputStream(byteBuffer));
        int readInt = dataInputStream.readInt();
        ArrayList arrayList = new ArrayList(readInt);
        for (int i2 = 0; i2 < readInt; i2++) {
            RowMutation deserialize = RowMutation.serializer.deserialize((DataInput) dataInputStream, i);
            for (UUID uuid : deserialize.getColumnFamilyIds()) {
                if (j <= SystemKeyspace.getTruncatedAt(uuid)) {
                    deserialize = deserialize.without(uuid);
                }
            }
            if (!deserialize.isEmpty()) {
                arrayList.add(deserialize);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        replayMutations(arrayList, j, i, rateLimiter);
    }

    private void replayMutations(List<RowMutation> list, long j, int i, RateLimiter rateLimiter) throws IOException {
        int calculateHintTTL = calculateHintTTL(list, j);
        if (calculateHintTTL <= 0) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (RowMutation rowMutation : list) {
            String keyspaceName = rowMutation.getKeyspaceName();
            Token token = StorageService.getPartitioner().getToken(rowMutation.key());
            int serializedSize = (int) RowMutation.serializer.serializedSize(rowMutation, i);
            for (InetAddress inetAddress : Iterables.concat(StorageService.instance.getNaturalEndpoints(keyspaceName, token), StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName))) {
                rateLimiter.acquire(serializedSize);
                if (inetAddress.equals(FBUtilities.getBroadcastAddress())) {
                    rowMutation.apply();
                } else if (FailureDetector.instance.isAlive(inetAddress)) {
                    arrayList.add(inetAddress);
                } else {
                    arrayList2.add(inetAddress);
                }
            }
            if (!arrayList.isEmpty()) {
                arrayList2.addAll(attemptDirectDelivery(rowMutation, arrayList));
            }
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                StorageProxy.writeHintForMutation(rowMutation, j, calculateHintTTL, (InetAddress) it.next());
            }
            arrayList.clear();
            arrayList2.clear();
        }
    }

    private Set<InetAddress> attemptDirectDelivery(RowMutation rowMutation, List<InetAddress> list) throws IOException {
        ArrayList arrayList = new ArrayList();
        final Set<InetAddress> synchronizedSet = Collections.synchronizedSet(new HashSet());
        for (final InetAddress inetAddress : list) {
            WriteResponseHandler writeResponseHandler = new WriteResponseHandler(inetAddress, WriteType.UNLOGGED_BATCH, new Runnable() { // from class: org.apache.cassandra.db.BatchlogManager.3
                @Override // java.lang.Runnable
                public void run() {
                    synchronizedSet.remove(inetAddress);
                }
            });
            MessagingService.instance().sendRR((MessageOut<? extends IMutation>) rowMutation.createMessage(), inetAddress, (AbstractWriteResponseHandler) writeResponseHandler, false);
            arrayList.add(writeResponseHandler);
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                ((WriteResponseHandler) it.next()).get();
            } catch (WriteTimeoutException e) {
                logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
            }
        }
        return synchronizedSet;
    }

    private int calculateHintTTL(List<RowMutation> list, long j) {
        int i = Integer.MAX_VALUE;
        Iterator<RowMutation> it = list.iterator();
        while (it.hasNext()) {
            i = Math.min(i, HintedHandOffManager.calculateHintTTL(it.next()));
        }
        return i - ((int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - j));
    }

    private static ByteBuffer columnName(String str) {
        return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(str)).build();
    }

    private void cleanup() throws ExecutionException, InterruptedException {
        ColumnFamilyStore columnFamilyStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF);
        columnFamilyStore.forceBlockingFlush();
        ArrayList arrayList = new ArrayList();
        Iterator<SSTableReader> it = columnFamilyStore.getSSTables().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().descriptor);
        }
        if (arrayList.isEmpty()) {
            return;
        }
        CompactionManager.instance.submitUserDefined(columnFamilyStore, arrayList, CompactionManager.GC_ALL).get();
    }

    private static UntypedResultSet process(String str, Object... objArr) {
        return QueryProcessor.processInternal(String.format(str, objArr));
    }
}
