package org.apache.cassandra.hints;

import com.google.common.util.concurrent.RateLimiter;
import java.io.File;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/cassandra/hints/HintsDispatchExecutor.class */
public final class HintsDispatchExecutor {
    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
    private final File hintsDirectory;
    private final ExecutorService executor;
    private final AtomicBoolean isPaused;
    private final Map<UUID, Future> scheduledDispatches = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/hints/HintsDispatchExecutor$DispatchHintsTask.class */
    public final class DispatchHintsTask implements Runnable {
        private final HintsStore store;
        private final UUID hostId;
        private final RateLimiter rateLimiter;

        DispatchHintsTask(HintsStore hintsStore, UUID uuid) {
            this.store = hintsStore;
            this.hostId = uuid;
            this.rateLimiter = RateLimiter.create(DatabaseDescriptor.getHintedHandoffThrottleInKB() / Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1) == 0 ? Double.MAX_VALUE : r0 * 1024);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                dispatch();
            } finally {
                HintsDispatchExecutor.this.scheduledDispatches.remove(this.hostId);
            }
        }

        private void dispatch() {
            HintsDescriptor poll;
            while (!HintsDispatchExecutor.this.isPaused.get() && (poll = this.store.poll()) != null) {
                try {
                    if (!dispatch(poll)) {
                        return;
                    }
                } catch (FSReadError e) {
                    HintsDispatchExecutor.logger.error("Failed to dispatch hints file {}: file is corrupted ({})", poll.fileName(), e);
                    this.store.cleanUp(poll);
                    this.store.blacklist(poll);
                    throw e;
                }
            }
        }

        private boolean dispatch(HintsDescriptor hintsDescriptor) {
            HintsDispatchExecutor.logger.trace("Dispatching hints file {}", hintsDescriptor.fileName());
            InetAddress endpointForHostId = StorageService.instance.getEndpointForHostId(this.hostId);
            if (endpointForHostId != null) {
                return deliver(hintsDescriptor, endpointForHostId);
            }
            convert(hintsDescriptor);
            return true;
        }

        private boolean deliver(HintsDescriptor hintsDescriptor, InetAddress inetAddress) {
            File file = new File(HintsDispatchExecutor.this.hintsDirectory, hintsDescriptor.fileName());
            Long orElse = this.store.getDispatchOffset(hintsDescriptor).orElse(null);
            HintsDispatcher create = HintsDispatcher.create(file, this.rateLimiter, inetAddress, hintsDescriptor.hostId, HintsDispatchExecutor.this.isPaused);
            Throwable th = null;
            try {
                if (orElse != null) {
                    create.seek(orElse.longValue());
                }
                if (create.dispatch()) {
                    this.store.delete(hintsDescriptor);
                    this.store.cleanUp(hintsDescriptor);
                    HintsDispatchExecutor.logger.info("Finished hinted handoff of file {} to endpoint {}: {}", new Object[]{hintsDescriptor.fileName(), inetAddress, this.hostId});
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    return true;
                }
                this.store.markDispatchOffset(hintsDescriptor, create.dispatchOffset());
                this.store.offerFirst(hintsDescriptor);
                HintsDispatchExecutor.logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", new Object[]{hintsDescriptor.fileName(), inetAddress, this.hostId});
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        create.close();
                    }
                }
                return false;
            } catch (Throwable th4) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th4;
            }
        }

        private void convert(HintsDescriptor hintsDescriptor) {
            HintsReader open = HintsReader.open(new File(HintsDispatchExecutor.this.hintsDirectory, hintsDescriptor.fileName()), this.rateLimiter);
            Throwable th = null;
            try {
                open.forEach(page -> {
                    Iterator<Hint> hintsIterator = page.hintsIterator();
                    HintsService hintsService = HintsService.instance;
                    hintsService.getClass();
                    hintsIterator.forEachRemaining(hintsService::writeForAllReplicas);
                });
                this.store.delete(hintsDescriptor);
                this.store.cleanUp(hintsDescriptor);
                HintsDispatchExecutor.logger.info("Finished converting hints file {}", hintsDescriptor.fileName());
                if (open != null) {
                    if (0 == 0) {
                        open.close();
                        return;
                    }
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        open.close();
                    }
                }
                throw th3;
            }
        }
    }

    /* loaded from: input_file:org/apache/cassandra/hints/HintsDispatchExecutor$TransferHintsTask.class */
    private final class TransferHintsTask implements Runnable {
        private final HintsCatalog catalog;
        private final Supplier<UUID> hostIdSupplier;

        private TransferHintsTask(HintsCatalog hintsCatalog, Supplier<UUID> supplier) {
            this.catalog = hintsCatalog;
            this.hostIdSupplier = supplier;
        }

        @Override // java.lang.Runnable
        public void run() {
            UUID uuid = this.hostIdSupplier.get();
            InetAddress endpointForHostId = StorageService.instance.getEndpointForHostId(uuid);
            HintsDispatchExecutor.logger.info("Transferring all hints to {}: {}", endpointForHostId, uuid);
            if (transfer(uuid)) {
                return;
            }
            HintsDispatchExecutor.logger.warn("Failed to transfer all hints to {}: {}; will retry in {} seconds", new Object[]{endpointForHostId, uuid, 10});
            try {
                TimeUnit.SECONDS.sleep(10L);
                UUID uuid2 = this.hostIdSupplier.get();
                HintsDispatchExecutor.logger.info("Transferring all hints to {}: {}", endpointForHostId, uuid2);
                if (transfer(uuid2)) {
                    return;
                }
                HintsDispatchExecutor.logger.error("Failed to transfer all hints to {}: {}", endpointForHostId, uuid2);
                throw new RuntimeException("Failed to transfer all hints to " + uuid2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        private boolean transfer(UUID uuid) {
            this.catalog.stores().map(hintsStore -> {
                return new DispatchHintsTask(hintsStore, uuid);
            }).forEach((v0) -> {
                v0.run();
            });
            return !this.catalog.hasFiles();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HintsDispatchExecutor(File file, int i, AtomicBoolean atomicBoolean) {
        this.hintsDirectory = file;
        this.isPaused = atomicBoolean;
        this.executor = new JMXEnabledThreadPoolExecutor(i, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new NamedThreadFactory("HintsDispatcher", 1), "internal");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdownBlocking() {
        this.scheduledDispatches.clear();
        this.executor.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isScheduled(HintsStore hintsStore) {
        return this.scheduledDispatches.containsKey(hintsStore.hostId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future dispatch(HintsStore hintsStore) {
        return dispatch(hintsStore, hintsStore.hostId);
    }

    Future dispatch(HintsStore hintsStore, UUID uuid) {
        return this.scheduledDispatches.computeIfAbsent(uuid, uuid2 -> {
            return this.executor.submit(new DispatchHintsTask(hintsStore, uuid));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future transfer(HintsCatalog hintsCatalog, Supplier<UUID> supplier) {
        return this.executor.submit(new TransferHintsTask(hintsCatalog, supplier));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeDispatchBlockingly(HintsStore hintsStore) {
        Future future = this.scheduledDispatches.get(hintsStore.hostId);
        if (future != null) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void interruptDispatch(UUID uuid) {
        Future remove = this.scheduledDispatches.remove(uuid);
        if (null != remove) {
            remove.cancel(true);
        }
    }
}
