package org.apache.cassandra.repair;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.ProgressListener;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.cassandraunit.shaded.com.google.common.collect.ImmutableMap;
import org.cassandraunit.shaded.com.google.common.collect.Iterables;
import org.cassandraunit.shaded.com.google.common.collect.Lists;
import org.cassandraunit.shaded.com.google.common.util.concurrent.AsyncFunction;
import org.cassandraunit.shaded.com.google.common.util.concurrent.FutureCallback;
import org.cassandraunit.shaded.com.google.common.util.concurrent.Futures;
import org.cassandraunit.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.cassandraunit.shaded.com.google.common.util.concurrent.ListeningExecutorService;
import org.cassandraunit.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/repair/RepairRunnable.class */
public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier {
    private StorageService storageService;
    private final int cmd;
    private final RepairOption options;
    private final String keyspace;
    private final List<ProgressListener> listeners = new ArrayList();
    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
    private static final AtomicInteger threadCounter = new AtomicInteger(1);

    public RepairRunnable(StorageService storageService, int i, RepairOption repairOption, String str) {
        this.storageService = storageService;
        this.cmd = i;
        this.options = repairOption;
        this.keyspace = str;
    }

    @Override // org.apache.cassandra.utils.progress.ProgressEventNotifier
    public void addProgressListener(ProgressListener progressListener) {
        this.listeners.add(progressListener);
    }

    @Override // org.apache.cassandra.utils.progress.ProgressEventNotifier
    public void removeProgressListener(ProgressListener progressListener) {
        this.listeners.remove(progressListener);
    }

    protected void fireProgressEvent(String str, ProgressEvent progressEvent) {
        Iterator<ProgressListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().progress(str, progressEvent);
        }
    }

    protected void fireErrorAndComplete(String str, int i, int i2, String str2) {
        fireProgressEvent(str, new ProgressEvent(ProgressEventType.ERROR, i, i2, String.format("Repair command #%d failed with error %s", Integer.valueOf(this.cmd), str2)));
        fireProgressEvent(str, new ProgressEvent(ProgressEventType.COMPLETE, i, i2, String.format("Repair command #%d finished with error", Integer.valueOf(this.cmd))));
    }

    @Override // org.apache.cassandra.utils.WrappedRunnable
    protected void runMayThrow() throws Exception {
        TraceState traceState;
        final UUID timeUUID = UUIDGen.getTimeUUID();
        final String str = "repair:" + this.cmd;
        final AtomicInteger atomicInteger = new AtomicInteger();
        final int size = 4 + this.options.getRanges().size();
        try {
            Iterable<ColumnFamilyStore> validColumnFamilies = this.storageService.getValidColumnFamilies(false, false, this.keyspace, (String[]) this.options.getColumnFamilies().toArray(new String[this.options.getColumnFamilies().size()]));
            atomicInteger.incrementAndGet();
            final long currentTimeMillis = System.currentTimeMillis();
            String format = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", Integer.valueOf(this.cmd), timeUUID, this.keyspace, this.options);
            logger.info(format);
            if (this.options.isTraced()) {
                StringBuilder sb = new StringBuilder();
                for (ColumnFamilyStore columnFamilyStore : validColumnFamilies) {
                    sb.append(", ").append(columnFamilyStore.keyspace.getName()).append(Directories.SECONDARY_INDEX_NAME_SEPARATOR).append(columnFamilyStore.name);
                }
                UUID newSession = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
                traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", this.keyspace, RepairOption.COLUMNFAMILIES_KEY, sb.substring(2)));
                String str2 = format + " tracing with " + newSession;
                fireProgressEvent(str, new ProgressEvent(ProgressEventType.START, 0, 100, str2));
                Tracing.traceRepair(str2, new Object[0]);
                traceState.enableActivityNotification(str);
                Iterator<ProgressListener> it = this.listeners.iterator();
                while (it.hasNext()) {
                    traceState.addProgressListener(it.next());
                }
                Thread createQueryThread = createQueryThread(this.cmd, newSession);
                createQueryThread.setName("RepairTracePolling");
                createQueryThread.start();
            } else {
                fireProgressEvent(str, new ProgressEvent(ProgressEventType.START, 0, 100, format));
                traceState = null;
            }
            final HashSet hashSet = new HashSet();
            ArrayList arrayList = new ArrayList();
            Collection<Range<Token>> localRanges = this.storageService.getLocalRanges(this.keyspace);
            try {
                for (Range<Token> range : this.options.getRanges()) {
                    Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(this.keyspace, localRanges, range, this.options.getDataCenters(), this.options.getHosts());
                    addRangeToNeighbors(arrayList, range, neighbors);
                    hashSet.addAll(neighbors);
                }
                atomicInteger.incrementAndGet();
                ArrayList arrayList2 = new ArrayList();
                try {
                    Iterables.addAll(arrayList2, validColumnFamilies);
                    atomicInteger.incrementAndGet();
                    String[] strArr = new String[arrayList2.size()];
                    for (int i = 0; i < arrayList2.size(); i++) {
                        strArr[i] = ((ColumnFamilyStore) arrayList2.get(i)).name;
                    }
                    SystemDistributedKeyspace.startParentRepair(timeUUID, this.keyspace, strArr, this.options);
                    try {
                        ActiveRepairService.instance.prepareForRepair(timeUUID, FBUtilities.getBroadcastAddress(), hashSet, this.options, arrayList2);
                        long repairedAt = ActiveRepairService.instance.getParentRepairSession(timeUUID).getRepairedAt();
                        atomicInteger.incrementAndGet();
                        final ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(this.options.getJobThreads(), 2147483647L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("Repair#" + this.cmd), "internal"));
                        ArrayList arrayList3 = new ArrayList(this.options.getRanges().size());
                        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> pair : arrayList) {
                            final RepairSession submitRepairSession = ActiveRepairService.instance.submitRepairSession(timeUUID, (Collection) pair.right, this.keyspace, this.options.getParallelism(), pair.left, repairedAt, this.options.isPullRepair(), listeningDecorator, strArr);
                            if (submitRepairSession != null) {
                                Futures.addCallback(submitRepairSession, new FutureCallback<RepairSessionResult>() { // from class: org.apache.cassandra.repair.RepairRunnable.1
                                    @Override // org.cassandraunit.shaded.com.google.common.util.concurrent.FutureCallback
                                    public void onSuccess(RepairSessionResult repairSessionResult) {
                                        String format2 = String.format("Repair session %s for range %s finished", submitRepairSession.getId(), submitRepairSession.getRanges().toString());
                                        RepairRunnable.logger.info(format2);
                                        RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.PROGRESS, atomicInteger.incrementAndGet(), size, format2));
                                    }

                                    @Override // org.cassandraunit.shaded.com.google.common.util.concurrent.FutureCallback
                                    public void onFailure(Throwable th) {
                                        String format2 = String.format("Repair session %s for range %s failed with error %s", submitRepairSession.getId(), submitRepairSession.getRanges().toString(), th.getMessage());
                                        RepairRunnable.logger.error(format2, th);
                                        RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.PROGRESS, atomicInteger.incrementAndGet(), size, format2));
                                    }
                                });
                                arrayList3.add(submitRepairSession);
                            }
                        }
                        final ArrayList arrayList4 = new ArrayList();
                        final AtomicBoolean atomicBoolean = new AtomicBoolean();
                        final TraceState traceState2 = traceState;
                        Futures.addCallback(Futures.transform(Futures.successfulAsList(arrayList3), new AsyncFunction<List<RepairSessionResult>, Object>() { // from class: org.apache.cassandra.repair.RepairRunnable.2
                            @Override // org.cassandraunit.shaded.com.google.common.util.concurrent.AsyncFunction
                            public ListenableFuture apply(List<RepairSessionResult> list) {
                                for (RepairSessionResult repairSessionResult : list) {
                                    if (repairSessionResult != null) {
                                        arrayList4.addAll(repairSessionResult.ranges);
                                    } else {
                                        atomicBoolean.compareAndSet(false, true);
                                    }
                                }
                                return ActiveRepairService.instance.finishParentSession(timeUUID, hashSet, arrayList4);
                            }
                        }), new FutureCallback<Object>() { // from class: org.apache.cassandra.repair.RepairRunnable.3
                            @Override // org.cassandraunit.shaded.com.google.common.util.concurrent.FutureCallback
                            public void onSuccess(Object obj) {
                                SystemDistributedKeyspace.successfulParentRepair(timeUUID, arrayList4);
                                if (atomicBoolean.get()) {
                                    RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.ERROR, atomicInteger.get(), size, "Some repair failed"));
                                } else {
                                    RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.SUCCESS, atomicInteger.get(), size, "Repair completed successfully"));
                                }
                                repairComplete();
                            }

                            @Override // org.cassandraunit.shaded.com.google.common.util.concurrent.FutureCallback
                            public void onFailure(Throwable th) {
                                RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.ERROR, atomicInteger.get(), size, th.getMessage()));
                                SystemDistributedKeyspace.failParentRepair(timeUUID, th);
                                repairComplete();
                            }

                            private void repairComplete() {
                                String format2 = String.format("Repair command #%d finished in %s", Integer.valueOf(RepairRunnable.this.cmd), DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - currentTimeMillis, true, true));
                                RepairRunnable.this.fireProgressEvent(str, new ProgressEvent(ProgressEventType.COMPLETE, atomicInteger.get(), size, format2));
                                RepairRunnable.logger.info(format2);
                                if (RepairRunnable.this.options.isTraced() && traceState2 != null) {
                                    Iterator it2 = RepairRunnable.this.listeners.iterator();
                                    while (it2.hasNext()) {
                                        traceState2.removeProgressListener((ProgressListener) it2.next());
                                    }
                                    Tracing.instance.set(traceState2);
                                    Tracing.traceRepair(format2, new Object[0]);
                                    Tracing.instance.stopSession();
                                }
                                listeningDecorator.shutdownNow();
                            }
                        });
                    } catch (Throwable th) {
                        SystemDistributedKeyspace.failParentRepair(timeUUID, th);
                        fireErrorAndComplete(str, atomicInteger.get(), size, th.getMessage());
                    }
                } catch (IllegalArgumentException e) {
                    fireErrorAndComplete(str, atomicInteger.get(), size, e.getMessage());
                }
            } catch (IllegalArgumentException e2) {
                logger.error("Repair failed:", e2);
                fireErrorAndComplete(str, atomicInteger.get(), size, e2.getMessage());
            }
        } catch (IllegalArgumentException e3) {
            logger.error("Repair failed:", e3);
            fireErrorAndComplete(str, atomicInteger.get(), size, e3.getMessage());
        }
    }

    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> list, Range<Token> range, Set<InetAddress> set) {
        for (int i = 0; i < list.size(); i++) {
            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> pair = list.get(i);
            if (pair.left.containsAll(set)) {
                ((Collection) pair.right).add(range);
                return;
            }
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(range);
        list.add(Pair.create(set, arrayList));
    }

    private Thread createQueryThread(final int i, final UUID uuid) {
        return NamedThreadFactory.createThread(new WrappedRunnable() { // from class: org.apache.cassandra.repair.RepairRunnable.4
            @Override // org.apache.cassandra.utils.WrappedRunnable
            public void runMayThrow() throws Exception {
                TraceState traceState = Tracing.instance.get(uuid);
                if (traceState == null) {
                    throw new Exception("no tracestate");
                }
                SelectStatement selectStatement = (SelectStatement) QueryProcessor.parseStatement(String.format("select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;", SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS)).prepare(ClientState.forInternalCalls()).statement;
                ByteBuffer bytes = ByteBufferUtil.bytes(uuid);
                InetAddress broadcastAddress = FBUtilities.getBroadcastAddress();
                HashSet[] hashSetArr = {new HashSet(), new HashSet()};
                char c = 0;
                long currentTimeMillis = System.currentTimeMillis();
                long j = 125;
                boolean z = false;
                while (true) {
                    TraceState.Status waitActivity = traceState.waitActivity(j);
                    if (waitActivity == TraceState.Status.STOPPED) {
                        return;
                    }
                    if (waitActivity == TraceState.Status.IDLE) {
                        j = z ? Math.min(j * 2, 1024000L) : j;
                        z = !z;
                    } else {
                        j = 125;
                        z = false;
                    }
                    ByteBuffer bytes2 = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(currentTimeMillis - 1000));
                    long currentTimeMillis2 = System.currentTimeMillis();
                    Iterator<UntypedResultSet.Row> it = UntypedResultSet.create(selectStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(bytes, bytes2, ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(currentTimeMillis2)))), System.nanoTime()).result).iterator();
                    while (it.hasNext()) {
                        UntypedResultSet.Row next = it.next();
                        if (!broadcastAddress.equals(next.getInetAddress("source"))) {
                            UUID uuid2 = next.getUUID("event_id");
                            if (uuid2.timestamp() > (currentTimeMillis2 - 1000) * 10000) {
                                hashSetArr[c].add(uuid2);
                            }
                            if (!hashSetArr[c == 0 ? (char) 1 : (char) 0].contains(uuid2)) {
                                RepairRunnable.this.fireProgressEvent("repair:" + i, new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, String.format("%s: %s", next.getInetAddress("source"), next.getString("activity"))));
                            }
                        }
                    }
                    currentTimeMillis = currentTimeMillis2;
                    c = c == 0 ? (char) 1 : (char) 0;
                    hashSetArr[c].clear();
                }
            }
        }, "Repair-Runnable-" + threadCounter.incrementAndGet());
    }
}
