package org.apache.flink.table.client.gateway.local;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.result.BasicResult;
import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

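/**
 * {@link Executor} implementation of the SQL Client's local gateway.
 *
 * <p>It builds an {@link ExecutionContext} per session from a default {@link Environment}, submits queries and
 * updates through a {@link ProgramDeployer}, and keeps the results of running queries in a {@link ResultStore}.
 */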
public class LocalExecutor implements Executor {
    /** Logger used by this executor. */
    private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
    /** File name searched in the Flink configuration directory when no default environment URL is given. */
    private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";
    /** Default environment (parsed from a file or empty) that is passed to every new execution context. */
    private final Environment defaultEnvironment;
    /** JAR URLs that are passed to every new execution context. */
    private final List<URL> dependencies;
    /** Flink configuration used for the result store and for new execution contexts. */
    private final Configuration flinkConfig;
    /** Custom command lines; their run options are collected into {@link #commandLineOptions}. */
    private final List<CustomCommandLine<?>> commandLines;
    /** Run options of the CLI frontend merged with the run options of all custom command lines. */
    private final Options commandLineOptions;
    /** Store of query results, looked up by result identifier. */
    private final ResultStore resultStore;
    /** Most recently created execution context; replaced when a different session context is requested. */
    private ExecutionContext<?> executionContext;

    /**
     * Creates a local executor that is configured from the Flink configuration directory.
     *
     * <p>The constructor loads the global Flink configuration, initializes the file systems, loads the custom
     * command lines, resolves the default environment, and collects all JAR dependencies.
     *
     * @param url URL of the default environment file; if {@code null}, the file {@code sql-client-defaults.yaml}
     *     is searched in the Flink configuration directory and an empty environment is used if it does not exist
     * @param list URLs of JAR files to use as dependencies
     * @param list2 URLs of directories whose {@code .jar} files are used as dependencies
     * @throws SqlClientException if the configuration, the default environment, or a dependency cannot be loaded
     */
    public LocalExecutor(URL url, List<URL> list, List<URL> list2) {
        // Only the configuration-related steps are reported as configuration failures. Keeping the
        // environment and dependency handling outside of this try block prevents their (already
        // descriptive) exceptions from being re-wrapped with a misleading message.
        final String flinkConfigDir;
        try {
            flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
            this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);
            try {
                FileSystem.initialize(this.flinkConfig);
            } catch (IOException e) {
                throw new SqlClientException("Error while setting the default filesystem scheme from configuration.", e);
            }
            this.commandLines = CliFrontend.loadCustomCommandLines(this.flinkConfig, flinkConfigDir);
            this.commandLineOptions = collectCommandLineOptions(this.commandLines);
        } catch (Exception e) {
            throw new SqlClientException("Could not load Flink configuration.", e);
        }

        // Resolve the default environment without reassigning the parameter.
        URL environmentUrl = url;
        if (environmentUrl == null) {
            final String defaultPath = flinkConfigDir + "/" + DEFAULT_ENV_FILE;
            System.out.println("No default environment specified.");
            System.out.print("Searching for '" + defaultPath + "'...");
            final File defaultFile = new File(defaultPath);
            if (defaultFile.exists()) {
                System.out.println("found.");
                try {
                    environmentUrl = Path.fromLocalFile(defaultFile).toUri().toURL();
                } catch (MalformedURLException e) {
                    throw new SqlClientException(e);
                }
                LOG.info("Using default environment file: {}", environmentUrl);
            } else {
                System.out.println("not found.");
            }
        }

        if (environmentUrl != null) {
            System.out.println("Reading default environment from: " + environmentUrl);
            try {
                this.defaultEnvironment = Environment.parse(environmentUrl);
            } catch (IOException e) {
                throw new SqlClientException("Could not read default environment file at: " + environmentUrl, e);
            }
        } else {
            this.defaultEnvironment = new Environment();
        }

        this.dependencies = discoverDependencies(list, list2);
        this.resultStore = new ResultStore(this.flinkConfig);
    }

    public LocalExecutor(Environment environment, List<URL> list, Configuration configuration, CustomCommandLine<?> customCommandLine) {
        this.defaultEnvironment = environment;
        this.dependencies = list;
        this.flinkConfig = configuration;
        this.commandLines = Collections.singletonList(customCommandLine);
        this.commandLineOptions = collectCommandLineOptions(this.commandLines);
        this.resultStore = new ResultStore(configuration);
    }

    /** Starts the executor. This implementation does nothing; the method body is intentionally empty. */
    @Override // org.apache.flink.table.client.gateway.Executor
    public void start() {
    }

    /**
     * Returns the execution and deployment properties of the session's merged environment.
     *
     * <p>Deployment properties are added last and therefore win if both maps contain the same key.
     *
     * @param sessionContext session whose merged environment is inspected
     * @return a new mutable map containing the top-level execution and deployment properties
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public Map<String, String> getSessionProperties(SessionContext sessionContext) throws SqlExecutionException {
        final Environment mergedEnvironment = getOrCreateExecutionContext(sessionContext).getMergedEnvironment();
        // typed map instead of a raw HashMap, which avoids an unchecked conversion on return
        final Map<String, String> properties = new HashMap<>();
        properties.putAll(mergedEnvironment.getExecution().asTopLevelMap());
        properties.putAll(mergedEnvironment.getDeployment().asTopLevelMap());
        return properties;
    }

    /**
     * Lists the tables known to a fresh table environment of the given session.
     *
     * @param sessionContext session for which the tables are listed
     * @return fixed-size list view of the table names
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public List<String> listTables(SessionContext sessionContext) throws SqlExecutionException {
        final TableEnvironment tableEnv = getOrCreateExecutionContext(sessionContext)
                .createEnvironmentInstance()
                .getTableEnvironment();
        final String[] tableNames = tableEnv.listTables();
        return Arrays.asList(tableNames);
    }

    /**
     * Lists the user-defined functions known to a fresh table environment of the given session.
     *
     * @param sessionContext session for which the functions are listed
     * @return fixed-size list view of the function names
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public List<String> listUserDefinedFunctions(SessionContext sessionContext) throws SqlExecutionException {
        final TableEnvironment tableEnv = getOrCreateExecutionContext(sessionContext)
                .createEnvironmentInstance()
                .getTableEnvironment();
        final String[] functionNames = tableEnv.listUserDefinedFunctions();
        return Arrays.asList(functionNames);
    }

    /**
     * Returns the schema of the table with the given name.
     *
     * @param sessionContext session in which the table is looked up
     * @param str name of the table
     * @return schema of the scanned table
     * @throws SqlExecutionException if anything fails while creating the environment or scanning the table
     */
    @Override
    public TableSchema getTableSchema(SessionContext sessionContext, String str) throws SqlExecutionException {
        try {
            final TableEnvironment tableEnv = getOrCreateExecutionContext(sessionContext)
                    .createEnvironmentInstance()
                    .getTableEnvironment();
            final Table scannedTable = tableEnv.scan(str);
            return scannedTable.getSchema();
        } catch (Throwable t) {
            // every failure in this block is reported as a missing table
            throw new SqlExecutionException("No table with this name could be found.", t);
        }
    }

    /**
     * Translates the given SQL statement into a table and returns its explanation.
     *
     * @param sessionContext session in which the statement is explained
     * @param str SQL statement to explain
     * @return the explanation produced by the table environment
     * @throws SqlExecutionException if the statement cannot be translated or explained
     */
    @Override
    public String explainStatement(SessionContext sessionContext, String str) throws SqlExecutionException {
        final ExecutionContext<?> context = getOrCreateExecutionContext(sessionContext);
        final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();

        // createTable() already reports failures as SqlExecutionException("Invalid SQL statement.").
        // Keeping it outside of the try block avoids wrapping that exception a second time
        // with the identical message.
        final Table table = createTable(tableEnv, str);

        try {
            // the explanation is computed with the context's class loader wrapping applied
            return (String) context.wrapClassLoader(() -> {
                return tableEnv.explain(table);
            });
        } catch (Throwable t) {
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }
    }

    /**
     * Returns completion hints for a statement at the given cursor position.
     *
     * <p>This method never fails: any error is logged at debug level and results in an empty list.
     *
     * @param sessionContext session in which the statement is completed
     * @param str the (partial) statement
     * @param i position within the statement
     * @return completion hints, or an empty list if they could not be computed
     */
    @Override
    public List<String> completeStatement(SessionContext sessionContext, String str, int i) {
        try {
            final TableEnvironment tableEnv = getOrCreateExecutionContext(sessionContext)
                    .createEnvironmentInstance()
                    .getTableEnvironment();
            final String[] hints = tableEnv.getCompletionHints(str, i);
            return Arrays.asList(hints);
        } catch (Throwable t) {
            // completion is best effort, so the failure is only visible in debug logs
            if (LOG.isDebugEnabled()) {
                final String message = "Could not complete statement at " + i + ":" + str;
                LOG.debug(message, t);
            }
            return Collections.emptyList();
        }
    }

    /**
     * Submits a query in the given session and returns a descriptor for retrieving its result.
     *
     * @param sessionContext session in which the query is executed
     * @param str SQL query
     * @return descriptor of the result
     * @throws SqlExecutionException if the execution context cannot be created or the query fails
     */
    @Override
    public ResultDescriptor executeQuery(SessionContext sessionContext, String str) throws SqlExecutionException {
        final ExecutionContext<?> context = getOrCreateExecutionContext(sessionContext);
        return executeQueryInternal(context, str);
    }

    /**
     * Retrieves the pending changes of a non-materialized (changelog) result.
     *
     * @param sessionContext not used by this implementation
     * @param str result identifier
     * @return the changes retrieved from the changelog result
     * @throws SqlExecutionException if the result is unknown or is a materialized result
     */
    @Override
    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext sessionContext, String str) throws SqlExecutionException {
        final DynamicResult dynamicResult = this.resultStore.getResult(str);
        if (dynamicResult == null) {
            throw new SqlExecutionException("Could not find a result with result identifier '" + str + "'.");
        }
        if (!dynamicResult.isMaterialized()) {
            final ChangelogResult changelog = (ChangelogResult) dynamicResult;
            return changelog.retrieveChanges();
        }
        // changes can only be retrieved from non-materialized results
        throw new SqlExecutionException("Invalid result retrieval mode.");
    }

    /**
     * Takes a snapshot of a materialized result.
     *
     * @param sessionContext not used by this implementation
     * @param str result identifier
     * @param i argument forwarded to {@code MaterializedResult.snapshot}
     * @return the value returned by the materialized result's snapshot call
     * @throws SqlExecutionException if the result is unknown or is not materialized
     */
    @Override
    public TypedResult<Integer> snapshotResult(SessionContext sessionContext, String str, int i) throws SqlExecutionException {
        final DynamicResult dynamicResult = this.resultStore.getResult(str);
        if (dynamicResult == null) {
            throw new SqlExecutionException("Could not find a result with result identifier '" + str + "'.");
        }
        if (!dynamicResult.isMaterialized()) {
            // snapshots exist only for materialized results
            throw new SqlExecutionException("Invalid result retrieval mode.");
        }
        final MaterializedResult materialized = (MaterializedResult) dynamicResult;
        return materialized.snapshot(i);
    }

    /**
     * Retrieves one page of a materialized result.
     *
     * @param str result identifier
     * @param i page argument forwarded to {@code MaterializedResult.retrievePage}
     * @return rows of the requested page
     * @throws SqlExecutionException if the result is unknown or is not materialized
     */
    @Override
    public List<Row> retrieveResultPage(String str, int i) throws SqlExecutionException {
        final DynamicResult dynamicResult = this.resultStore.getResult(str);
        if (dynamicResult == null) {
            throw new SqlExecutionException("Could not find a result with result identifier '" + str + "'.");
        }
        if (!dynamicResult.isMaterialized()) {
            // pages exist only for materialized results
            throw new SqlExecutionException("Invalid result retrieval mode.");
        }
        final MaterializedResult materialized = (MaterializedResult) dynamicResult;
        return materialized.retrievePage(i);
    }

    /**
     * Cancels the query that belongs to the given result identifier.
     *
     * @param sessionContext session in which the query was started
     * @param str result identifier of the query
     * @throws SqlExecutionException if the execution context cannot be created or the cancellation fails
     */
    @Override
    public void cancelQuery(SessionContext sessionContext, String str) throws SqlExecutionException {
        final ExecutionContext<?> context = getOrCreateExecutionContext(sessionContext);
        cancelQueryInternal(context, str);
    }

    /**
     * Submits an update statement in the given session.
     *
     * @param sessionContext session in which the statement is executed
     * @param str SQL update statement
     * @return descriptor of the target the program was submitted to
     * @throws SqlExecutionException if the execution context cannot be created or the statement fails
     */
    @Override
    public ProgramTargetDescriptor executeUpdate(SessionContext sessionContext, String str) throws SqlExecutionException {
        final ExecutionContext<?> context = getOrCreateExecutionContext(sessionContext);
        return executeUpdateInternal(context, str);
    }

    /**
     * Validates a session by creating its execution context and an environment instance from it.
     *
     * @param sessionContext session to validate
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public void validateSession(SessionContext sessionContext) throws SqlExecutionException {
        final ExecutionContext<?> context = getOrCreateExecutionContext(sessionContext);
        // the created instance is discarded; only its successful creation matters here
        context.createEnvironmentInstance();
    }

    /**
     * Stops the executor by cancelling every query that still has a result in the result store.
     *
     * <p>Cancellation is best effort: a failure for one result does not prevent the remaining results from
     * being cancelled and is never propagated to the caller.
     *
     * @param sessionContext session used for cancelling the queries
     */
    @Override
    public void stop(SessionContext sessionContext) {
        this.resultStore.getResults().forEach(resultId -> {
            try {
                cancelQuery(sessionContext, resultId);
            } catch (Throwable t) {
                // Deliberately not rethrown so that the clean-up continues with the next result.
                // The failure is logged instead of being dropped silently.
                LOG.debug("Could not cancel query '{}' while stopping the executor.", resultId, t);
            }
        });
    }

    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r9v2 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 10, insn: 0x00e2: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:56:0x00e2 */
    /* JADX WARN: Not initialized variable reg: 9, insn: 0x00dd: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:54:0x00dd */
    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r9v2, types: [org.apache.flink.client.deployment.ClusterDescriptor] */
    private <T> void cancelQueryInternal(ExecutionContext<T> executionContext, String str) {
        ?? r9;
        ?? r10;
        DynamicResult<T> result = this.resultStore.getResult(str);
        if (result == null) {
            throw new SqlExecutionException("Could not find a result with result identifier '" + str + "'.");
        }
        LOG.info("Cancelling job {} and result retrieval.", str);
        result.close();
        this.resultStore.removeResult(str);
        try {
            try {
                ClusterDescriptor<T> createClusterDescriptor = executionContext.createClusterDescriptor();
                Throwable th = null;
                ClusterClient clusterClient = null;
                try {
                    try {
                        ClusterClient retrieve = createClusterDescriptor.retrieve(executionContext.getClusterId());
                        try {
                            retrieve.cancel(new JobID(StringUtils.hexStringToByte(str)));
                        } catch (Throwable th2) {
                        }
                        if (retrieve != null) {
                            try {
                                retrieve.shutdown();
                            } catch (Exception e) {
                            }
                        }
                        if (createClusterDescriptor != null) {
                            if (0 != 0) {
                                try {
                                    createClusterDescriptor.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                createClusterDescriptor.close();
                            }
                        }
                    } catch (Throwable th4) {
                        if (0 != 0) {
                            try {
                                clusterClient.shutdown();
                            } catch (Exception e2) {
                                throw th4;
                            }
                        }
                        throw th4;
                    }
                } catch (Exception e3) {
                    throw new SqlExecutionException("Could not retrieve or create a cluster.", e3);
                }
            } catch (Throwable th5) {
                if (r9 != 0) {
                    if (r10 != 0) {
                        try {
                            r9.close();
                        } catch (Throwable th6) {
                            r10.addSuppressed(th6);
                        }
                    } else {
                        r9.close();
                    }
                }
                throw th5;
            }
        } catch (SqlExecutionException e4) {
            throw e4;
        } catch (Exception e5) {
            throw new SqlExecutionException("Could not locate a cluster.", e5);
        }
    }

    /**
     * Applies an update statement to a fresh environment instance and deploys the resulting job.
     *
     * @param executionContext context in which the statement is executed
     * @param str SQL update statement
     * @param <C> cluster ID type of the execution context
     * @return descriptor built from the cluster ID, the job ID, and the web interface URL
     * @throws SqlExecutionException if the statement cannot be applied or the job cannot be created or deployed
     */
    private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> executionContext, String str) {
        final ExecutionContext<C>.EnvironmentInstance envInst = executionContext.createEnvironmentInstance();
        final TableEnvironment tableEnv = envInst.getTableEnvironment();
        final QueryConfig queryConfig = envInst.getQueryConfig();

        // applyUpdate() reports its own failures, so it stays outside of the try block below
        applyUpdate(executionContext, tableEnv, queryConfig, str);

        // the job is named after the session and the statement
        final String jobName = executionContext.getSessionContext().getName() + ": " + str;
        try {
            final JobGraph jobGraph = envInst.createJobGraph(jobName);
            final BasicResult deploymentResult = new BasicResult();
            final ProgramDeployer deployer = new ProgramDeployer(executionContext, jobName, jobGraph, deploymentResult, false);
            // run() is invoked directly on the calling thread
            deployer.run();
            return ProgramTargetDescriptor.of(deploymentResult.getClusterId(), jobGraph.getJobID(), deploymentResult.getWebInterfaceUrl());
        } catch (Throwable t) {
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }
    }

    /**
     * Translates a query, registers a dynamic result for it, and starts the result retrieval.
     *
     * <p>The result is stored under the job ID of the created job graph. If anything fails, the result is closed
     * and, if it has already been stored, removed from the result store again so that no closed result stays
     * registered.
     *
     * @param executionContext context in which the query is executed
     * @param str SQL query
     * @param <C> cluster ID type of the execution context
     * @return descriptor containing the result identifier, the schema without time attributes, and whether the
     *     result is materialized
     * @throws SqlExecutionException if the query cannot be translated or its execution cannot be started
     */
    private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> executionContext, String str) {
        final ExecutionContext<C>.EnvironmentInstance envInst = executionContext.createEnvironmentInstance();
        final Table table = createTable(envInst.getTableEnvironment(), str);

        // computed once and used for both the result and the returned descriptor
        final TableSchema resultSchema = removeTimeAttributes(table.getSchema());
        final DynamicResult result = this.resultStore.createResult(executionContext.getMergedEnvironment(), resultSchema, envInst.getExecutionConfig());
        final String jobName = executionContext.getSessionContext().getName() + ": " + str;

        // set once the result has been registered, so that the error path can undo the registration
        String storedResultId = null;
        try {
            executionContext.wrapClassLoader(() -> {
                table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
                return null;
            });
            final JobGraph jobGraph = envInst.createJobGraph(jobName);

            // the job ID doubles as the result identifier
            final String resultId = jobGraph.getJobID().toString();
            this.resultStore.storeResult(resultId, result);
            storedResultId = resultId;

            result.startRetrieval(new ProgramDeployer<>(executionContext, jobName, jobGraph, result, true));
            return new ResultDescriptor(resultId, resultSchema, result.isMaterialized());
        } catch (Throwable t) {
            result.close();
            if (storedResultId != null) {
                // do not leave a closed result behind in the store
                this.resultStore.removeResult(storedResultId);
            }
            throw new SqlExecutionException("Invalid SQL query.", t);
        }
    }

    /**
     * Translates a SQL query into a table.
     *
     * @param tableEnvironment environment used for the translation
     * @param str SQL query
     * @return the table produced by {@code sqlQuery}
     * @throws SqlExecutionException if the translation fails for any reason
     */
    private Table createTable(TableEnvironment tableEnvironment, String str) {
        try {
            final Table translated = tableEnvironment.sqlQuery(str);
            return translated;
        } catch (Throwable t) {
            // every failure is reported as an invalid statement
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }
    }

    /**
     * Applies a SQL update statement to the given table environment.
     *
     * @param executionContext context whose class loader wrapping is used for the call
     * @param tableEnvironment environment the statement is applied to
     * @param queryConfig query configuration passed to {@code sqlUpdate}
     * @param str SQL update statement
     * @param <C> cluster ID type of the execution context
     * @throws SqlExecutionException if applying the statement fails for any reason
     */
    private <C> void applyUpdate(ExecutionContext<C> executionContext, TableEnvironment tableEnvironment, QueryConfig queryConfig, String str) {
        try {
            // the supplier has no meaningful result, hence the null return value
            executionContext.wrapClassLoader(() -> {
                tableEnvironment.sqlUpdate(str, queryConfig);
                return null;
            });
        } catch (Throwable t) {
            throw new SqlExecutionException("Invalid SQL update statement.", t);
        }
    }

    /**
     * Returns the cached execution context if it belongs to the given session, otherwise creates a new one.
     *
     * <p>Only a single context is cached; a newly created context replaces the previous one.
     *
     * @param sessionContext session for which a context is requested
     * @return execution context of the given session
     * @throws SqlExecutionException if a new execution context cannot be created
     */
    private synchronized ExecutionContext<?> getOrCreateExecutionContext(SessionContext sessionContext) throws SqlExecutionException {
        final boolean reusable = this.executionContext != null
                && this.executionContext.getSessionContext().equals(sessionContext);
        if (!reusable) {
            try {
                this.executionContext = new ExecutionContext<>(
                        this.defaultEnvironment,
                        sessionContext,
                        this.dependencies,
                        this.flinkConfig,
                        this.commandLineOptions,
                        this.commandLines);
            } catch (Throwable t) {
                throw new SqlExecutionException("Could not create execution context.", t);
            }
        }
        return this.executionContext;
    }

    /**
     * Collects the URLs of all JAR files that are used as dependencies.
     *
     * <p>Every URL of the first list is checked with {@code JobWithJars.checkJarFile} and added. Every URL of the
     * second list must point to a readable directory; all regular files in it whose path ends with
     * {@code .jar} (case-insensitive) are checked and added as well. Sub-directories are not traversed.
     *
     * @param list URLs of JAR files
     * @param list2 URLs of directories containing JAR files
     * @return list of all discovered JAR URLs, explicit JARs first
     * @throws SqlClientException if a JAR file or directory is invalid or cannot be read
     */
    private static List<URL> discoverDependencies(List<URL> list, List<URL> list2) {
        // typed list instead of a raw ArrayList, which avoids an unchecked conversion on return
        final List<URL> dependencies = new ArrayList<>();
        try {
            // explicitly given JAR files
            for (URL jarUrl : list) {
                JobWithJars.checkJarFile(jarUrl);
                dependencies.add(jarUrl);
            }

            // JAR files found directly inside the given directories
            for (URL libraryUrl : list2) {
                final File directory = new File(libraryUrl.toURI());
                if (!directory.isDirectory()) {
                    throw new SqlClientException("Directory expected: " + directory);
                }
                if (!directory.canRead()) {
                    throw new SqlClientException("Directory cannot be read: " + directory);
                }
                final File[] entries = directory.listFiles();
                if (entries == null) {
                    // listFiles() returns null if an I/O error occurs
                    throw new SqlClientException("Directory cannot be read: " + directory);
                }
                for (File entry : entries) {
                    if (entry.isFile() && entry.getAbsolutePath().toLowerCase().endsWith(".jar")) {
                        final URL jarUrl = entry.toURI().toURL();
                        JobWithJars.checkJarFile(jarUrl);
                        dependencies.add(jarUrl);
                    }
                }
            }
        } catch (Exception e) {
            // also wraps the SqlClientExceptions thrown above; they stay available as the cause
            throw new SqlClientException("Could not load all required JAR files.", e);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Using the following dependencies: {}", dependencies);
        }
        return dependencies;
    }

    /**
     * Merges the run options of the CLI frontend with the run options of the given custom command lines.
     *
     * @param list custom command lines that contribute run options
     * @return the merged options
     */
    private static Options collectCommandLineOptions(List<CustomCommandLine<?>> list) {
        final Options customOptions = new Options();
        for (CustomCommandLine<?> commandLine : list) {
            commandLine.addRunOptions(customOptions);
        }
        final Options runOptions = CliFrontendParser.getRunCommandOptions();
        return CliFrontendParser.mergeOptions(runOptions, customOptions);
    }

    /**
     * Creates a copy of the schema in which every time indicator type is replaced by {@code Types.SQL_TIMESTAMP}.
     *
     * <p>Field names and all other field types are copied unchanged and in the same order.
     *
     * @param tableSchema schema to convert
     * @return new schema without time indicator types
     */
    private static TableSchema removeTimeAttributes(TableSchema tableSchema) {
        final TableSchema.Builder schemaBuilder = TableSchema.builder();
        int index = 0;
        while (index < tableSchema.getFieldCount()) {
            TypeInformation<?> fieldType = tableSchema.getFieldTypes()[index];
            final String fieldName = tableSchema.getFieldNames()[index];
            if (FlinkTypeFactory.isTimeIndicatorType(fieldType)) {
                fieldType = Types.SQL_TIMESTAMP;
            }
            schemaBuilder.field(fieldName, fieldType);
            index++;
        }
        return schemaBuilder.build();
    }
}
