package org.apache.pulsar.functions;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.IStringConverter;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.Parameter;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.GsonBuilder;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.JsonParser;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/LocalRunner.class */
public class LocalRunner {
    private static final Logger log = LoggerFactory.getLogger(LocalRunner.class);

    /** Checked by {@link #start(boolean)} to reject a second start; reset to {@code false} by {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Spawners created while starting; {@link #stop()} closes each one and then empties the list. */
    private final List<RuntimeSpawner> spawners = new LinkedList<>();

    /** Function to run; takes precedence over the source and sink configs when set. */
    @Parameter(names = {"--functionConfig"}, description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
    protected FunctionConfig functionConfig;

    /** Source connector to run; used only when no function config is given. */
    @Parameter(names = {"--sourceConfig"}, description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
    protected SourceConfig sourceConfig;

    /** Sink connector to run; used only when neither a function nor a source config is given. */
    @Parameter(names = {"--sinkConfig"}, description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
    protected SinkConfig sinkConfig;

    /** State storage service URL handed to the runtime factory; may be {@code null}. */
    @Parameter(names = {"--stateStorageServiceUrl"}, description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
    protected String stateStorageServiceUrl;

    /** Broker service URL; when {@code null}, {@link #DEFAULT_SERVICE_URL} is used. */
    @Parameter(names = {"--brokerServiceUrl"}, description = "The URL for the Pulsar broker", hidden = true)
    protected String brokerServiceUrl;

    /** Client authentication plugin passed through to the authentication config. */
    @Parameter(names = {"--clientAuthPlugin"}, description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
    protected String clientAuthPlugin;

    /** Client authentication parameters passed through to the authentication config. */
    @Parameter(names = {"--clientAuthParams"}, description = "Client authentication param", hidden = true)
    protected String clientAuthParams;

    /** TLS switch passed through to the authentication config. */
    @Parameter(names = {"--useTls"}, description = "Use tls connection\n", hidden = true, arity = 1)
    protected boolean useTls;

    /** Insecure-TLS switch passed through to the authentication config. */
    @Parameter(names = {"--tlsAllowInsecureConnection"}, description = "Allow insecure tls connection\n", hidden = true, arity = 1)
    protected boolean tlsAllowInsecureConnection;

    /** Hostname-verification switch passed through to the authentication config. */
    @Parameter(names = {"--tlsHostNameVerificationEnabled"}, description = "Enable hostname verification", hidden = true, arity = 1)
    protected boolean tlsHostNameVerificationEnabled;

    /** Trust certificate file path passed through to the authentication config. */
    @Parameter(names = {"--tlsTrustCertFilePath"}, description = "tls trust cert file path", hidden = true)
    protected String tlsTrustCertFilePath;

    /** Value added to the zero-based instance index to form each instance id. */
    @Parameter(names = {"--instanceIdOffset"}, description = "Start the instanceIds from this offset", hidden = true)
    protected int instanceIdOffset;

    /** Requested runtime; {@code null} is treated like {@link RuntimeEnv#THREAD} when choosing the mode. */
    @Parameter(names = {"--runtime"}, description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
    protected RuntimeEnv runtimeEnv;

    /** Broker URL used when {@link #brokerServiceUrl} is not set. */
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";

    /**
     * JCommander converter that turns the JSON text given to {@code --functionConfig}
     * into a {@link FunctionConfig}.
     */
    public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
        /**
         * Parses the given JSON text.
         *
         * @param str JSON representation of a {@link FunctionConfig}
         * @return the parsed config
         * @throws RuntimeException wrapping the {@link IOException} when the text cannot be read
         */
        @Override
        public FunctionConfig convert(String str) {
            final FunctionConfig parsed;
            try {
                parsed = (FunctionConfig) ObjectMapperFactory.getThreadLocal().readValue(str, FunctionConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse function config:", parseFailure);
            }
            return parsed;
        }
    }

    /**
     * Fluent builder for {@link LocalRunner}. Every setter stores its argument and returns
     * this builder; {@link #build()} passes the stored values to the {@link LocalRunner}
     * constructor unchanged. Values that are never set keep their Java defaults
     * ({@code null}, {@code false}, {@code 0}).
     */
    public static class LocalRunnerBuilder {
        private FunctionConfig functionConfig;
        private SourceConfig sourceConfig;
        private SinkConfig sinkConfig;
        private String stateStorageServiceUrl;
        private String brokerServiceUrl;
        private String clientAuthPlugin;
        private String clientAuthParams;
        private boolean useTls;
        private boolean tlsAllowInsecureConnection;
        private boolean tlsHostNameVerificationEnabled;
        private String tlsTrustCertFilePath;
        private int instanceIdOffset;
        private RuntimeEnv runtimeEnv;

        /** Package-private: instances are obtained through {@link LocalRunner#builder()}. */
        LocalRunnerBuilder() {
        }

        /** Sets the function config to run. */
        public LocalRunnerBuilder functionConfig(FunctionConfig functionConfig) {
            this.functionConfig = functionConfig;
            return this;
        }

        /** Sets the source config to run. */
        public LocalRunnerBuilder sourceConfig(SourceConfig sourceConfig) {
            this.sourceConfig = sourceConfig;
            return this;
        }

        /** Sets the sink config to run. */
        public LocalRunnerBuilder sinkConfig(SinkConfig sinkConfig) {
            this.sinkConfig = sinkConfig;
            return this;
        }

        /** Sets the state storage service URL. */
        public LocalRunnerBuilder stateStorageServiceUrl(String str) {
            this.stateStorageServiceUrl = str;
            return this;
        }

        /** Sets the broker service URL. */
        public LocalRunnerBuilder brokerServiceUrl(String str) {
            this.brokerServiceUrl = str;
            return this;
        }

        /** Sets the client authentication plugin. */
        public LocalRunnerBuilder clientAuthPlugin(String str) {
            this.clientAuthPlugin = str;
            return this;
        }

        /** Sets the client authentication parameters. */
        public LocalRunnerBuilder clientAuthParams(String str) {
            this.clientAuthParams = str;
            return this;
        }

        /** Sets whether TLS is used. */
        public LocalRunnerBuilder useTls(boolean z) {
            this.useTls = z;
            return this;
        }

        /** Sets whether insecure TLS connections are allowed. */
        public LocalRunnerBuilder tlsAllowInsecureConnection(boolean z) {
            this.tlsAllowInsecureConnection = z;
            return this;
        }

        /** Sets whether TLS hostname verification is enabled. */
        public LocalRunnerBuilder tlsHostNameVerificationEnabled(boolean z) {
            this.tlsHostNameVerificationEnabled = z;
            return this;
        }

        /** Sets the TLS trust certificate file path. */
        public LocalRunnerBuilder tlsTrustCertFilePath(String str) {
            this.tlsTrustCertFilePath = str;
            return this;
        }

        /** Sets the offset added to each instance index to form its instance id. */
        public LocalRunnerBuilder instanceIdOffset(int i) {
            this.instanceIdOffset = i;
            return this;
        }

        /** Sets the requested runtime environment. */
        public LocalRunnerBuilder runtimeEnv(RuntimeEnv runtimeEnv) {
            this.runtimeEnv = runtimeEnv;
            return this;
        }

        /**
         * Creates a {@link LocalRunner} from the values collected so far.
         *
         * @return a new runner; the builder can be reused afterwards
         */
        public LocalRunner build() {
            return new LocalRunner(this.functionConfig, this.sourceConfig, this.sinkConfig, this.stateStorageServiceUrl, this.brokerServiceUrl, this.clientAuthPlugin, this.clientAuthParams, this.useTls, this.tlsAllowInsecureConnection, this.tlsHostNameVerificationEnabled, this.tlsTrustCertFilePath, this.instanceIdOffset, this.runtimeEnv);
        }

        /**
         * Renders every collected value, including {@code clientAuthParams} verbatim,
         * so avoid logging the result where credentials must stay private.
         */
        @Override
        public String toString() {
            // The closing parenthesis is a plain literal; it must not depend on a constant
            // that merely happens to equal ")" in an unrelated library class.
            return "LocalRunner.LocalRunnerBuilder(functionConfig=" + this.functionConfig
                    + ", sourceConfig=" + this.sourceConfig
                    + ", sinkConfig=" + this.sinkConfig
                    + ", stateStorageServiceUrl=" + this.stateStorageServiceUrl
                    + ", brokerServiceUrl=" + this.brokerServiceUrl
                    + ", clientAuthPlugin=" + this.clientAuthPlugin
                    + ", clientAuthParams=" + this.clientAuthParams
                    + ", useTls=" + this.useTls
                    + ", tlsAllowInsecureConnection=" + this.tlsAllowInsecureConnection
                    + ", tlsHostNameVerificationEnabled=" + this.tlsHostNameVerificationEnabled
                    + ", tlsTrustCertFilePath=" + this.tlsTrustCertFilePath
                    + ", instanceIdOffset=" + this.instanceIdOffset
                    + ", runtimeEnv=" + this.runtimeEnv + ")";
        }
    }

    /**
     * JCommander converter that maps the text given to {@code --runtime} onto a
     * {@link RuntimeEnv} constant.
     */
    public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
        /**
         * Resolves a runtime name, ignoring letter case.
         *
         * @param str runtime name such as {@code THREAD}, {@code Thread} or {@code process}
         * @return the matching constant
         * @throws IllegalArgumentException when the name matches no constant
         * @throws NullPointerException when {@code str} is {@code null}
         */
        @Override
        public RuntimeEnv convert(String str) {
            // The option help advertises "Thread/Process", so the exact-case lookup alone
            // would reject the spelling shown to users.
            for (RuntimeEnv candidate : RuntimeEnv.values()) {
                if (candidate.name().equalsIgnoreCase(str)) {
                    return candidate;
                }
            }
            // No match: defer to valueOf so callers see the same exceptions as before.
            return RuntimeEnv.valueOf(str);
        }
    }

    /**
     * Runtime environments selectable through the {@code --runtime} option or the builder.
     * When starting, the runner uses threaded mode if this value is unset or {@code THREAD}
     * and the workload is a source, a sink or a Java function; in every other case it
     * uses process mode.
     */
    public enum RuntimeEnv {
        // Requests threaded mode (instances created through a ThreadRuntimeFactory).
        THREAD,
        // Requests process mode (instances created through a ProcessRuntimeFactory).
        PROCESS
    }

    /**
     * JCommander converter that turns the JSON text given to {@code --sinkConfig}
     * into a {@link SinkConfig}.
     */
    public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
        /**
         * Parses the given JSON text.
         *
         * @param str JSON representation of a {@link SinkConfig}
         * @return the parsed config
         * @throws RuntimeException wrapping the {@link IOException} when the text cannot be read
         */
        @Override
        public SinkConfig convert(String str) {
            final SinkConfig parsed;
            try {
                parsed = (SinkConfig) ObjectMapperFactory.getThreadLocal().readValue(str, SinkConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse sink config:", parseFailure);
            }
            return parsed;
        }
    }

    /**
     * JCommander converter that turns the JSON text given to {@code --sourceConfig}
     * into a {@link SourceConfig}.
     */
    public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
        /**
         * Parses the given JSON text.
         *
         * @param str JSON representation of a {@link SourceConfig}
         * @return the parsed config
         * @throws RuntimeException wrapping the {@link IOException} when the text cannot be read
         */
        @Override
        public SourceConfig convert(String str) {
            final SourceConfig parsed;
            try {
                parsed = (SourceConfig) ObjectMapperFactory.getThreadLocal().readValue(str, SourceConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse source config:", parseFailure);
            }
            return parsed;
        }
    }

    /**
     * Command-line entry point: builds a runner with default settings, lets JCommander
     * populate its annotated fields from the arguments, then starts it in blocking mode.
     *
     * @param strArr command-line arguments
     * @throws Exception when argument parsing or startup fails
     */
    public static void main(String[] strArr) throws Exception {
        final LocalRunner runner = LocalRunner.builder().build();
        final JCommander commandLine = new JCommander(runner);
        commandLine.setProgramName("LocalRunner");
        commandLine.parse(strArr);
        runner.start(true);
    }

    /**
     * Creates a runner from explicit settings and registers a JVM shutdown hook that
     * calls {@link #stop()}.
     *
     * @param functionConfig function to run, or {@code null}
     * @param sourceConfig source connector to run, or {@code null}
     * @param sinkConfig sink connector to run, or {@code null}
     * @param str state storage service URL
     * @param str2 broker service URL
     * @param str3 client authentication plugin
     * @param str4 client authentication parameters
     * @param z whether to use TLS
     * @param z2 whether insecure TLS connections are allowed
     * @param z3 whether TLS hostname verification is enabled
     * @param str5 TLS trust certificate file path
     * @param i offset added to each instance index to form its instance id
     * @param runtimeEnv requested runtime environment, or {@code null}
     */
    public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig, String str, String str2, String str3, String str4, boolean z, boolean z2, boolean z3, String str5, int i, RuntimeEnv runtimeEnv) {
        // What to run.
        this.functionConfig = functionConfig;
        this.sourceConfig = sourceConfig;
        this.sinkConfig = sinkConfig;
        this.runtimeEnv = runtimeEnv;
        this.instanceIdOffset = i;

        // Where to connect.
        this.brokerServiceUrl = str2;
        this.stateStorageServiceUrl = str;

        // How to authenticate and secure the connection.
        this.clientAuthPlugin = str3;
        this.clientAuthParams = str4;
        this.useTls = z;
        this.tlsAllowInsecureConnection = z2;
        this.tlsHostNameVerificationEnabled = z3;
        this.tlsTrustCertFilePath = str5;

        // Close any spawners when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * Clears the running flag, closes every spawner currently tracked and forgets them.
     * Holds this runner's monitor for the duration of the call.
     */
    public synchronized void stop() {
        this.running.set(false);
        log.info("Shutting down the localrun runtimeSpawner ...");
        for (RuntimeSpawner spawner : this.spawners) {
            spawner.close();
        }
        this.spawners.clear();
    }

    /**
     * Validates the configured function, source or sink (checked in that order), builds its
     * function details and launches the requested number of instances in threaded or
     * process mode.
     *
     * <p>Threaded mode is used when the workload is a source, a sink or a Java function and
     * the runtime environment is unset or {@link RuntimeEnv#THREAD}; otherwise process mode
     * is used.
     *
     * @param z when {@code true}, block until every spawner started by this call has been joined
     * @throws IllegalArgumentException when the runner is already started or no config is set
     * @throws UnsupportedOperationException when the function runtime is not Java, Go or Python
     * @throws RuntimeException when the user code archive does not exist on disk
     * @throws Exception for any failure raised by validation or by the runtime
     */
    public void start(boolean z) throws Exception {
        String archive;
        int parallelism;
        Function.FunctionDetails details;
        List<RuntimeSpawner> started = new LinkedList<>();
        synchronized (this) {
            if (this.running.get()) {
                throw new IllegalArgumentException("Pulsar Function local run already started!");
            }
            if (this.functionConfig != null) {
                FunctionConfigUtils.inferMissingArguments(this.functionConfig);
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                parallelism = this.functionConfig.getParallelism().intValue();
                if (this.functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                    archive = this.functionConfig.getJar();
                    // No jar given: run from wherever the function class itself was loaded.
                    if (archive == null && this.functionConfig.getClassName() != null) {
                        archive = codeSourceOf(this.functionConfig.getClassName());
                    }
                    if (Utils.isFunctionPackageUrlSupported(archive)) {
                        classLoader = FunctionConfigUtils.validate(this.functionConfig, FunctionCommon.extractFileFromPkgURL(archive));
                    } else {
                        File jarFile = new File(archive);
                        if (!jarFile.exists()) {
                            throw new RuntimeException("User jar does not exist");
                        }
                        classLoader = FunctionConfigUtils.validate(this.functionConfig, jarFile);
                    }
                } else if (this.functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
                    archive = this.functionConfig.getGo();
                } else if (this.functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
                    archive = this.functionConfig.getPy();
                } else {
                    throw new UnsupportedOperationException();
                }
                details = FunctionConfigUtils.convert(this.functionConfig, classLoader);
            } else if (this.sourceConfig != null) {
                Utils.inferMissingArguments(this.sourceConfig);
                archive = this.sourceConfig.getArchive();
                if (archive == null && this.sourceConfig.getClassName() != null) {
                    archive = codeSourceOf(this.sourceConfig.getClassName());
                }
                if (archive == null) {
                    // Last resort: the code source this runner was loaded from.
                    archive = codeSourceOf(LocalRunner.class.getName());
                }
                String builtInSource = isBuiltInSource(archive);
                if (builtInSource != null) {
                    this.sourceConfig.setArchive(builtInSource);
                    // Keep the local path in step with the config; otherwise the bare
                    // connector name would be checked on disk and handed to the spawner.
                    archive = builtInSource;
                }
                parallelism = this.sourceConfig.getParallelism().intValue();
                if (Utils.isFunctionPackageUrlSupported(archive)) {
                    details = SourceConfigUtils.convert(this.sourceConfig, SourceConfigUtils.validate(this.sourceConfig, null, FunctionCommon.extractFileFromPkgURL(archive)));
                } else {
                    File sourceArchive = new File(archive);
                    if (!sourceArchive.exists()) {
                        throw new RuntimeException("Source archive does not exist");
                    }
                    details = SourceConfigUtils.convert(this.sourceConfig, SourceConfigUtils.validate(this.sourceConfig, null, sourceArchive));
                }
            } else if (this.sinkConfig != null) {
                Utils.inferMissingArguments(this.sinkConfig);
                archive = this.sinkConfig.getArchive();
                if (archive == null && this.sinkConfig.getClassName() != null) {
                    archive = codeSourceOf(this.sinkConfig.getClassName());
                }
                // Sinks must be looked up among the built-in sinks, not the built-in sources.
                String builtInSink = isBuiltInSink(archive);
                if (builtInSink != null) {
                    this.sinkConfig.setArchive(builtInSink);
                    // Same reasoning as for sources: use the resolved archive path from here on.
                    archive = builtInSink;
                }
                parallelism = this.sinkConfig.getParallelism().intValue();
                if (Utils.isFunctionPackageUrlSupported(archive)) {
                    details = SinkConfigUtils.convert(this.sinkConfig, SinkConfigUtils.validate(this.sinkConfig, null, FunctionCommon.extractFileFromPkgURL(archive)));
                } else {
                    File sinkArchive = new File(archive);
                    if (!sinkArchive.exists()) {
                        throw new RuntimeException("Sink archive does not exist");
                    }
                    details = SinkConfigUtils.convert(this.sinkConfig, SinkConfigUtils.validate(this.sinkConfig, null, sinkArchive));
                }
            } else {
                throw new IllegalArgumentException("Must specify Function, Source or Sink config");
            }

            // Default the instance-jar property to this runner's own code source if unset.
            if (System.getProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY) == null) {
                System.setProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, LocalRunner.class.getProtectionDomain().getCodeSource().getLocation().getFile());
            }
            AuthenticationConfig authConfig = AuthenticationConfig.builder()
                    .clientAuthenticationPlugin(this.clientAuthPlugin)
                    .clientAuthenticationParameters(this.clientAuthParams)
                    .useTls(this.useTls)
                    .tlsAllowInsecureConnection(this.tlsAllowInsecureConnection)
                    .tlsHostnameVerificationEnable(this.tlsHostNameVerificationEnabled)
                    .tlsTrustCertsFilePath(this.tlsTrustCertFilePath)
                    .build();
            String serviceUrl = this.brokerServiceUrl != null ? this.brokerServiceUrl : DEFAULT_SERVICE_URL;

            boolean threadCapable = this.sourceConfig != null
                    || this.sinkConfig != null
                    || this.functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA;
            boolean threadRequested = this.runtimeEnv == null || this.runtimeEnv == RuntimeEnv.THREAD;
            if (threadCapable && threadRequested) {
                startThreadedMode(details, parallelism, this.instanceIdOffset, serviceUrl, this.stateStorageServiceUrl, authConfig, archive);
            } else {
                startProcessMode(details, parallelism, this.instanceIdOffset, serviceUrl, this.stateStorageServiceUrl, authConfig, archive);
            }
            // Record the successful start so the guard above rejects a second start()
            // until stop() clears the flag.
            this.running.set(true);
            started.addAll(this.spawners);
        }
        if (z) {
            // Join outside the monitor so stop() is not blocked while waiting.
            for (RuntimeSpawner spawner : started) {
                spawner.join();
                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
            }
        }
    }

    /**
     * Loads the named class through the context class loader and returns the file part of
     * its code source location.
     */
    private static String codeSourceOf(String className) throws ClassNotFoundException {
        return Thread.currentThread().getContextClassLoader()
                .loadClass(className)
                .getProtectionDomain()
                .getCodeSource()
                .getLocation()
                .getFile();
    }

    /**
     * Launches {@code parallelism} instances through a {@link ProcessRuntimeFactory} and
     * schedules a status report that is logged every 30 seconds. The factory is closed
     * when this method returns or throws; the timer is cancelled by a JVM shutdown hook.
     *
     * @param functionDetails details of the function, source or sink to run
     * @param parallelism number of instances to start
     * @param idOffset value added to the zero-based index to form each instance id
     * @param serviceUrl broker service URL
     * @param stateStorageUrl state storage service URL, may be {@code null}
     * @param authConfig client authentication settings
     * @param userCodeFile path or URL of the user code handed to each spawner
     * @throws Exception when creating, starting or closing the runtime fails
     */
    private void startProcessMode(Function.FunctionDetails functionDetails, int parallelism, int idOffset, String serviceUrl, String stateStorageUrl, AuthenticationConfig authConfig, String userCodeFile) throws Exception {
        try (ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageUrl, authConfig, null, null, null, null, new DefaultSecretsProviderConfigurator(), false)) {
            for (int index = 0; index < parallelism; index++) {
                InstanceConfig instanceConfig = new InstanceConfig();
                instanceConfig.setFunctionDetails(functionDetails);
                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                instanceConfig.setFunctionId(UUID.randomUUID().toString());
                instanceConfig.setInstanceId(index + idOffset);
                instanceConfig.setMaxBufferedTuples(1024);
                instanceConfig.setPort(FunctionCommon.findAvailablePort());
                instanceConfig.setClusterName("local");
                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, userCodeFile, null, processRuntimeFactory, 30000L);
                this.spawners.add(runtimeSpawner);
                runtimeSpawner.start();
            }
            final Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    // stop() mutates the spawner list while holding the runner's monitor,
                    // so take a copy under the same monitor before iterating.
                    final List<RuntimeSpawner> snapshot;
                    synchronized (LocalRunner.this) {
                        snapshot = new LinkedList<>(LocalRunner.this.spawners);
                    }
                    try {
                        // Collected inside the try: an exception escaping run() would
                        // terminate the timer thread and end all further reports.
                        final CompletableFuture<?>[] statusFutures = new CompletableFuture<?>[snapshot.size()];
                        int position = 0;
                        for (RuntimeSpawner spawner : snapshot) {
                            statusFutures[position] = spawner.getFunctionStatusAsJson(position);
                            position++;
                        }
                        CompletableFuture.allOf(statusFutures).get(5L, TimeUnit.SECONDS);
                        final JsonParser parser = new JsonParser();
                        for (CompletableFuture<?> statusFuture : statusFutures) {
                            LocalRunner.log.info(new GsonBuilder().setPrettyPrinting().create().toJson(parser.parse((String) statusFuture.get())));
                        }
                    } catch (Exception e) {
                        // Keep the cause so a failing status check can be diagnosed.
                        LocalRunner.log.error("Could not get status from all local instances", e);
                    }
                }
            }, 30000L, 30000L);
            Runtime.getRuntime().addShutdownHook(new Thread(timer::cancel));
        }
    }

    /**
     * Launches {@code parallelism} instances through a {@link ThreadRuntimeFactory} created
     * with the thread group name {@code LocalRunnerThreadGroup}. Each spawner is recorded
     * in the runner's spawner list before it is started.
     *
     * @param details details of the function, source or sink to run
     * @param parallelism number of instances to start
     * @param idOffset first instance id; later instances count up from it
     * @param serviceUrl broker service URL
     * @param stateStorageUrl state storage service URL, may be {@code null}
     * @param authConfig client authentication settings
     * @param userCodeFile path or URL of the user code handed to each spawner
     * @throws Exception when creating or starting an instance fails
     */
    private void startThreadedMode(Function.FunctionDetails details, int parallelism, int idOffset, String serviceUrl, String stateStorageUrl, AuthenticationConfig authConfig, String userCodeFile) throws Exception {
        final ThreadRuntimeFactory factory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", serviceUrl, stateStorageUrl, authConfig, new ClearTextSecretsProvider(), null, null);
        int instanceId = idOffset;
        for (int remaining = parallelism; remaining > 0; remaining--) {
            final InstanceConfig config = new InstanceConfig();
            config.setFunctionDetails(details);
            config.setFunctionVersion(UUID.randomUUID().toString());
            config.setFunctionId(UUID.randomUUID().toString());
            config.setInstanceId(instanceId);
            config.setMaxBufferedTuples(1024);
            config.setPort(FunctionCommon.findAvailablePort());
            config.setClusterName("local");

            final RuntimeSpawner spawner = new RuntimeSpawner(config, userCodeFile, null, factory, 30000L);
            this.spawners.add(spawner);
            spawner.start();
            instanceId++;
        }
    }

    /**
     * Looks the given name up among the sources of the locally discovered connectors.
     *
     * @param str candidate source name
     * @return the string form of the value registered for that source, or {@code null} when
     *         there is no source with that name
     * @throws IOException when the connector search fails
     */
    private String isBuiltInSource(String str) throws IOException {
        final Connectors available = getConnectors();
        return available.getSources().containsKey(str)
                ? available.getSources().get(str).toString()
                : null;
    }

    /**
     * Looks the given name up among the sinks of the locally discovered connectors.
     *
     * @param str candidate sink name
     * @return the string form of the value registered for that sink, or {@code null} when
     *         there is no sink with that name
     * @throws IOException when the connector search fails
     */
    private String isBuiltInSink(String str) throws IOException {
        final Connectors available = getConnectors();
        return available.getSinks().containsKey(str)
                ? available.getSinks().get(str).toString()
                : null;
    }

    /**
     * Searches the {@code connectors} directory under {@code PULSAR_HOME} for connectors,
     * using the absolute current working directory as the base when that environment
     * variable is not set.
     *
     * @return the connectors found by the search
     * @throws IOException when the search fails
     */
    private Connectors getConnectors() throws IOException {
        final String pulsarHome = System.getenv("PULSAR_HOME");
        final String baseDir = pulsarHome != null
                ? pulsarHome
                : Paths.get("").toAbsolutePath().toString();
        final String connectorsDir = Paths.get(baseDir, "connectors").toString();
        return ConnectorUtils.searchForConnectors(connectorsDir);
    }

    /**
     * Creates an empty builder for {@link LocalRunner}.
     *
     * @return a new builder with no values set
     */
    public static LocalRunnerBuilder builder() {
        final LocalRunnerBuilder emptyBuilder = new LocalRunnerBuilder();
        return emptyBuilder;
    }
}
