package org.neo4j.kernel.impl.api.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.neo4j.function.Predicates;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexConfiguration;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.NodePropertyUpdate;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.PopulationProgress;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

/* loaded from: input_file:org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.class */
public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator {
    public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
    static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
    static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
    static final String BATCH_SIZE_NAME = "batch_size";
    private static final String EOL = System.lineSeparator();
    private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";
    private final int QUEUE_THRESHOLD;
    private final int TASK_QUEUE_SIZE;
    private final int AWAIT_TIMEOUT_MINUTES;
    private final int BATCH_SIZE;
    private final AtomicLong activeTasks;
    private final ExecutorService executor;
    private final Map<MultipleIndexPopulator.IndexPopulation, List<NodePropertyUpdate>> batchedUpdates;

    /* loaded from: input_file:org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator$BatchingIndexPopulation.class */
    private class BatchingIndexPopulation extends MultipleIndexPopulator.IndexPopulation {
        BatchingIndexPopulation(IndexPopulator indexPopulator, IndexDescriptor indexDescriptor, IndexConfiguration indexConfiguration, SchemaIndexProvider.Descriptor descriptor, FlippableIndexProxy flippableIndexProxy, FailedIndexProxyFactory failedIndexProxyFactory, String str) {
            super(indexPopulator, indexDescriptor, indexConfiguration, descriptor, flippableIndexProxy, failedIndexProxyFactory, str);
        }

        @Override // org.neo4j.kernel.impl.api.index.MultipleIndexPopulator.IndexPopulation
        protected void addApplicable(Collection<NodePropertyUpdate> collection) throws IOException, IndexEntryConflictException {
            BatchingMultipleIndexPopulator.this.batchUpdate(this, collection);
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator$BatchingStoreScan.class */
    private class BatchingStoreScan<E extends Exception> implements StoreScan<E> {
        final StoreScan<E> delegate;

        BatchingStoreScan(StoreScan<E> storeScan) {
            this.delegate = storeScan;
        }

        @Override // org.neo4j.kernel.impl.api.index.StoreScan
        public void run() throws Exception {
            try {
                this.delegate.run();
                BatchingMultipleIndexPopulator.this.log.info("Completed node store scan. Flushing all pending deletes." + BatchingMultipleIndexPopulator.EOL + this);
                BatchingMultipleIndexPopulator.this.flushAll();
                BatchingMultipleIndexPopulator.this.shutdownExecutor(false);
            } catch (Throwable th) {
                try {
                    BatchingMultipleIndexPopulator.this.shutdownExecutor(true);
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }

        @Override // org.neo4j.kernel.impl.api.index.StoreScan
        public void stop() {
            this.delegate.stop();
        }

        @Override // org.neo4j.kernel.impl.api.index.StoreScan
        public PopulationProgress getProgress() {
            return this.delegate.getProgress();
        }
    }

    public BatchingMultipleIndexPopulator(IndexStoreView indexStoreView, LogProvider logProvider) {
        super(indexStoreView, logProvider);
        this.QUEUE_THRESHOLD = FeatureToggles.getInteger(getClass(), QUEUE_THRESHOLD_NAME, 20000);
        this.TASK_QUEUE_SIZE = FeatureToggles.getInteger(getClass(), TASK_QUEUE_SIZE_NAME, getNumberOfPopulationWorkers() * 2);
        this.AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger(getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30);
        this.BATCH_SIZE = FeatureToggles.getInteger(getClass(), BATCH_SIZE_NAME, 10000);
        this.activeTasks = new AtomicLong();
        this.batchedUpdates = new HashMap();
        this.executor = createThreadPool();
    }

    BatchingMultipleIndexPopulator(IndexStoreView indexStoreView, ExecutorService executorService, LogProvider logProvider) {
        super(indexStoreView, logProvider);
        this.QUEUE_THRESHOLD = FeatureToggles.getInteger(getClass(), QUEUE_THRESHOLD_NAME, 20000);
        this.TASK_QUEUE_SIZE = FeatureToggles.getInteger(getClass(), TASK_QUEUE_SIZE_NAME, getNumberOfPopulationWorkers() * 2);
        this.AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger(getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30);
        this.BATCH_SIZE = FeatureToggles.getInteger(getClass(), BATCH_SIZE_NAME, 10000);
        this.activeTasks = new AtomicLong();
        this.batchedUpdates = new HashMap();
        this.executor = executorService;
    }

    @Override // org.neo4j.kernel.impl.api.index.MultipleIndexPopulator
    public StoreScan<IndexPopulationFailedKernelException> indexAllNodes() {
        return new BatchingStoreScan(super.indexAllNodes());
    }

    @Override // org.neo4j.kernel.impl.api.index.MultipleIndexPopulator
    protected MultipleIndexPopulator.IndexPopulation createPopulation(IndexPopulator indexPopulator, IndexDescriptor indexDescriptor, IndexConfiguration indexConfiguration, SchemaIndexProvider.Descriptor descriptor, FlippableIndexProxy flippableIndexProxy, FailedIndexProxyFactory failedIndexProxyFactory, String str) {
        return new BatchingIndexPopulation(indexPopulator, indexDescriptor, indexConfiguration, descriptor, flippableIndexProxy, failedIndexProxyFactory, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.neo4j.kernel.impl.api.index.MultipleIndexPopulator
    public void populateFromQueue(long j) {
        if (this.queue.size() >= this.QUEUE_THRESHOLD) {
            this.log.debug("Populating from queue." + EOL + this);
            flushAll();
            awaitCompletion();
            super.populateFromQueue(j);
            this.log.debug("Drained queue and all batched updates." + EOL + this);
        }
    }

    public String toString() {
        return "BatchingMultipleIndexPopulator{activeTasks=" + this.activeTasks + ", executor=" + this.executor + ", batchedUpdates = " + ((String) this.batchedUpdates.entrySet().stream().map(entry -> {
            return entry.getKey() + " - " + ((List) entry.getValue()).size() + " updates";
        }).collect(Collectors.joining(", ", "[", "]"))) + ", queuedUpdates = " + this.queue.size() + "}";
    }

    private void awaitCompletion() {
        try {
            this.log.debug("Waiting " + this.AWAIT_TIMEOUT_MINUTES + " minutes for all submitted and active flush tasks to complete." + EOL + this);
            Predicates.await(() -> {
                return Boolean.valueOf(this.activeTasks.get() == 0);
            }, this.AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            handleInterrupt();
        } catch (TimeoutException e2) {
            handleTimeout();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void batchUpdate(MultipleIndexPopulator.IndexPopulation indexPopulation, Collection<NodePropertyUpdate> collection) {
        List<NodePropertyUpdate> computeIfAbsent = this.batchedUpdates.computeIfAbsent(indexPopulation, indexPopulation2 -> {
            return newBatch();
        });
        computeIfAbsent.addAll(collection);
        flushIfNeeded(indexPopulation, computeIfAbsent);
    }

    private void flushIfNeeded(MultipleIndexPopulator.IndexPopulation indexPopulation, List<NodePropertyUpdate> list) {
        if (list.size() >= this.BATCH_SIZE) {
            this.batchedUpdates.remove(indexPopulation);
            flush(indexPopulation, list);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushAll() {
        Iterator<Map.Entry<MultipleIndexPopulator.IndexPopulation, List<NodePropertyUpdate>>> it = this.batchedUpdates.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MultipleIndexPopulator.IndexPopulation, List<NodePropertyUpdate>> next = it.next();
            MultipleIndexPopulator.IndexPopulation key = next.getKey();
            List<NodePropertyUpdate> value = next.getValue();
            it.remove();
            if (value != null && !value.isEmpty()) {
                flush(key, value);
            }
        }
    }

    private void flush(MultipleIndexPopulator.IndexPopulation indexPopulation, List<NodePropertyUpdate> list) {
        this.activeTasks.incrementAndGet();
        this.executor.execute(() -> {
            try {
                try {
                    indexPopulation.populator.add(list);
                    this.activeTasks.decrementAndGet();
                } catch (Throwable th) {
                    fail(indexPopulation, th);
                    this.activeTasks.decrementAndGet();
                }
            } catch (Throwable th2) {
                this.activeTasks.decrementAndGet();
                throw th2;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdownExecutor(boolean z) {
        this.log.info((z ? "Forcefully shutting" : "Shutting") + " down executor." + EOL + this);
        if (z) {
            this.executor.shutdownNow();
        } else {
            this.executor.shutdown();
        }
        try {
            if (!this.executor.awaitTermination(this.AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                handleTimeout();
            }
        } catch (InterruptedException e) {
            handleInterrupt();
        }
    }

    private void handleTimeout() {
        throw new IllegalStateException("Index population tasks were not able to complete in " + this.AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + allStackTraces());
    }

    private void handleInterrupt() {
        Thread.currentThread().interrupt();
        this.log.warn("Interrupted while waiting for index population tasks to complete." + EOL + this);
    }

    private List<NodePropertyUpdate> newBatch() {
        return new ArrayList(this.BATCH_SIZE);
    }

    private ExecutorService createThreadPool() {
        int numberOfPopulationWorkers = getNumberOfPopulationWorkers();
        return new ThreadPoolExecutor(numberOfPopulationWorkers, numberOfPopulationWorkers, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.TASK_QUEUE_SIZE), NamedThreadFactory.daemon(FLUSH_THREAD_NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static String allStackTraces() {
        return (String) Thread.getAllStackTraces().entrySet().stream().map(entry -> {
            return Exceptions.stringify((Thread) entry.getKey(), (StackTraceElement[]) entry.getValue());
        }).collect(Collectors.joining());
    }

    private static int getNumberOfPopulationWorkers() {
        return Math.max(2, Runtime.getRuntime().availableProcessors() - 1);
    }
}
