/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.connectors;

import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.ExternalModifyOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.connectors.CollectDynamicSink;
import org.apache.flink.table.planner.connectors.ExternalDynamicSink;
import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformation;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;

@Internal
public final class DynamicSinkUtils {
    public static RelNode convertCollectToRel(FlinkRelBuilder relBuilder, RelNode input, CollectModifyOperation collectModifyOperation, ReadableConfig configuration, ClassLoader classLoader) {
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
        ResolvedSchema childSchema = collectModifyOperation.getChild().getResolvedSchema();
        ResolvedSchema schema = ResolvedSchema.physical((List)childSchema.getColumnNames(), (List)childSchema.getColumnDataTypes());
        ResolvedCatalogTable catalogTable = new ResolvedCatalogTable((CatalogTable)new ExternalCatalogTable(Schema.newBuilder().fromResolvedSchema(schema).build()), schema);
        ContextResolvedTable contextResolvedTable = ContextResolvedTable.anonymous((String)"collect", (ResolvedCatalogBaseTable)catalogTable);
        DataType consumedDataType = DynamicSinkUtils.fixCollectDataType(dataTypeFactory, schema);
        String zone = (String)configuration.get(TableConfigOptions.LOCAL_TIME_ZONE);
        ZoneId zoneId = ((String)TableConfigOptions.LOCAL_TIME_ZONE.defaultValue()).equals(zone) ? ZoneId.systemDefault() : ZoneId.of(zone);
        CollectDynamicSink tableSink = new CollectDynamicSink(contextResolvedTable.getIdentifier(), consumedDataType, (MemorySize)configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE), (Duration)configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT), classLoader, zoneId, ((ExecutionConfigOptions.LegacyCastBehaviour)configuration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)).isEnabled());
        collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
        collectModifyOperation.setConsumedDataType(consumedDataType);
        return DynamicSinkUtils.convertSinkToRel(relBuilder, input, Collections.emptyMap(), contextResolvedTable, Collections.emptyMap(), false, tableSink);
    }

    public static RelNode convertExternalToRel(FlinkRelBuilder relBuilder, RelNode input, ExternalModifyOperation externalModifyOperation) {
        ExternalDynamicSink tableSink = new ExternalDynamicSink(externalModifyOperation.getChangelogMode().orElse(null), externalModifyOperation.getPhysicalDataType());
        return DynamicSinkUtils.convertSinkToRel(relBuilder, input, Collections.emptyMap(), externalModifyOperation.getContextResolvedTable(), Collections.emptyMap(), false, tableSink);
    }

    public static RelNode convertSinkToRel(FlinkRelBuilder relBuilder, RelNode input, SinkModifyOperation sinkModifyOperation, DynamicTableSink sink) {
        return DynamicSinkUtils.convertSinkToRel(relBuilder, input, sinkModifyOperation.getDynamicOptions(), sinkModifyOperation.getContextResolvedTable(), sinkModifyOperation.getStaticPartitions(), sinkModifyOperation.isOverwrite(), sink);
    }

    private static RelNode convertSinkToRel(FlinkRelBuilder relBuilder, RelNode input, Map<String, String> dynamicOptions, ContextResolvedTable contextResolvedTable, Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink sink) {
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
        FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder);
        ResolvedSchema schema = contextResolvedTable.getResolvedSchema();
        String tableDebugName = contextResolvedTable.getIdentifier().asSummaryString();
        ArrayList<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<SinkAbilitySpec>();
        DynamicSinkUtils.prepareDynamicSink(tableDebugName, staticPartitions, isOverwrite, sink, (ResolvedCatalogTable)contextResolvedTable.getResolvedTable(), sinkAbilitySpecs);
        sinkAbilitySpecs.forEach(spec -> spec.apply(sink));
        RelNode query = DynamicSinkUtils.validateSchemaAndApplyImplicitCast(input, schema, tableDebugName, dataTypeFactory, typeFactory);
        relBuilder.push(query);
        List<Integer> metadataColumns = DynamicSinkUtils.extractPersistedMetadataColumns(schema);
        if (!metadataColumns.isEmpty()) {
            DynamicSinkUtils.pushMetadataProjection(relBuilder, typeFactory, schema, sink);
        }
        ArrayList<RelHint> hints = new ArrayList<RelHint>();
        if (!dynamicOptions.isEmpty()) {
            hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build());
        }
        RelNode finalQuery = relBuilder.build();
        return LogicalSink.create(finalQuery, hints, contextResolvedTable, sink, staticPartitions, sinkAbilitySpecs.toArray(new SinkAbilitySpec[0]));
    }

    public static RelNode validateSchemaAndApplyImplicitCast(RelNode query, ResolvedSchema sinkSchema, String tableDebugName, DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) {
        RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType());
        List queryFields = queryType.getFields();
        RowType sinkType = (RowType)DynamicSinkUtils.fixSinkDataType(dataTypeFactory, sinkSchema.toSinkRowDataType()).getLogicalType();
        List sinkFields = sinkType.getFields();
        if (queryFields.size() != sinkFields.size()) {
            throw DynamicSinkUtils.createSchemaMismatchException("Different number of columns.", tableDebugName, queryFields, sinkFields);
        }
        boolean requiresCasting = false;
        for (int i = 0; i < sinkFields.size(); ++i) {
            LogicalType sinkColumnType;
            LogicalType queryColumnType = ((RowType.RowField)queryFields.get(i)).getType();
            if (!LogicalTypeCasts.supportsImplicitCast((LogicalType)queryColumnType, (LogicalType)(sinkColumnType = ((RowType.RowField)sinkFields.get(i)).getType()))) {
                throw DynamicSinkUtils.createSchemaMismatchException(String.format("Incompatible types for sink column '%s' at position %s.", ((RowType.RowField)sinkFields.get(i)).getName(), i), tableDebugName, queryFields, sinkFields);
            }
            if (LogicalTypeCasts.supportsAvoidingCast((LogicalType)queryColumnType, (LogicalType)sinkColumnType)) continue;
            requiresCasting = true;
        }
        if (requiresCasting) {
            RelDataType castRelDataType = typeFactory.buildRelNodeRowType(sinkType);
            return RelOptUtil.createCastRel(query, castRelDataType, true);
        }
        return query;
    }

    private static DataType fixCollectDataType(DataTypeFactory dataTypeFactory, ResolvedSchema schema) {
        DataType fixedDataType = DataTypeUtils.transform((DataTypeFactory)dataTypeFactory, (DataType)schema.toSourceRowDataType(), (TypeTransformation[])new TypeTransformation[]{TypeTransformations.legacyRawToTypeInfoRaw(), TypeTransformations.legacyToNonLegacy()});
        return TypeConversions.fromLogicalToDataType((LogicalType)fixedDataType.getLogicalType());
    }

    private static void pushMetadataProjection(FlinkRelBuilder relBuilder, FlinkTypeFactory typeFactory, ResolvedSchema schema, DynamicTableSink sink) {
        RexBuilder rexBuilder = relBuilder.getRexBuilder();
        List columns = schema.getColumns();
        List<Integer> physicalColumns = DynamicSinkUtils.extractPhysicalColumns(schema);
        Map keyToMetadataColumn = DynamicSinkUtils.extractPersistedMetadataColumns(schema).stream().collect(Collectors.toMap(pos -> {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn)columns.get((int)pos);
            return metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
        }, Function.identity()));
        List metadataColumns = DynamicSinkUtils.createRequiredMetadataColumns(schema, sink).stream().map(col -> col.getMetadataKey().orElse(col.getName())).map(keyToMetadataColumn::get).collect(Collectors.toList());
        List<String> fieldNames = Stream.concat(physicalColumns.stream().map(columns::get).map(Column::getName), metadataColumns.stream().map(columns::get).map(Column.MetadataColumn.class::cast).map(c -> c.getMetadataKey().orElse(c.getName()))).collect(Collectors.toList());
        Map<String, DataType> metadataMap = DynamicSinkUtils.extractMetadataMap(sink);
        List fieldNodes = Stream.concat(physicalColumns.stream().map(pos -> {
            int posAdjusted = DynamicSinkUtils.adjustByVirtualColumns(columns, pos);
            return relBuilder.field(posAdjusted);
        }), metadataColumns.stream().map(pos -> {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn)columns.get((int)pos);
            String metadataKey = metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
            LogicalType expectedType = ((DataType)metadataMap.get(metadataKey)).getLogicalType();
            RelDataType expectedRelDataType = typeFactory.createFieldTypeFromLogicalType(expectedType);
            int posAdjusted = DynamicSinkUtils.adjustByVirtualColumns(columns, pos);
            return rexBuilder.makeAbstractCast(expectedRelDataType, relBuilder.field(posAdjusted));
        })).collect(Collectors.toList());
        relBuilder.projectNamed(fieldNodes, fieldNames, true);
    }

    private static void prepareDynamicSink(String tableDebugName, Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink sink, ResolvedCatalogTable table, List<SinkAbilitySpec> sinkAbilitySpecs) {
        DynamicSinkUtils.validatePartitioning(tableDebugName, staticPartitions, sink, table.getPartitionKeys());
        DynamicSinkUtils.validateAndApplyOverwrite(tableDebugName, isOverwrite, sink, sinkAbilitySpecs);
        DynamicSinkUtils.validateAndApplyMetadata(tableDebugName, sink, table.getResolvedSchema(), sinkAbilitySpecs);
    }

    private static List<Column.MetadataColumn> createRequiredMetadataColumns(ResolvedSchema schema, DynamicTableSink sink) {
        List tableColumns = schema.getColumns();
        List<Integer> metadataColumns = DynamicSinkUtils.extractPersistedMetadataColumns(schema);
        HashMap<String, Column.MetadataColumn> metadataKeysToMetadataColumns = new HashMap<String, Column.MetadataColumn>();
        for (Integer columnIndex : metadataColumns) {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn)tableColumns.get(columnIndex);
            String metadataKey = metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
            metadataKeysToMetadataColumns.put(metadataKey, metadataColumn);
        }
        Map<String, DataType> metadataMap = DynamicSinkUtils.extractMetadataMap(sink);
        return metadataMap.keySet().stream().filter(metadataKeysToMetadataColumns::containsKey).map(metadataKeysToMetadataColumns::get).collect(Collectors.toList());
    }

    private static ValidationException createSchemaMismatchException(String cause, String tableDebugName, List<RowType.RowField> queryFields, List<RowType.RowField> sinkFields) {
        String querySchema = queryFields.stream().map(f -> f.getName() + ": " + f.getType().asSummaryString()).collect(Collectors.joining(", ", "[", "]"));
        String sinkSchema = sinkFields.stream().map(sinkField -> sinkField.getName() + ": " + sinkField.getType().asSummaryString()).collect(Collectors.joining(", ", "[", "]"));
        return new ValidationException(String.format("Column types of query result and sink for '%s' do not match.\nCause: %s\n\nQuery schema: %s\nSink schema:  %s", tableDebugName, cause, querySchema, sinkSchema));
    }

    private static DataType fixSinkDataType(DataTypeFactory dataTypeFactory, DataType sinkDataType) {
        return DataTypeUtils.transform((DataTypeFactory)dataTypeFactory, (DataType)sinkDataType, (TypeTransformation[])new TypeTransformation[]{TypeTransformations.legacyRawToTypeInfoRaw(), TypeTransformations.legacyToNonLegacy(), TypeTransformations.toNullable()});
    }

    private static void validatePartitioning(String tableDebugName, Map<String, String> staticPartitions, DynamicTableSink sink, List<String> partitionKeys) {
        if (!partitionKeys.isEmpty() && !(sink instanceof SupportsPartitioning)) {
            throw new TableException(String.format("Table '%s' is a partitioned table, but the underlying %s doesn't implement the %s interface.", tableDebugName, DynamicTableSink.class.getSimpleName(), SupportsPartitioning.class.getSimpleName()));
        }
        staticPartitions.keySet().forEach(p -> {
            if (!partitionKeys.contains(p)) {
                throw new ValidationException(String.format("Static partition column '%s' should be in the partition keys list %s for table '%s'.", p, partitionKeys, tableDebugName));
            }
        });
    }

    private static void validateAndApplyOverwrite(String tableDebugName, boolean isOverwrite, DynamicTableSink sink, List<SinkAbilitySpec> sinkAbilitySpecs) {
        if (!isOverwrite) {
            return;
        }
        if (!(sink instanceof SupportsOverwrite)) {
            throw new ValidationException(String.format("INSERT OVERWRITE requires that the underlying %s of table '%s' implements the %s interface.", DynamicTableSink.class.getSimpleName(), tableDebugName, SupportsOverwrite.class.getSimpleName()));
        }
        sinkAbilitySpecs.add(new OverwriteSpec(true));
    }

    private static List<Integer> extractPhysicalColumns(ResolvedSchema schema) {
        List columns = schema.getColumns();
        return IntStream.range(0, schema.getColumnCount()).filter(pos -> ((Column)columns.get(pos)).isPhysical()).boxed().collect(Collectors.toList());
    }

    private static List<Integer> extractPersistedMetadataColumns(ResolvedSchema schema) {
        List columns = schema.getColumns();
        return IntStream.range(0, schema.getColumnCount()).filter(pos -> {
            Column column = (Column)columns.get(pos);
            return column instanceof Column.MetadataColumn && column.isPersisted();
        }).boxed().collect(Collectors.toList());
    }

    private static int adjustByVirtualColumns(List<Column> columns, int pos) {
        return pos - (int)IntStream.range(0, pos).filter(i -> !((Column)columns.get(i)).isPersisted()).count();
    }

    private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
        if (sink instanceof SupportsWritingMetadata) {
            return ((SupportsWritingMetadata)sink).listWritableMetadata();
        }
        return Collections.emptyMap();
    }

    private static void validateAndApplyMetadata(String tableDebugName, DynamicTableSink sink, ResolvedSchema schema, List<SinkAbilitySpec> sinkAbilitySpecs) {
        List columns = schema.getColumns();
        List<Integer> metadataColumns = DynamicSinkUtils.extractPersistedMetadataColumns(schema);
        if (metadataColumns.isEmpty()) {
            return;
        }
        if (!(sink instanceof SupportsWritingMetadata)) {
            throw new ValidationException(String.format("Table '%s' declares persistable metadata columns, but the underlying %s doesn't implement the %s interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.", tableDebugName, DynamicTableSink.class.getSimpleName(), SupportsWritingMetadata.class.getSimpleName()));
        }
        Map metadataMap = ((SupportsWritingMetadata)sink).listWritableMetadata();
        metadataColumns.forEach(pos -> {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn)columns.get((int)pos);
            String metadataKey = metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
            LogicalType metadataType = metadataColumn.getDataType().getLogicalType();
            DataType expectedMetadataDataType = (DataType)metadataMap.get(metadataKey);
            if (expectedMetadataDataType == null) {
                throw new ValidationException(String.format("Invalid metadata key '%s' in column '%s' of table '%s'. The %s class '%s' supports the following metadata keys for writing:\n%s", metadataKey, metadataColumn.getName(), tableDebugName, DynamicTableSink.class.getSimpleName(), sink.getClass().getName(), String.join((CharSequence)"\n", metadataMap.keySet())));
            }
            if (!LogicalTypeCasts.supportsExplicitCast((LogicalType)metadataType, (LogicalType)expectedMetadataDataType.getLogicalType())) {
                if (metadataKey.equals(metadataColumn.getName())) {
                    throw new ValidationException(String.format("Invalid data type for metadata column '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable to metadata type '%s'.", metadataColumn.getName(), tableDebugName, metadataType, expectedMetadataDataType.getLogicalType()));
                }
                throw new ValidationException(String.format("Invalid data type for metadata column '%s' with metadata key '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable to metadata type '%s'.", metadataColumn.getName(), metadataKey, tableDebugName, metadataType, expectedMetadataDataType.getLogicalType()));
            }
        });
        sinkAbilitySpecs.add(new WritingMetadataSpec(DynamicSinkUtils.createRequiredMetadataColumns(schema, sink).stream().map(col -> col.getMetadataKey().orElse(col.getName())).collect(Collectors.toList()), (LogicalType)DynamicSinkUtils.createConsumedType(schema, sink)));
    }

    private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
        Map<String, DataType> metadataMap = DynamicSinkUtils.extractMetadataMap(sink);
        Stream<RowType.RowField> physicalFields = schema.getColumns().stream().filter(Column::isPhysical).map(c -> new RowType.RowField(c.getName(), c.getDataType().getLogicalType()));
        Stream<RowType.RowField> metadataFields = DynamicSinkUtils.createRequiredMetadataColumns(schema, sink).stream().map(column -> new RowType.RowField(column.getName(), ((DataType)metadataMap.get(column.getMetadataKey().orElse(column.getName()))).getLogicalType()));
        List rowFields = Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
        return new RowType(false, rowFields);
    }

    private DynamicSinkUtils() {
    }
}

