package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:ca/uhn/fhir/jpa/dao/expunge/PartitionRunner.class */
public class PartitionRunner {
    private static final Logger ourLog;
    private static final int MAX_POOL_SIZE = 1000;
    private final DaoConfig myDaoConfig;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Autowired
    public PartitionRunner(DaoConfig daoConfig) {
        this.myDaoConfig = daoConfig;
    }

    public void runInPartitionedThreads(Slice<Long> slice, Consumer<List<Long>> consumer) {
        List<Callable<Void>> buildCallableTasks = buildCallableTasks(slice, consumer);
        if (buildCallableTasks.size() == 0) {
            return;
        }
        if (buildCallableTasks.size() == 1) {
            try {
                buildCallableTasks.get(0).call();
                return;
            } catch (Exception e) {
                ourLog.error("Error while expunging.", e);
                throw new InternalErrorException(e);
            }
        }
        ExecutorService buildExecutor = buildExecutor(buildCallableTasks.size());
        try {
            try {
                try {
                    Iterator it = buildExecutor.invokeAll(buildCallableTasks).iterator();
                    while (it.hasNext()) {
                        ((Future) it.next()).get();
                    }
                    buildExecutor.shutdown();
                } catch (Throwable th) {
                    buildExecutor.shutdown();
                    throw th;
                }
            } catch (InterruptedException e2) {
                ourLog.error("Interrupted while expunging.", e2);
                Thread.currentThread().interrupt();
                buildExecutor.shutdown();
            }
        } catch (ExecutionException e3) {
            ourLog.error("Error while expunging.", e3);
            throw new InternalErrorException(e3);
        }
    }

    private List<Callable<Void>> buildCallableTasks(Slice<Long> slice, Consumer<List<Long>> consumer) {
        ArrayList arrayList = new ArrayList();
        for (List list : Lists.partition(slice.getContent(), this.myDaoConfig.getExpungeBatchSize())) {
            if (list.size() > 0) {
                arrayList.add(() -> {
                    ourLog.info("Expunging any search results pointing to {} resources", Integer.valueOf(list.size()));
                    consumer.accept(list);
                    return null;
                });
            }
        }
        return arrayList;
    }

    private ExecutorService buildExecutor(int i) {
        int min = Math.min(i, this.myDaoConfig.getExpungeThreadCount());
        if (!$assertionsDisabled && min <= 0) {
            throw new AssertionError();
        }
        ourLog.info("Expunging with {} threads", Integer.valueOf(min));
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(MAX_POOL_SIZE);
        return new ThreadPoolExecutor(min, MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, linkedBlockingQueue, new BasicThreadFactory.Builder().namingPattern("expunge-%d").daemon(false).priority(5).build(), (runnable, threadPoolExecutor) -> {
            ourLog.info("Note: Expunge executor queue is full ({} elements), waiting for a slot to become available!", Integer.valueOf(linkedBlockingQueue.size()));
            StopWatch stopWatch = new StopWatch();
            try {
                linkedBlockingQueue.put(runnable);
                ourLog.info("Slot become available after {}ms", Long.valueOf(stopWatch.getMillis()));
            } catch (InterruptedException e) {
                throw new RejectedExecutionException("Task " + runnable.toString() + " rejected from " + e.toString());
            }
        });
    }

    static {
        $assertionsDisabled = !PartitionRunner.class.desiredAssertionStatus();
        ourLog = LoggerFactory.getLogger(ExpungeService.class);
    }
}
