/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionActioner
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionActioner.class);
    private final WorkerConfig workerConfig;
    private final RuntimeFactory runtimeFactory;
    private final Namespace dlogNamespace;
    private LinkedBlockingQueue<FunctionAction> actionQueue;
    private volatile boolean running;
    private Thread actioner;
    private final ConnectorsManager connectorsManager;
    private final PulsarAdmin pulsarAdmin;

    public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, LinkedBlockingQueue<FunctionAction> actionQueue, ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
        this.workerConfig = workerConfig;
        this.runtimeFactory = runtimeFactory;
        this.dlogNamespace = dlogNamespace;
        this.actionQueue = actionQueue;
        this.connectorsManager = connectorsManager;
        this.pulsarAdmin = pulsarAdmin;
        this.actioner = new Thread(() -> {
            log.info("Starting Actioner Thread...");
            while (this.running) {
                try {
                    FunctionAction action = (FunctionAction)actionQueue.poll(1L, TimeUnit.SECONDS);
                    this.processAction(action);
                }
                catch (InterruptedException interruptedException) {}
            }
        });
        this.actioner.setName("FunctionActionerThread");
    }

    void processAction(FunctionAction action) {
        if (action == null) {
            return;
        }
        switch (action.getAction()) {
            case START: {
                try {
                    this.startFunction(action.getFunctionRuntimeInfo());
                }
                catch (Exception ex) {
                    Function.FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance().getFunctionMetaData().getFunctionDetails();
                    log.info("{}/{}/{} Error starting function", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), ex});
                    action.getFunctionRuntimeInfo().setStartupException(ex);
                }
                break;
            }
            case STOP: {
                this.stopFunction(action.getFunctionRuntimeInfo());
                break;
            }
            case TERMINATE: {
                this.terminateFunction(action.getFunctionRuntimeInfo());
            }
        }
    }

    public void start() {
        this.running = true;
        this.actioner.start();
    }

    @Override
    public void close() {
        this.running = false;
    }

    public void join() throws InterruptedException {
        this.actioner.join();
    }

    @VisibleForTesting
    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
        String packageFile;
        Function.FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
        log.info("{}/{}/{}-{} Starting function ...", new Object[]{functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), instanceId});
        String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
        boolean isPkgUrlProvided = org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported((String)pkgLocation);
        if (this.runtimeFactory.externallyManaged()) {
            packageFile = pkgLocation;
        } else if (isPkgUrlProvided && pkgLocation.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
            URL url = new URL(pkgLocation);
            File pkgFile = new File(url.toURI());
            packageFile = pkgFile.getAbsolutePath();
        } else if (FunctionActioner.isFunctionCodeBuiltin((Function.FunctionDetailsOrBuilder)functionDetails)) {
            File pkgFile = this.getBuiltinArchive(Function.FunctionDetails.newBuilder((Function.FunctionDetails)functionMetaData.getFunctionDetails()));
            packageFile = pkgFile.getAbsolutePath();
        } else {
            File pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instanceId));
            pkgDir.mkdirs();
            File pkgFile = new File(pkgDir, new File(FunctionDetailsUtils.getDownloadFileName((Function.FunctionDetails)functionMetaData.getFunctionDetails(), (Function.PackageLocationMetaData)functionMetaData.getPackageLocation())).getName());
            this.downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
            packageFile = pkgFile.getAbsolutePath();
        }
        RuntimeSpawner runtimeSpawner = this.getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
        functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
        runtimeSpawner.start();
    }

    RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        int instanceId = instance.getInstanceId();
        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder((Function.FunctionDetails)functionMetaData.getFunctionDetails());
        InstanceConfig instanceConfig = this.createInstanceConfig(functionDetailsBuilder.build(), instanceId, this.workerConfig.getPulsarFunctionsCluster());
        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile, functionMetaData.getPackageLocation().getOriginalFileName(), this.runtimeFactory, this.workerConfig.getInstanceLivenessCheckFreqMs());
        return runtimeSpawner;
    }

    InstanceConfig createInstanceConfig(Function.FunctionDetails functionDetails, int instanceId, String clusterName) {
        InstanceConfig instanceConfig = new InstanceConfig();
        instanceConfig.setFunctionDetails(functionDetails);
        instanceConfig.setFunctionId(UUID.randomUUID().toString());
        instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
        instanceConfig.setInstanceId(instanceId);
        instanceConfig.setMaxBufferedTuples(1024);
        instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
        instanceConfig.setClusterName(clusterName);
        return instanceConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, Function.FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {
        File tempPkgFile;
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        File pkgDir = pkgFile.getParentFile();
        if (pkgFile.exists()) {
            log.warn("Function package exists already {} deleting it", (Object)pkgFile);
            pkgFile.delete();
        }
        while ((tempPkgFile = new File(pkgDir, pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString())).exists() || !tempPkgFile.createNewFile()) {
        }
        String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
        boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(org.apache.pulsar.common.functions.Utils.HTTP);
        log.info("{}/{}/{} Function package file {} will be downloaded from {}", new Object[]{tempPkgFile, details.getTenant(), details.getNamespace(), details.getName(), downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()});
        if (downloadFromHttp) {
            Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile));
        } else {
            Utils.downloadFromBookkeeper(this.dlogNamespace, new FileOutputStream(tempPkgFile), pkgLocationPath);
        }
        try {
            try {
                Files.createLink(Paths.get(pkgFile.toURI()), Paths.get(tempPkgFile.toURI()));
                log.info("Function package file is linked from {} to {}", (Object)tempPkgFile, (Object)pkgFile);
            }
            catch (FileAlreadyExistsException faee) {
                log.warn("Function package has been downloaded from {} and saved at {}", (Object)functionMetaData.getPackageLocation(), (Object)pkgFile);
            }
        }
        finally {
            tempPkgFile.delete();
        }
    }

    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        File pkgDir;
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        log.info("{}/{}/{}-{} Stopping function...", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), instance.getInstanceId()});
        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
            functionRuntimeInfo.getRuntimeSpawner().close();
            functionRuntimeInfo.setRuntimeSpawner(null);
        }
        if ((pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instance.getInstanceId()))).exists()) {
            try {
                MoreFiles.deleteRecursively((Path)Paths.get(pkgDir.toURI()), (RecursiveDeleteOption[])new RecursiveDeleteOption[]{RecursiveDeleteOption.ALLOW_INSECURE});
            }
            catch (IOException e) {
                log.warn("Failed to delete package for function: {}", (Object)FunctionDetailsUtils.getFullyQualifiedName((Function.FunctionDetails)functionMetaData.getFunctionDetails()), (Object)e);
            }
        }
    }

    private void terminateFunction(final FunctionRuntimeInfo functionRuntimeInfo) {
        Function.FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
        log.info("{}/{}/{}-{} Terminating function...", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), functionRuntimeInfo.getFunctionInstance().getInstanceId()});
        this.stopFunction(functionRuntimeInfo);
        if (details.getSource().getCleanupSubscription()) {
            Map consumerSpecMap = details.getSource().getInputSpecsMap();
            consumerSpecMap.entrySet().forEach(new Consumer<Map.Entry<String, Function.ConsumerSpec>>(){

                @Override
                public void accept(Map.Entry<String, Function.ConsumerSpec> stringConsumerSpecEntry) {
                    Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue();
                    String topic = stringConsumerSpecEntry.getKey();
                    String subscriptionName = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
                    if (StringUtils.isBlank((CharSequence)subscriptionName)) {
                        subscriptionName = InstanceUtils.getDefaultSubscriptionName((Function.FunctionDetails)functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails());
                    }
                    try {
                        if (consumerSpec.getIsRegexPattern()) {
                            FunctionActioner.this.pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get((String)topic).getNamespace(), subscriptionName);
                        } else {
                            FunctionActioner.this.pulsarAdmin.topics().deleteSubscription(topic, subscriptionName);
                        }
                    }
                    catch (PulsarAdminException e) {
                        log.warn("Failed to cleanup {} subscription for {}", new Object[]{subscriptionName, FunctionDetailsUtils.getFullyQualifiedName((Function.FunctionDetails)functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails()), e});
                    }
                }
            });
        }
    }

    private String getDownloadPackagePath(Function.FunctionMetaData functionMetaData, int instanceId) {
        return StringUtils.join((Object[])new String[]{functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName(), Integer.toString(instanceId)}, (char)File.separatorChar);
    }

    public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails) {
        Function.SinkSpec sinkSpec;
        Function.SourceSpec sourceSpec;
        if (functionDetails.hasSource() && !StringUtils.isEmpty((CharSequence)(sourceSpec = functionDetails.getSource()).getBuiltin())) {
            return true;
        }
        return functionDetails.hasSink() && !StringUtils.isEmpty((CharSequence)(sinkSpec = functionDetails.getSink()).getBuiltin());
    }

    private File getBuiltinArchive(Function.FunctionDetails.Builder functionDetails) throws IOException {
        Function.SinkSpec sinkSpec;
        Function.SourceSpec sourceSpec;
        if (functionDetails.hasSource() && !StringUtils.isEmpty((CharSequence)(sourceSpec = functionDetails.getSource()).getBuiltin())) {
            File archive = this.connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
            String sourceClass = ConnectorUtils.getConnectorDefinition((String)archive.toString()).getSourceClass();
            Function.SourceSpec.Builder builder = Function.SourceSpec.newBuilder((Function.SourceSpec)functionDetails.getSource());
            builder.setClassName(sourceClass);
            functionDetails.setSource(builder);
            this.fillSourceTypeClass(functionDetails, archive, sourceClass);
            return archive;
        }
        if (functionDetails.hasSink() && !StringUtils.isEmpty((CharSequence)(sinkSpec = functionDetails.getSink()).getBuiltin())) {
            File archive = this.connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
            String sinkClass = ConnectorUtils.getConnectorDefinition((String)archive.toString()).getSinkClass();
            Function.SinkSpec.Builder builder = Function.SinkSpec.newBuilder((Function.SinkSpec)functionDetails.getSink());
            builder.setClassName(sinkClass);
            functionDetails.setSink(builder);
            this.fillSinkTypeClass(functionDetails, archive, sinkClass);
            return archive;
        }
        throw new IOException("Could not find built in archive definition");
    }

    private void fillSourceTypeClass(Function.FunctionDetails.Builder functionDetails, File archive, String className) throws IOException {
        try (NarClassLoader ncl = NarClassLoader.getFromArchive((File)archive, Collections.emptySet());){
            String typeArg = org.apache.pulsar.functions.utils.Utils.getSourceType((String)className, (ClassLoader)ncl).getName();
            Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder((Function.SourceSpec)functionDetails.getSource());
            sourceBuilder.setTypeClassName(typeArg);
            functionDetails.setSource(sourceBuilder);
            Function.SinkSpec sinkSpec = functionDetails.getSink();
            if (null == sinkSpec || StringUtils.isEmpty((CharSequence)sinkSpec.getTypeClassName())) {
                Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder((Function.SinkSpec)sinkSpec);
                sinkBuilder.setTypeClassName(typeArg);
                functionDetails.setSink(sinkBuilder);
            }
        }
    }

    private void fillSinkTypeClass(Function.FunctionDetails.Builder functionDetails, File archive, String className) throws IOException {
        try (NarClassLoader ncl = NarClassLoader.getFromArchive((File)archive, Collections.emptySet());){
            String typeArg = org.apache.pulsar.functions.utils.Utils.getSinkType((String)className, (ClassLoader)ncl).getName();
            Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder((Function.SinkSpec)functionDetails.getSink());
            sinkBuilder.setTypeClassName(typeArg);
            functionDetails.setSink(sinkBuilder);
            Function.SourceSpec sourceSpec = functionDetails.getSource();
            if (null == sourceSpec || StringUtils.isEmpty((CharSequence)sourceSpec.getTypeClassName())) {
                Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder((Function.SourceSpec)sourceSpec);
                sourceBuilder.setTypeClassName(typeArg);
                functionDetails.setSource(sourceBuilder);
            }
        }
    }

    public void setActionQueue(LinkedBlockingQueue<FunctionAction> actionQueue) {
        this.actionQueue = actionQueue;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    public void setActioner(Thread actioner) {
        this.actioner = actioner;
    }

    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    public Namespace getDlogNamespace() {
        return this.dlogNamespace;
    }

    public LinkedBlockingQueue<FunctionAction> getActionQueue() {
        return this.actionQueue;
    }

    public boolean isRunning() {
        return this.running;
    }

    public Thread getActioner() {
        return this.actioner;
    }

    public ConnectorsManager getConnectorsManager() {
        return this.connectorsManager;
    }

    public PulsarAdmin getPulsarAdmin() {
        return this.pulsarAdmin;
    }

    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof FunctionActioner)) {
            return false;
        }
        FunctionActioner other = (FunctionActioner)o;
        if (!other.canEqual(this)) {
            return false;
        }
        WorkerConfig this$workerConfig = this.getWorkerConfig();
        WorkerConfig other$workerConfig = other.getWorkerConfig();
        if (this$workerConfig == null ? other$workerConfig != null : !((Object)this$workerConfig).equals(other$workerConfig)) {
            return false;
        }
        RuntimeFactory this$runtimeFactory = this.getRuntimeFactory();
        RuntimeFactory other$runtimeFactory = other.getRuntimeFactory();
        if (this$runtimeFactory == null ? other$runtimeFactory != null : !this$runtimeFactory.equals(other$runtimeFactory)) {
            return false;
        }
        Namespace this$dlogNamespace = this.getDlogNamespace();
        Namespace other$dlogNamespace = other.getDlogNamespace();
        if (this$dlogNamespace == null ? other$dlogNamespace != null : !this$dlogNamespace.equals(other$dlogNamespace)) {
            return false;
        }
        LinkedBlockingQueue<FunctionAction> this$actionQueue = this.getActionQueue();
        LinkedBlockingQueue<FunctionAction> other$actionQueue = other.getActionQueue();
        if (this$actionQueue == null ? other$actionQueue != null : !this$actionQueue.equals(other$actionQueue)) {
            return false;
        }
        if (this.isRunning() != other.isRunning()) {
            return false;
        }
        Thread this$actioner = this.getActioner();
        Thread other$actioner = other.getActioner();
        if (this$actioner == null ? other$actioner != null : !this$actioner.equals(other$actioner)) {
            return false;
        }
        ConnectorsManager this$connectorsManager = this.getConnectorsManager();
        ConnectorsManager other$connectorsManager = other.getConnectorsManager();
        if (this$connectorsManager == null ? other$connectorsManager != null : !this$connectorsManager.equals(other$connectorsManager)) {
            return false;
        }
        PulsarAdmin this$pulsarAdmin = this.getPulsarAdmin();
        PulsarAdmin other$pulsarAdmin = other.getPulsarAdmin();
        return !(this$pulsarAdmin == null ? other$pulsarAdmin != null : !this$pulsarAdmin.equals(other$pulsarAdmin));
    }

    protected boolean canEqual(Object other) {
        return other instanceof FunctionActioner;
    }

    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        WorkerConfig $workerConfig = this.getWorkerConfig();
        result = result * 59 + ($workerConfig == null ? 43 : ((Object)$workerConfig).hashCode());
        RuntimeFactory $runtimeFactory = this.getRuntimeFactory();
        result = result * 59 + ($runtimeFactory == null ? 43 : $runtimeFactory.hashCode());
        Namespace $dlogNamespace = this.getDlogNamespace();
        result = result * 59 + ($dlogNamespace == null ? 43 : $dlogNamespace.hashCode());
        LinkedBlockingQueue<FunctionAction> $actionQueue = this.getActionQueue();
        result = result * 59 + ($actionQueue == null ? 43 : $actionQueue.hashCode());
        result = result * 59 + (this.isRunning() ? 79 : 97);
        Thread $actioner = this.getActioner();
        result = result * 59 + ($actioner == null ? 43 : $actioner.hashCode());
        ConnectorsManager $connectorsManager = this.getConnectorsManager();
        result = result * 59 + ($connectorsManager == null ? 43 : $connectorsManager.hashCode());
        PulsarAdmin $pulsarAdmin = this.getPulsarAdmin();
        result = result * 59 + ($pulsarAdmin == null ? 43 : $pulsarAdmin.hashCode());
        return result;
    }

    public String toString() {
        return "FunctionActioner(workerConfig=" + this.getWorkerConfig() + ", runtimeFactory=" + this.getRuntimeFactory() + ", dlogNamespace=" + this.getDlogNamespace() + ", actionQueue=" + this.getActionQueue() + ", running=" + this.isRunning() + ", actioner=" + this.getActioner() + ", connectorsManager=" + this.getConnectorsManager() + ", pulsarAdmin=" + this.getPulsarAdmin() + ")";
    }
}

