package org.apache.hugegraph.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.task.TaskManager;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/util/Consumers.class */
public final class Consumers<V> {
    public static final int THREADS;
    public static final int QUEUE_WORKER_SIZE = 1000;
    public static final long CONSUMER_WAKE_PERIOD = 1;
    private static final Logger LOG;
    private final V QUEUE_END;
    private final ExecutorService executor;
    private final Consumer<V> consumer;
    private final Runnable doneHandle;
    private final Consumer<Throwable> exceptionHandle;
    private final int workers;
    private final List<Future> runningFutures;
    private final int queueSize;
    private final CountDownLatch latch;
    private final BlockingQueue<V> queue;
    private volatile Throwable exception;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hugegraph/util/Consumers$ExecutorPool.class */
    public static class ExecutorPool {
        private static final int POOL_CAPACITY = 2 * CoreOptions.CPUS;
        private final String threadNamePrefix;
        private final int executorWorkers;
        private final AtomicInteger count = new AtomicInteger();
        private final Queue<ExecutorService> executors = new ArrayBlockingQueue(POOL_CAPACITY);

        public ExecutorPool(String str, int i) {
            this.threadNamePrefix = str;
            this.executorWorkers = i;
        }

        public synchronized ExecutorService getExecutor() {
            ExecutorService poll = this.executors.poll();
            if (poll == null) {
                poll = Consumers.newThreadPool(this.threadNamePrefix + "-" + this.count.incrementAndGet(), this.executorWorkers);
            }
            return poll;
        }

        public synchronized void returnExecutor(ExecutorService executorService) {
            E.checkNotNull(executorService, "executor");
            if (this.executors.offer(executorService)) {
                return;
            }
            try {
                executorService.shutdown();
            } catch (Exception e) {
                Consumers.LOG.warn("close ExecutorService with error:", e);
            }
        }

        public synchronized void destroy() {
            Iterator<ExecutorService> it = this.executors.iterator();
            while (it.hasNext()) {
                try {
                    it.next().shutdownNow();
                } catch (Exception e) {
                    Consumers.LOG.warn("close ExecutorService with error:", e);
                }
            }
            this.executors.clear();
        }
    }

    /* loaded from: input_file:org/apache/hugegraph/util/Consumers$StopExecution.class */
    public static class StopExecution extends HugeException {
        private static final long serialVersionUID = -371829356182454517L;

        public StopExecution(String str) {
            super(str);
        }

        public StopExecution(String str, Object... objArr) {
            super(str, objArr);
        }
    }

    public Consumers(ExecutorService executorService, Consumer<V> consumer) {
        this(executorService, consumer, null);
    }

    public Consumers(ExecutorService executorService, Consumer<V> consumer, Runnable runnable) {
        this(executorService, consumer, runnable, 1000);
    }

    public Consumers(ExecutorService executorService, Consumer<V> consumer, Runnable runnable, int i) {
        this(executorService, consumer, runnable, null, i);
    }

    public Consumers(ExecutorService executorService, Consumer<V> consumer, Runnable runnable, Consumer<Throwable> consumer2, int i) {
        this.QUEUE_END = (V) new Object();
        this.exception = null;
        this.executor = executorService;
        this.consumer = consumer;
        this.doneHandle = runnable;
        this.exceptionHandle = consumer2;
        int corePoolSize = this.executor instanceof ThreadPoolExecutor ? ((ThreadPoolExecutor) this.executor).getCorePoolSize() : THREADS;
        this.workers = corePoolSize;
        this.runningFutures = new ArrayList(corePoolSize);
        this.queueSize = (i * corePoolSize) + 1;
        this.latch = new CountDownLatch(corePoolSize);
        this.queue = new ArrayBlockingQueue(this.queueSize);
    }

    public void start(String str) {
        this.exception = null;
        if (this.executor == null) {
            return;
        }
        LOG.info("Starting {} workers[{}] with queue size {}...", new Object[]{Integer.valueOf(this.workers), str, Integer.valueOf(this.queueSize)});
        for (int i = 0; i < this.workers; i++) {
            this.runningFutures.add(this.executor.submit(new TaskManager.ContextCallable(this::runAndDone)));
        }
    }

    private Void runAndDone() {
        try {
            run();
            return null;
        } catch (Throwable th) {
            if (th instanceof StopExecution) {
                this.queue.clear();
                putQueueEnd();
            } else {
                this.exception = th;
                LOG.error("Error when running task", th);
            }
            exceptionHandle(th);
            return null;
        } finally {
            done();
            this.latch.countDown();
        }
    }

