package org.apache.storm.sql;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.sql.compiler.CompilerUtil;
import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.parser.ColumnConstraint;
import org.apache.storm.sql.parser.ColumnDefinition;
import org.apache.storm.sql.parser.SqlCreateFunction;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
import org.apache.storm.sql.runtime.ChannelHandler;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
import org.apache.storm.trident.TridentTopology;

/* loaded from: input_file:org/apache/storm/sql/StormSqlImpl.class */
/**
 * {@link StormSql} implementation that parses each statement with {@link StormParser},
 * registers tables and scalar functions in a Calcite schema, and compiles every other
 * statement into either a standalone processor ({@link #execute}) or a Trident topology
 * ({@link #submit}).
 *
 * <p>The schema and the UDF flag are plain mutable instance state with no synchronization.
 */
class StormSqlImpl extends StormSql {
    private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    private final SchemaPlus schema = Frameworks.createRootSchema(true);
    private boolean hasUdf = false;

    /**
     * Runs the statements in order with the standalone backend.
     *
     * <p>{@code CREATE TABLE} and {@code CREATE FUNCTION} statements update the schema; any
     * other statement is planned, compiled and initialized with the data sources declared so far.
     *
     * @param iterable       the SQL statements, processed in iteration order
     * @param channelHandler handler handed to each compiled processor's {@code initialize}
     * @throws Exception if parsing, planning, compilation or initialization fails
     */
    @Override
    public void execute(Iterable<String> iterable, ChannelHandler channelHandler) throws Exception {
        Map<String, DataSource> dataSources = new HashMap<>();
        for (String sql : iterable) {
            SqlNode node = new StormParser(sql).impl().parseSqlStmtEof();
            if (node instanceof SqlCreateTable) {
                handleCreateTable((SqlCreateTable) node, dataSources);
            } else if (node instanceof SqlCreateFunction) {
                handleCreateFunction((SqlCreateFunction) node);
            } else {
                new PlanCompiler(this.typeFactory)
                        .compile(planQuery(sql))
                        .initialize(dataSources, channelHandler);
            }
        }
    }

    /**
     * Runs the statements in order with the Trident backend, submitting one topology per
     * statement that is neither {@code CREATE TABLE} nor {@code CREATE FUNCTION}.
     *
     * <p>For each such statement the compiled classes are packaged into a temporary jar, the
     * {@code storm.jar} system property is pointed at it, the topology is submitted, and the
     * jar is deleted again.
     *
     * @param str              forwarded as the first argument of
     *                         {@code StormSubmitter.submitTopologyAs} (the topology name)
     * @param iterable         the SQL statements, processed in iteration order
     * @param map              topology configuration forwarded to the submitter
     * @param submitOptions    submit options forwarded to the submitter
     * @param progressListener progress listener forwarded to the submitter
     * @param str2             forwarded as the last argument of
     *                         {@code StormSubmitter.submitTopologyAs} (the submitting user)
     * @throws Exception if parsing, planning, compilation, packaging or submission fails
     */
    @Override
    public void submit(String str, Iterable<String> iterable, Map<String, ?> map, SubmitOptions submitOptions, StormSubmitter.ProgressListener progressListener, String str2) throws Exception {
        Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
        for (String sql : iterable) {
            SqlNode node = new StormParser(sql).impl().parseSqlStmtEof();
            if (node instanceof SqlCreateTable) {
                handleCreateTableForTrident((SqlCreateTable) node, dataSources);
            } else if (node instanceof SqlCreateFunction) {
                handleCreateFunction((SqlCreateFunction) node);
            } else {
                RelNode plan = planQuery(sql);
                org.apache.storm.sql.compiler.backends.trident.PlanCompiler compiler =
                        new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(this.typeFactory);
                AbstractTridentProcessor processor = compiler.compile(plan);
                TridentTopology topology = processor.build(dataSources);

                Path jarPath = null;
                Throwable failure = null;
                try {
                    jarPath = Files.createTempFile("storm-sql", ".jar");
                    // NOTE(review): the property is not reset afterwards, so it keeps
                    // pointing at the deleted jar once this statement is done.
                    System.setProperty("storm.jar", jarPath.toString());
                    packageTopology(jarPath, compiler.getCompilingClassLoader(), processor);
                    StormSubmitter.submitTopologyAs(str, map, topology.build(), submitOptions, progressListener, str2);
                } catch (Throwable t) {
                    // Remembered so that a cleanup failure cannot replace the real error.
                    failure = t;
                    throw t;
                } finally {
                    deleteTempJar(jarPath, failure);
                }
            }
        }
    }

