/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.auth.FunctionAuthUtils;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionActioner {
    private static final Logger log = LoggerFactory.getLogger(FunctionActioner.class);
    private final WorkerConfig workerConfig;
    private final RuntimeFactory runtimeFactory;
    private final Namespace dlogNamespace;
    private final ConnectorsManager connectorsManager;
    private final PulsarAdmin pulsarAdmin;

    public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
        this.workerConfig = workerConfig;
        this.runtimeFactory = runtimeFactory;
        this.dlogNamespace = dlogNamespace;
        this.connectorsManager = connectorsManager;
        this.pulsarAdmin = pulsarAdmin;
    }

    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        try {
            String packageFile;
            Function.FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
            Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
            int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
            log.info("{}/{}/{}-{} Starting function ...", new Object[]{functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), instanceId});
            String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
            boolean isPkgUrlProvided = Utils.isFunctionPackageUrlSupported((String)pkgLocation);
            if (this.runtimeFactory.externallyManaged()) {
                packageFile = pkgLocation;
            } else if (isPkgUrlProvided && pkgLocation.startsWith("file")) {
                URL url = new URL(pkgLocation);
                File pkgFile = new File(url.toURI());
                packageFile = pkgFile.getAbsolutePath();
            } else if (WorkerUtils.isFunctionCodeBuiltin((Function.FunctionDetailsOrBuilder)functionDetails)) {
                File pkgFile = this.getBuiltinArchive(Function.FunctionDetails.newBuilder((Function.FunctionDetails)functionMetaData.getFunctionDetails()));
                packageFile = pkgFile.getAbsolutePath();
            } else {
                File pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instanceId));
                pkgDir.mkdirs();
                File pkgFile = new File(pkgDir, new File(FunctionActioner.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
                this.downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
                packageFile = pkgFile.getAbsolutePath();
            }
            RuntimeSpawner runtimeSpawner = this.getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
            functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
            runtimeSpawner.start();
        }
        catch (Exception ex) {
            Function.FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
            log.info("{}/{}/{} Error starting function", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), ex});
            functionRuntimeInfo.setStartupException(ex);
        }
    }

    RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        int instanceId = instance.getInstanceId();
        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder((Function.FunctionDetails)functionMetaData.getFunctionDetails());
        Function.FunctionAuthenticationSpec functionAuthenticationSpec = null;
        if (this.workerConfig.isAuthenticationEnabled() && instance.getFunctionMetaData().hasFunctionAuthSpec()) {
            functionAuthenticationSpec = instance.getFunctionMetaData().getFunctionAuthSpec();
        }
        InstanceConfig instanceConfig = this.createInstanceConfig(functionDetailsBuilder.build(), functionAuthenticationSpec, instanceId, this.workerConfig.getPulsarFunctionsCluster());
        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile, functionMetaData.getPackageLocation().getOriginalFileName(), this.runtimeFactory, this.workerConfig.getInstanceLivenessCheckFreqMs());
        return runtimeSpawner;
    }

    InstanceConfig createInstanceConfig(Function.FunctionDetails functionDetails, Function.FunctionAuthenticationSpec functionAuthSpec, int instanceId, String clusterName) {
        InstanceConfig instanceConfig = new InstanceConfig();
        instanceConfig.setFunctionDetails(functionDetails);
        instanceConfig.setFunctionId(UUID.randomUUID().toString());
        instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
        instanceConfig.setInstanceId(instanceId);
        instanceConfig.setMaxBufferedTuples(1024);
        instanceConfig.setPort(FunctionCommon.findAvailablePort());
        instanceConfig.setClusterName(clusterName);
        instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
        instanceConfig.setMaxPendingAsyncRequests(this.workerConfig.getMaxPendingAsyncRequests());
        return instanceConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, Function.FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {
        File tempPkgFile;
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        File pkgDir = pkgFile.getParentFile();
        if (pkgFile.exists()) {
            log.warn("Function package exists already {} deleting it", (Object)pkgFile);
            pkgFile.delete();
        }
        while ((tempPkgFile = new File(pkgDir, pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString())).exists() || !tempPkgFile.createNewFile()) {
        }
        String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
        boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith("http");
        log.info("{}/{}/{} Function package file {} will be downloaded from {}", new Object[]{tempPkgFile, details.getTenant(), details.getNamespace(), details.getName(), downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()});
        if (downloadFromHttp) {
            FunctionCommon.downloadFromHttpUrl((String)pkgLocationPath, (File)tempPkgFile);
        } else {
            FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile);
            WorkerUtils.downloadFromBookkeeper(this.dlogNamespace, tempPkgFos, pkgLocationPath);
            if (tempPkgFos != null) {
                tempPkgFos.close();
            }
        }
        try {
            try {
                Files.createLink(Paths.get(pkgFile.toURI()), Paths.get(tempPkgFile.toURI()));
                log.info("Function package file is linked from {} to {}", (Object)tempPkgFile, (Object)pkgFile);
            }
            catch (FileAlreadyExistsException faee) {
                log.warn("Function package has been downloaded from {} and saved at {}", (Object)functionMetaData.getPackageLocation(), (Object)pkgFile);
            }
        }
        finally {
            tempPkgFile.delete();
        }
        if (details.getRuntime() == Function.FunctionDetails.Runtime.GO && !pkgFile.canExecute()) {
            pkgFile.setExecutable(true);
            log.info("Golang function package file {} is set to executable", (Object)pkgFile);
        }
    }

    private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        File pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
        if (pkgDir.exists()) {
            try {
                MoreFiles.deleteRecursively((Path)Paths.get(pkgDir.toURI()), (RecursiveDeleteOption[])new RecursiveDeleteOption[]{RecursiveDeleteOption.ALLOW_INSECURE});
            }
            catch (IOException e) {
                log.warn("Failed to delete package for function: {}", (Object)FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)functionMetaData.getFunctionDetails()), (Object)e);
            }
        }
    }

    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        log.info("{}/{}/{}-{} Stopping function...", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), instance.getInstanceId()});
        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
            functionRuntimeInfo.getRuntimeSpawner().close();
            functionRuntimeInfo.setRuntimeSpawner(null);
        }
        this.cleanupFunctionFiles(functionRuntimeInfo);
    }

    public void terminateFunction(final FunctionRuntimeInfo functionRuntimeInfo) {
        Function.FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
        final String fqfn = FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)details);
        log.info("{}-{} Terminating function...", (Object)fqfn, (Object)functionRuntimeInfo.getFunctionInstance().getInstanceId());
        Function.FunctionDetails funcDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
            functionRuntimeInfo.getRuntimeSpawner().close();
            if (this.workerConfig.isAuthenticationEnabled()) {
                functionRuntimeInfo.getRuntimeSpawner().getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
                    try {
                        log.info("{}-{} Cleaning up authentication data for function...", (Object)fqfn, (Object)functionRuntimeInfo.getFunctionInstance().getInstanceId());
                        functionAuthProvider.cleanUpAuthData(details, Optional.ofNullable(FunctionAuthUtils.getFunctionAuthData(Optional.ofNullable(functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()))));
                    }
                    catch (Exception e) {
                        log.error("Failed to cleanup auth data for function: {}", (Object)fqfn, (Object)e);
                    }
                });
            }
            functionRuntimeInfo.setRuntimeSpawner(null);
        }
        this.cleanupFunctionFiles(functionRuntimeInfo);
        if (details.getSource().getCleanupSubscription()) {
            Map consumerSpecMap = details.getSource().getInputSpecsMap();
            consumerSpecMap.entrySet().forEach(new Consumer<Map.Entry<String, Function.ConsumerSpec>>(){

                @Override
                public void accept(Map.Entry<String, Function.ConsumerSpec> stringConsumerSpecEntry) {
                    Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue();
                    String topic = stringConsumerSpecEntry.getKey();
                    String subscriptionName = org.apache.commons.lang3.StringUtils.isBlank((CharSequence)functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName()) ? InstanceUtils.getDefaultSubscriptionName((Function.FunctionDetails)functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails()) : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
                    try {
                        Actions.newBuilder().addAction(Actions.Action.builder().actionName(String.format("Cleaning up subscriptions for function %s", fqfn)).numRetries(10).sleepBetweenInvocationsMs(1000L).supplier(() -> {
                            try {
                                if (consumerSpec.getIsRegexPattern()) {
                                    FunctionActioner.this.pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get((String)topic).getNamespace(), subscriptionName);
                                } else {
                                    FunctionActioner.this.pulsarAdmin.topics().deleteSubscription(topic, subscriptionName);
                                }
                            }
                            catch (PulsarAdminException e) {
                                if (e instanceof PulsarAdminException.NotFoundException) {
                                    return Actions.ActionResult.builder().success(true).build();
                                }
                                List existingConsumers = Collections.emptyList();
                                try {
                                    TopicStats stats = FunctionActioner.this.pulsarAdmin.topics().getStats(topic);
                                    SubscriptionStats sub = (SubscriptionStats)stats.subscriptions.get(subscriptionName);
                                    if (sub != null) {
                                        existingConsumers = sub.consumers.stream().map(consumerStats -> consumerStats.metadata).collect(Collectors.toList());
                                    }
                                }
                                catch (PulsarAdminException stats) {
                                    // empty catch block
                                }
                                String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
                                return Actions.ActionResult.builder().success(false).errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers)).build();
                            }
                            return Actions.ActionResult.builder().success(true).build();
                        }).build()).run();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }

    private String getDownloadPackagePath(Function.FunctionMetaData functionMetaData, int instanceId) {
        return org.apache.commons.lang3.StringUtils.join((Object[])new String[]{functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName(), Integer.toString(instanceId)}, (char)File.separatorChar);
    }

    private File getBuiltinArchive(Function.FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
        Function.SinkSpec sinkSpec;
        Function.SourceSpec sourceSpec;
        if (functionDetails.hasSource() && !org.apache.commons.lang3.StringUtils.isEmpty((CharSequence)(sourceSpec = functionDetails.getSource()).getBuiltin())) {
            File archive = this.connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
            String sourceClass = ConnectorUtils.getConnectorDefinition((String)archive.toString()).getSourceClass();
            Function.SourceSpec.Builder builder = Function.SourceSpec.newBuilder((Function.SourceSpec)functionDetails.getSource());
            builder.setClassName(sourceClass);
            functionDetails.setSource(builder);
            this.fillSourceTypeClass(functionDetails, archive, sourceClass);
            return archive;
        }
        if (functionDetails.hasSink() && !org.apache.commons.lang3.StringUtils.isEmpty((CharSequence)(sinkSpec = functionDetails.getSink()).getBuiltin())) {
            File archive = this.connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
            String sinkClass = ConnectorUtils.getConnectorDefinition((String)archive.toString()).getSinkClass();
            Function.SinkSpec.Builder builder = Function.SinkSpec.newBuilder((Function.SinkSpec)functionDetails.getSink());
            builder.setClassName(sinkClass);
            functionDetails.setSink(builder);
            this.fillSinkTypeClass(functionDetails, archive, sinkClass);
            return archive;
        }
        throw new IOException("Could not find built in archive definition");
    }

    private void fillSourceTypeClass(Function.FunctionDetails.Builder functionDetails, File archive, String className) throws IOException, ClassNotFoundException {
        try (NarClassLoader ncl = NarClassLoader.getFromArchive((File)archive, Collections.emptySet());){
            String typeArg = FunctionCommon.getSourceType((String)className, (ClassLoader)ncl).getName();
            Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder((Function.SourceSpec)functionDetails.getSource());
            sourceBuilder.setTypeClassName(typeArg);
            functionDetails.setSource(sourceBuilder);
            Function.SinkSpec sinkSpec = functionDetails.getSink();
            if (null == sinkSpec || org.apache.commons.lang3.StringUtils.isEmpty((CharSequence)sinkSpec.getTypeClassName())) {
                Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder((Function.SinkSpec)sinkSpec);
                sinkBuilder.setTypeClassName(typeArg);
                functionDetails.setSink(sinkBuilder);
            }
        }
    }

    private void fillSinkTypeClass(Function.FunctionDetails.Builder functionDetails, File archive, String className) throws IOException, ClassNotFoundException {
        try (NarClassLoader ncl = NarClassLoader.getFromArchive((File)archive, Collections.emptySet());){
            String typeArg = FunctionCommon.getSinkType((String)className, (ClassLoader)ncl).getName();
            Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder((Function.SinkSpec)functionDetails.getSink());
            sinkBuilder.setTypeClassName(typeArg);
            functionDetails.setSink(sinkBuilder);
            Function.SourceSpec sourceSpec = functionDetails.getSource();
            if (null == sourceSpec || org.apache.commons.lang3.StringUtils.isEmpty((CharSequence)sourceSpec.getTypeClassName())) {
                Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder((Function.SourceSpec)sourceSpec);
                sourceBuilder.setTypeClassName(typeArg);
                functionDetails.setSource(sourceBuilder);
            }
        }
    }

    private static String getDownloadFileName(Function.FunctionDetails FunctionDetails2, Function.PackageLocationMetaData packageLocation) {
        if (!StringUtils.isEmpty((String)packageLocation.getOriginalFileName())) {
            return packageLocation.getOriginalFileName();
        }
        String[] hierarchy = FunctionDetails2.getClassName().split("\\.");
        String fileName = hierarchy.length <= 0 ? FunctionDetails2.getClassName() : (hierarchy.length == 1 ? hierarchy[0] : hierarchy[hierarchy.length - 2]);
        switch (FunctionDetails2.getRuntime()) {
            case JAVA: {
                return fileName + ".jar";
            }
            case PYTHON: {
                return fileName + ".py";
            }
            case GO: {
                return fileName + ".go";
            }
        }
        throw new RuntimeException("Unknown runtime " + FunctionDetails2.getRuntime());
    }

    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    public Namespace getDlogNamespace() {
        return this.dlogNamespace;
    }

    public ConnectorsManager getConnectorsManager() {
        return this.connectorsManager;
    }

    public PulsarAdmin getPulsarAdmin() {
        return this.pulsarAdmin;
    }

    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof FunctionActioner)) {
            return false;
        }
        FunctionActioner other = (FunctionActioner)o;
        if (!other.canEqual(this)) {
            return false;
        }
        WorkerConfig this$workerConfig = this.getWorkerConfig();
        WorkerConfig other$workerConfig = other.getWorkerConfig();
        if (this$workerConfig == null ? other$workerConfig != null : !this$workerConfig.equals(other$workerConfig)) {
            return false;
        }
        RuntimeFactory this$runtimeFactory = this.getRuntimeFactory();
        RuntimeFactory other$runtimeFactory = other.getRuntimeFactory();
        if (this$runtimeFactory == null ? other$runtimeFactory != null : !this$runtimeFactory.equals(other$runtimeFactory)) {
            return false;
        }
        Namespace this$dlogNamespace = this.getDlogNamespace();
        Namespace other$dlogNamespace = other.getDlogNamespace();
        if (this$dlogNamespace == null ? other$dlogNamespace != null : !this$dlogNamespace.equals(other$dlogNamespace)) {
            return false;
        }
        ConnectorsManager this$connectorsManager = this.getConnectorsManager();
        ConnectorsManager other$connectorsManager = other.getConnectorsManager();
        if (this$connectorsManager == null ? other$connectorsManager != null : !this$connectorsManager.equals(other$connectorsManager)) {
            return false;
        }
        PulsarAdmin this$pulsarAdmin = this.getPulsarAdmin();
        PulsarAdmin other$pulsarAdmin = other.getPulsarAdmin();
        return !(this$pulsarAdmin == null ? other$pulsarAdmin != null : !this$pulsarAdmin.equals(other$pulsarAdmin));
    }

    protected boolean canEqual(Object other) {
        return other instanceof FunctionActioner;
    }

    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        WorkerConfig $workerConfig = this.getWorkerConfig();
        result = result * 59 + ($workerConfig == null ? 43 : $workerConfig.hashCode());
        RuntimeFactory $runtimeFactory = this.getRuntimeFactory();
        result = result * 59 + ($runtimeFactory == null ? 43 : $runtimeFactory.hashCode());
        Namespace $dlogNamespace = this.getDlogNamespace();
        result = result * 59 + ($dlogNamespace == null ? 43 : $dlogNamespace.hashCode());
        ConnectorsManager $connectorsManager = this.getConnectorsManager();
        result = result * 59 + ($connectorsManager == null ? 43 : $connectorsManager.hashCode());
        PulsarAdmin $pulsarAdmin = this.getPulsarAdmin();
        result = result * 59 + ($pulsarAdmin == null ? 43 : $pulsarAdmin.hashCode());
        return result;
    }

    public String toString() {
        return "FunctionActioner(workerConfig=" + this.getWorkerConfig() + ", runtimeFactory=" + this.getRuntimeFactory() + ", dlogNamespace=" + this.getDlogNamespace() + ", connectorsManager=" + this.getConnectorsManager() + ", pulsarAdmin=" + this.getPulsarAdmin() + ")";
    }
}

