/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.runtime;

import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.WorkerExecutor;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

@Alternative
@Priority(value=1)
@ApplicationScoped
public class QuarkusWorkerPoolRegistry
extends WorkerPoolRegistry {
    private static final Logger log = Logger.getLogger(WorkerPoolRegistry.class);
    private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
    private static final String WORKER_CONCURRENCY = "max-concurrency";
    public static final String DEFAULT_VIRTUAL_THREAD_WORKER = "<virtual-thread>";
    @Inject
    ExecutionHolder executionHolder;
    private final Map<String, Integer> workerConcurrency = new HashMap<String, Integer>();
    private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<String, WorkerExecutor>();
    private final Set<String> virtualThreadWorkers = QuarkusWorkerPoolRegistry.initVirtualThreadWorkers();

    private static Set<String> initVirtualThreadWorkers() {
        ConcurrentHashSet set = new ConcurrentHashSet();
        set.add(DEFAULT_VIRTUAL_THREAD_WORKER);
        return set;
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=100) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        if (!this.workerExecutors.isEmpty()) {
            for (WorkerExecutor executor : this.workerExecutors.values()) {
                executor.close();
            }
        }
    }

    public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerName, boolean ordered) {
        Objects.requireNonNull(uni, "Action to execute not provided");
        if (workerName == null) {
            if (currentContext != null) {
                return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered);
            }
            return this.executionHolder.vertx().executeBlocking(uni, ordered);
        }
        if (this.virtualThreadWorkers.contains(workerName)) {
            return this.runOnVirtualThread(currentContext, uni);
        }
        if (currentContext != null) {
            return this.getWorker(workerName).executeBlocking(uni, ordered).onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> {
                if (failure != null) {
                    currentContext.runOnContext(() -> emitter.fail(failure));
                } else {
                    currentContext.runOnContext(() -> emitter.complete(item));
                }
            }));
        }
        return this.getWorker(workerName).executeBlocking(uni, ordered);
    }

    private <T> Uni<T> runOnVirtualThread(Context currentContext, Uni<T> uni) {
        return uni.runSubscriptionOn((Executor)VirtualThreadsRecorder.getCurrent()).onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> {
            if (currentContext != null) {
                if (failure != null) {
                    currentContext.runOnContext(() -> emitter.fail(failure));
                } else {
                    currentContext.runOnContext(() -> emitter.complete(item));
                }
            } else if (failure != null) {
                emitter.fail(failure);
            } else {
                emitter.complete(item);
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public WorkerExecutor getWorker(String workerName) {
        Objects.requireNonNull(workerName, "Worker Name not specified");
        if (this.workerExecutors.containsKey(workerName)) {
            return this.workerExecutors.get(workerName);
        }
        if (this.workerConcurrency.containsKey(workerName)) {
            WorkerExecutor executor = this.workerExecutors.get(workerName);
            if (executor == null) {
                QuarkusWorkerPoolRegistry quarkusWorkerPoolRegistry = this;
                synchronized (quarkusWorkerPoolRegistry) {
                    executor = this.workerExecutors.get(workerName);
                    if (executor == null) {
                        executor = this.executionHolder.vertx().createSharedWorkerExecutor(workerName, this.workerConcurrency.get(workerName).intValue());
                        log.info((Object)("Created worker pool named " + workerName + " with concurrency of " + this.workerConcurrency.get(workerName)));
                        this.workerExecutors.put(workerName, executor);
                    }
                }
            }
            if (executor != null) {
                return executor;
            }
            throw new RuntimeException("Failed to create Worker for " + workerName);
        }
        throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName);
    }

    public void defineWorker(String className, String method, String poolName, boolean virtualThread) {
        Objects.requireNonNull(className, "className was empty");
        Objects.requireNonNull(method, "Method was empty");
        if (virtualThread) {
            this.virtualThreadWorkers.add(poolName);
            return;
        }
        if (!poolName.equals("<no-value>")) {
            if (Validation.isBlank((String)poolName)) {
                throw this.getBlockingError(className, method, "value is blank or null");
            }
            String workerConfigKey = "smallrye.messaging.worker." + poolName + ".max-concurrency";
            Optional concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class);
            if (concurrency.isEmpty()) {
                throw this.getBlockingError(className, method, workerConfigKey + " was not defined");
            }
            this.workerConcurrency.put(poolName, (Integer)concurrency.get());
        }
    }

    private IllegalArgumentException getBlockingError(String className, String method, String message) {
        return new IllegalArgumentException("Invalid method annotated with @Blocking: " + className + "#" + method + " - " + message);
    }
}