    /**
     * Parses, validates and converts one statement into a relational plan using the
     * current schema.
     */
    private RelNode planQuery(String sql) throws Exception {
        Planner planner = Frameworks.getPlanner(buildFrameWorkConfig());
        return planner.convert(planner.validate(planner.parse(sql)));
    }

    /**
     * Deletes the temporary topology jar, if one was created.
     *
     * <p>When {@code primary} is non-null the caller is already propagating that error, so a
     * cleanup failure is attached to it as suppressed instead of being thrown.
     */
    private static void deleteTempJar(Path jarPath, Throwable primary) throws IOException {
        if (jarPath == null) {
            return;
        }
        try {
            Files.deleteIfExists(jarPath);
        } catch (IOException | RuntimeException e) {
            if (primary == null) {
                throw e;
            }
            primary.addSuppressed(e);
        }
    }

    /**
     * Writes every class held by the compiling class loader into a jar at {@code jarPath}.
     *
     * @param jarPath     destination file; overwritten
     * @param classLoader source of the compiled classes (name to bytecode)
     * @param processor   compiled processor whose class is recorded as {@code Main-Class}
     * @throws IOException if the jar cannot be written
     */
    private void packageTopology(Path jarPath, CompilingClassLoader classLoader, AbstractTridentProcessor processor) throws IOException {
        Manifest manifest = new Manifest();
        Attributes mainAttributes = manifest.getMainAttributes();
        mainAttributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        // getName() yields the binary name; getCanonicalName() differs for nested classes
        // and is null for local/anonymous ones.
        mainAttributes.put(Attributes.Name.MAIN_CLASS, processor.getClass().getName());

        // Each stream is its own resource so the file handle is released even if a wrapping
        // constructor throws (JarOutputStream writes the manifest while being constructed).
        try (FileOutputStream fileOut = new FileOutputStream(jarPath.toFile());
             BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
             JarOutputStream jarOut = new JarOutputStream(bufferedOut, manifest)) {
            for (Map.Entry<String, ByteArrayOutputStream> compiled : classLoader.getClasses().entrySet()) {
                jarOut.putNextEntry(new ZipEntry(compiled.getKey().replace('.', '/') + ".class"));
                jarOut.write(compiled.getValue().toByteArray());
                jarOut.closeEntry();
            }
        }
    }

    /**
     * Handles {@code CREATE TABLE} for the standalone backend: registers the table in the
     * schema and stores its data source under the table name.
     *
     * @throws RuntimeException if the table is already defined or no data source can be built
     */
    private void handleCreateTable(SqlCreateTable createTable, Map<String, DataSource> dataSources) {
        // Reject duplicates first so a failed statement neither replaces the schema entry
        // nor constructs a data source that is then thrown away.
        if (dataSources.containsKey(createTable.tableName())) {
            throw new RuntimeException("Duplicated definition for table " + createTable.tableName());
        }
        DataSource dataSource = DataSourcesRegistry.construct(
                createTable.location(),
                createTable.inputFormatClass(),
                createTable.outputFormatClass(),
                updateSchema(createTable));
        if (dataSource == null) {
            throw new RuntimeException("Cannot construct data source for " + createTable.tableName());
        }
        dataSources.put(createTable.tableName(), dataSource);
    }

