package org.apache.cassandra.hints;

import com.google.common.util.concurrent.RateLimiter;
import java.io.File;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/cassandra/hints/HintsDispatchExecutor.class */
public final class HintsDispatchExecutor {
    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
    private final File hintsDirectory;
    private final ExecutorService executor;
    private final AtomicBoolean isPaused;
    private final Map<UUID, Future> scheduledDispatches = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/cassandra/hints/HintsDispatchExecutor$DispatchHintsTask.class */
    private final class DispatchHintsTask implements Runnable {
        private final HintsStore store;
        private final UUID hostId;
        private final RateLimiter rateLimiter;

        DispatchHintsTask(HintsStore hintsStore, UUID uuid) {
            this.store = hintsStore;
            this.hostId = uuid;
            this.rateLimiter = RateLimiter.create(DatabaseDescriptor.getHintedHandoffThrottleInKB() / Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1) == 0 ? Double.MAX_VALUE : r0 * 1024);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                dispatch();
            } finally {
                HintsDispatchExecutor.this.scheduledDispatches.remove(this.hostId);
            }
        }

        private void dispatch() {
            HintsDescriptor poll;
            while (!HintsDispatchExecutor.this.isPaused.get() && (poll = this.store.poll()) != null) {
                try {
                    dispatch(poll);
                } catch (FSReadError e) {
                    HintsDispatchExecutor.logger.error("Failed to dispatch hints file {}: file is corrupted ({})", poll.fileName(), e);
                    this.store.cleanUp(poll);
                    this.store.blacklist(poll);
                    throw e;
                }
            }
        }

        private void dispatch(HintsDescriptor hintsDescriptor) {
            HintsDispatchExecutor.logger.debug("Dispatching hints file {}", hintsDescriptor.fileName());
            File file = new File(HintsDispatchExecutor.this.hintsDirectory, hintsDescriptor.fileName());
            Long orElse = this.store.getDispatchOffset(hintsDescriptor).orElse(null);
            HintsDispatcher create = HintsDispatcher.create(file, this.rateLimiter, this.hostId, HintsDispatchExecutor.this.isPaused);
            Throwable th = null;
            try {
                if (orElse != null) {
                    create.seek(orElse.longValue());
                }
                if (create.dispatch()) {
                    if (!file.delete()) {
                        HintsDispatchExecutor.logger.error("Failed to delete hints file {}", hintsDescriptor.fileName());
                    }
                    this.store.cleanUp(hintsDescriptor);
                    HintsDispatchExecutor.logger.info("Finished hinted handoff of file {} to endpoint {}", hintsDescriptor.fileName(), this.hostId);
                } else {
                    this.store.markDispatchOffset(hintsDescriptor, create.dispatchOffset());
                    this.store.offerFirst(hintsDescriptor);
                    HintsDispatchExecutor.logger.info("Finished hinted handoff of file {} to endpoint {}, partially", hintsDescriptor.fileName(), this.hostId);
                }
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HintsDispatchExecutor(File file, int i, AtomicBoolean atomicBoolean) {
        this.hintsDirectory = file;
        this.isPaused = atomicBoolean;
        this.executor = new JMXEnabledThreadPoolExecutor(1, i, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new NamedThreadFactory("HintsDispatcher", 1), "internal");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdownBlocking() {
        this.scheduledDispatches.clear();
        this.executor.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isScheduled(HintsStore hintsStore) {
        return this.scheduledDispatches.containsKey(hintsStore.hostId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future dispatch(HintsStore hintsStore) {
        return dispatch(hintsStore, hintsStore.hostId);
    }

    Future dispatch(HintsStore hintsStore, UUID uuid) {
        return this.scheduledDispatches.computeIfAbsent(hintsStore.hostId, uuid2 -> {
            return this.executor.submit(new DispatchHintsTask(hintsStore, uuid));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeDispatchBlockingly(HintsStore hintsStore) {
        Future future = this.scheduledDispatches.get(hintsStore.hostId);
        if (future != null) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
