/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Slice;

/**
 * Splits the content of a {@link Slice} of resource IDs into batches of a fixed size and passes
 * each batch to a caller-supplied consumer. A single batch is processed on the calling thread;
 * multiple batches are processed on a temporary thread pool that is shut down before
 * {@link #runInPartitionedThreads(Slice, Consumer)} returns.
 */
public class PartitionRunner {
    private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);

    /** Upper bound for the worker pool size; also used as the capacity of the executor's task queue. */
    private static final int MAX_POOL_SIZE = 1000;

    private final String myProcessName;
    private final String myThreadPrefix;
    private final int myBatchSize;
    private final int myThreadCount;

    /**
     * @param theProcessName  description of the work being done; used only in log messages
     * @param theThreadPrefix prefix for worker thread names (threads are named {@code <prefix>-<n>})
     * @param theBatchSize    maximum number of IDs handed to the consumer in one call
     * @param theThreadCount  maximum number of core worker threads to use
     */
    public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
        this.myProcessName = theProcessName;
        this.myThreadPrefix = theThreadPrefix;
        this.myBatchSize = theBatchSize;
        this.myThreadCount = theThreadCount;
    }

    /**
     * Partitions the IDs in {@code theResourceIds} into batches and invokes
     * {@code partitionConsumer} once per non-empty batch, waiting for all batches to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
     * thread's interrupt flag is restored, and this method returns normally.
     *
     * @param theResourceIds    the IDs to process
     * @param partitionConsumer callback that receives each batch of IDs
     * @throws InternalErrorException if processing of a batch fails
     */
    public void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
        List<Callable<Void>> callableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
        if (callableTasks.isEmpty()) {
            return;
        }

        // A single batch does not justify a thread pool; run it on the calling thread.
        if (callableTasks.size() == 1) {
            try {
                callableTasks.get(0).call();
                return;
            } catch (Exception e) {
                ourLog.error("Error while " + myProcessName, e);
                throw new InternalErrorException(e);
            }
        }

        ExecutorService executorService = buildExecutor(callableTasks.size());
        try {
            List<Future<Void>> futures = executorService.invokeAll(callableTasks);
            // Calling get() surfaces any exception thrown by a batch as an ExecutionException.
            for (Future<Void> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            ourLog.error("Interrupted while " + myProcessName, e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            ourLog.error("Error while " + myProcessName, e);
            throw new InternalErrorException(e);
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Builds one task per non-empty batch of at most {@code myBatchSize} IDs. Each task logs the
     * batch size and passes the batch to {@code partitionConsumer}.
     */
    private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
        List<Long> resourceIds = theResourceIds.getContent();
        ourLog.info("Splitting batch job of {} entries into chunks of {}", resourceIds.size(), myBatchSize);

        List<Callable<Void>> retval = new ArrayList<>();
        List<List<Long>> partitions = Lists.partition(resourceIds, myBatchSize);
        for (List<Long> nextPartition : partitions) {
            if (nextPartition.isEmpty()) {
                continue;
            }
            Callable<Void> callableTask = () -> {
                ourLog.info("{} {} resources", myProcessName, nextPartition.size());
                partitionConsumer.accept(nextPartition);
                return null;
            };
            retval.add(callableTask);
        }
        return retval;
    }

    /**
     * Creates the thread pool used to run the batches. The core pool size is the smallest of the
     * number of tasks, the configured thread count, and {@link #MAX_POOL_SIZE}.
     *
     * @param numberOfTasks number of tasks that will be submitted to the executor
     */
    private ExecutorService buildExecutor(int numberOfTasks) {
        // The core size must never exceed the maximum pool size, otherwise the
        // ThreadPoolExecutor constructor rejects the arguments with an IllegalArgumentException.
        int threadCount = Math.min(Math.min(numberOfTasks, myThreadCount), MAX_POOL_SIZE);
        assert threadCount > 0;
        ourLog.info("{} with {} threads", myProcessName, threadCount);

        LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
        ThreadFactory threadFactory = new BasicThreadFactory.Builder()
            .namingPattern(myThreadPrefix + "-%d")
            .daemon(false)
            .priority(Thread.NORM_PRIORITY)
            .build();

        // When the executor rejects a task, block the submitting thread until the queue has
        // room instead of failing the submission.
        RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
            ourLog.info("Note: {} executor queue is full ({} elements), waiting for a slot to become available!", myThreadPrefix, executorQueue.size());
            StopWatch sw = new StopWatch();
            try {
                executorQueue.put(theRunnable);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the submitting thread and keep the cause.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Task " + theRunnable.toString() + " rejected from " + e, e);
            }
            ourLog.info("Slot become available after {}ms", sw.getMillis());
        };

        return new ThreadPoolExecutor(
            threadCount,
            MAX_POOL_SIZE,
            0L,
            TimeUnit.MILLISECONDS,
            executorQueue,
            threadFactory,
            rejectedExecutionHandler);
    }
}