    /**
     * Handles {@code CREATE FUNCTION}: registers the class's {@code evaluate} method as a
     * scalar function under the upper-cased function name.
     *
     * @throws ClassNotFoundException        if the function class cannot be loaded
     * @throws UnsupportedOperationException if the statement names a jar
     * @throws IllegalArgumentException      if no scalar function can be created from the class
     */
    private void handleCreateFunction(SqlCreateFunction createFunction) throws ClassNotFoundException {
        if (createFunction.jarName() != null) {
            throw new UnsupportedOperationException("UDF 'USING JAR' not implemented");
        }
        Class<?> udfClass = Class.forName(createFunction.className());
        // Locale.ROOT keeps the registered name independent of the JVM default locale
        // (e.g. Turkish dotted/dotless i).
        String functionName = createFunction.functionName().toUpperCase(Locale.ROOT);
        // Calcite documents that create(...) returns null when no suitable method is found;
        // fail here rather than registering a null function.
        this.schema.add(functionName, requireFound(
                ScalarFunctionImpl.create(udfClass, "evaluate"),
                "Cannot find a usable 'evaluate' method in " + udfClass.getName()
                        + " for function " + functionName));
        this.hasUdf = true;
    }

    /** Returns {@code value}, or throws {@link IllegalArgumentException} with {@code message} if it is null. */
    private static <T> T requireFound(T value, String message) {
        if (value == null) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }

    /**
     * Handles {@code CREATE TABLE} for the Trident backend: registers the table in the
     * schema and stores its Trident data source under the table name.
     *
     * @throws RuntimeException if the table is already defined or no data source is found
     */
    private void handleCreateTableForTrident(SqlCreateTable createTable, Map<String, ISqlTridentDataSource> dataSources) {
        // Same ordering as handleCreateTable: detect duplicates before any side effects.
        if (dataSources.containsKey(createTable.tableName())) {
            throw new RuntimeException("Duplicated definition for table " + createTable.tableName());
        }
        ISqlTridentDataSource dataSource = DataSourcesRegistry.constructTridentDataSource(
                createTable.location(),
                createTable.inputFormatClass(),
                createTable.outputFormatClass(),
                createTable.properties(),
                updateSchema(createTable));
        if (dataSource == null) {
            throw new RuntimeException("Failed to find data source for " + createTable.tableName() + " URI: " + createTable.location());
        }
        dataSources.put(createTable.tableName(), dataSource);
    }

    /**
     * Adds the table described by {@code createTable} to the schema and returns its columns.
     *
     * @return one {@link FieldInfo} per column (name, Java type, primary-key flag), in
     *         declaration order
     */
    private List<FieldInfo> updateSchema(SqlCreateTable createTable) {
        CompilerUtil.TableBuilderInfo tableBuilder = new CompilerUtil.TableBuilderInfo(this.typeFactory);
        List<FieldInfo> fields = new ArrayList<>();
        for (ColumnDefinition column : createTable.fieldList()) {
            tableBuilder.field(column.name(), column.type(), column.constraint());
            Class<?> javaType = (Class<?>) this.typeFactory.getJavaClass(column.type().deriveType(this.typeFactory));
            // instanceof is already false for null, so no separate null check is needed.
            boolean primaryKey = column.constraint() instanceof ColumnConstraint.PrimaryKey;
            fields.add(new FieldInfo(column.name(), javaType, primaryKey));
        }
        this.schema.add(createTable.tableName(), tableBuilder.build());
        return fields;
    }

    /**
     * Builds the planner configuration for the current schema. Once a UDF has been
     * registered, the operator table chains the standard operators with a catalog reader
     * over the schema.
     */
    // The list's element type is not imported in this file, so the list is kept raw and the
    // resulting warnings are suppressed for this method only.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private FrameworkConfig buildFrameWorkConfig() {
        if (!this.hasUdf) {
            return Frameworks.newConfigBuilder().defaultSchema(this.schema).build();
        }
        List operatorTables = new ArrayList();
        operatorTables.add(SqlStdOperatorTable.instance());
        operatorTables.add(new CalciteCatalogReader(
                CalciteSchema.from(this.schema), false, Collections.<String>emptyList(), this.typeFactory));
        return Frameworks.newConfigBuilder()
                .defaultSchema(this.schema)
                .operatorTable(new ChainedSqlOperatorTable(operatorTables))
                .build();
    }
}