    private void run() {
        LOG.debug("Start to work...");
        do {
        } while (consume());
        LOG.debug("Worker finished");
    }

    private boolean consume() {
        V v = null;
        while (v == null) {
            try {
                v = this.queue.poll(1L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return false;
            }
        }
        if (v == this.QUEUE_END) {
            putQueueEnd();
            return false;
        }
        this.consumer.accept(v);
        return true;
    }

    private void exceptionHandle(Throwable th) {
        if (this.exceptionHandle == null) {
            return;
        }
        try {
            this.exceptionHandle.accept(th);
        } catch (Throwable th2) {
            if (this.exception == null) {
                this.exception = th2;
            } else {
                LOG.warn("Error while calling exceptionHandle()", th2);
            }
        }
    }

    private void done() {
        if (this.doneHandle == null) {
            return;
        }
        try {
            this.doneHandle.run();
        } catch (Throwable th) {
            if (this.exception == null) {
                this.exception = th;
            } else {
                LOG.warn("Error while calling done()", th);
            }
        }
    }

    private Throwable throwException() {
        if (!$assertionsDisabled && this.exception == null) {
            throw new AssertionError();
        }
        Throwable th = this.exception;
        this.exception = null;
        return th;
    }

    public void provide(V v) throws Throwable {
        if (this.executor == null) {
            if (!$assertionsDisabled && this.exception != null) {
                throw new AssertionError();
            }
            this.consumer.accept(v);
            return;
        }
        if (this.exception != null) {
            throw throwException();
        }
        try {
            this.queue.put(v);
        } catch (InterruptedException e) {
            LOG.warn("Interrupt while queuing QUEUE_END", e);
        }
    }

    private void putQueueEnd() {
        if (this.executor != null) {
            try {
                this.queue.put(this.QUEUE_END);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while enqueue", e);
            }
        }
    }

    public void await() throws Throwable {
        if (this.executor == null) {
            done();
        } else {
            try {
                putQueueEnd();
                this.latch.await();
            } catch (InterruptedException e) {
                Iterator<Future> it = this.runningFutures.iterator();
                while (it.hasNext()) {
                    it.next().cancel(true);
                }
                this.exception = new HugeException("Interrupted while waiting for consumers", e);
                LOG.warn("Interrupted while waiting for consumers", e);
            }
        }
        if (this.exception != null) {
            throw throwException();
        }
    }

    public ExecutorService executor() {
        return this.executor;
    }

    public static void executeOncePerThread(ExecutorService executorService, int i, Runnable runnable, long j) throws InterruptedException {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ArrayList arrayList = new ArrayList();
        Callable callable = () -> {
            Thread currentThread = Thread.currentThread();
            concurrentHashMap.putIfAbsent(currentThread, 0);
            int intValue = ((Integer) concurrentHashMap.get(currentThread)).intValue();
            if (intValue == 0) {
                runnable.run();
                Thread.yield();
            } else {
                if (!$assertionsDisabled && intValue >= i) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && concurrentHashMap.size() >= i) {
                    throw new AssertionError();
                }
                E.checkState(arrayList.size() == i, "Bad tasks size: %s", new Object[]{Integer.valueOf(arrayList.size())});
                executorService.submit((Callable) arrayList.get(0)).get();
            }
            concurrentHashMap.put(currentThread, Integer.valueOf(intValue + 1));
            return null;
        };
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(callable);
        }
        executorService.invokeAll(arrayList, j, TimeUnit.SECONDS);
    }

    public static ExecutorService newThreadPool(String str, int i) {
        if (i == 0) {
            return null;
        }
        if (i < 0) {
            if (!$assertionsDisabled && i != -1) {
                throw new AssertionError();
            }
            i = THREADS;
        } else if (i > CoreOptions.CPUS * 2) {
            i = CoreOptions.CPUS * 2;
        }
        return ExecutorUtil.newFixedThreadPool(i, str + "-worker-%d");
    }

    public static ExecutorPool newExecutorPool(String str, int i) {
        return new ExecutorPool(str, i);
    }

    public static RuntimeException wrapException(Throwable th) {
        if (th instanceof RuntimeException) {
            throw ((RuntimeException) th);
        }
        throw new HugeException("Error when running task: %s", HugeException.rootCause(th).getMessage(), th);
    }

    static {
        $assertionsDisabled = !Consumers.class.desiredAssertionStatus();
        THREADS = 4 + (CoreOptions.CPUS / 4);
        LOG = Log.logger(Consumers.class);
    }
}
