package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.schemas.Schema;

/**
 * Table provider registered under the {@code "pubsub"} table type.
 *
 * <p>Given a {@link Table} definition it checks that the declared schema has the expected
 * three-field shape, reads the optional timestamp-attribute and dead-letter-queue settings from
 * the table properties, and assembles a {@code PubsubIOJsonTable} whose topic is the table's
 * location.
 */
@Experimental
@Internal
public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
    /** Value reported by {@link #getTableType()}. */
    private static final String TABLE_TYPE = "pubsub";

    /** Table property holding the name of the timestamp attribute. */
    private static final String TIMESTAMP_ATTRIBUTE_PROPERTY = "timestampAttributeKey";

    /** Table property holding the dead letter queue topic. */
    private static final String DEAD_LETTER_QUEUE_PROPERTY = "deadLetterQueue";

    /** Names of the three fields the table schema must declare. */
    private static final String TIMESTAMP_FIELD = "event_timestamp";
    private static final String ATTRIBUTES_FIELD = "attributes";
    private static final String PAYLOAD_FIELD = "payload";

    /** Exact number of fields the table schema must declare. */
    private static final int EXPECTED_FIELD_COUNT = 3;

    /** Message of the exception raised when the declared schema has an unsupported shape. */
    private static final String UNSUPPORTED_SCHEMA_MESSAGE = "Unsupported schema specified for Pubsub source in CREATE TABLE. CREATE TABLE for Pubsub topic should define exactly the following fields: 'event_timestamp' field of type 'TIMESTAMP', 'attributes' field of type MAP<VARCHAR, VARCHAR>, and 'payload' field of type 'ROW<...>' which matches the payload JSON format.";

    /**
     * Returns the table type handled by this provider.
     *
     * @return the constant string {@code "pubsub"}
     */
    @Override
    public String getTableType() {
        return TABLE_TYPE;
    }

    /**
     * Builds the table for the given definition.
     *
     * <p>The schema is validated first; only then are the properties read and the dead letter
     * queue value checked, so a schema problem is reported before a dead-letter-queue problem.
     *
     * @param table table definition supplying the schema, properties and location (used as topic)
     * @return a {@code PubsubIOJsonTable} configured from the definition
     * @throws IllegalArgumentException if the schema is unsupported or the dead letter queue
     *     property is present but empty
     */
    @Override
    public BeamSqlTable buildBeamSqlTable(Table table) {
        validatePubsubMessageSchema(table);

        JSONObject tableProperties = table.getProperties();
        // NOTE(review): both lookups presumably yield null when the key is absent — confirm
        // against the JSONObject implementation in use.
        String timestampAttribute = tableProperties.getString(TIMESTAMP_ATTRIBUTE_PROPERTY);
        String deadLetterQueue = tableProperties.getString(DEAD_LETTER_QUEUE_PROPERTY);
        validateDlq(deadLetterQueue);

        return PubsubIOJsonTable.builder()
                .setSchema(table.getSchema())
                .setTimestampAttribute(timestampAttribute)
                .setDeadLetterQueue(deadLetterQueue)
                .setTopic(table.getLocation())
                .build();
    }

    /**
     * Rejects table definitions whose schema does not have the supported shape.
     *
     * @param table table definition whose schema is inspected
     * @throws IllegalArgumentException if the schema is not supported
     */
    private void validatePubsubMessageSchema(Table table) {
        if (!isSupportedSchema(table.getSchema())) {
            throw new IllegalArgumentException(UNSUPPORTED_SCHEMA_MESSAGE);
        }
    }

    /**
     * Tells whether the schema consists of exactly three fields: a timestamp-typed
     * {@code event_timestamp}, an {@code attributes} map from VARCHAR to VARCHAR, and a
     * {@code payload} whose type name is ROW. Checks run in that order and stop at the first
     * failure.
     */
    private boolean isSupportedSchema(Schema schema) {
        if (schema.getFieldCount() != EXPECTED_FIELD_COUNT) {
            return false;
        }
        if (!fieldPresent(schema, TIMESTAMP_FIELD, CalciteUtils.TIMESTAMP)) {
            return false;
        }

        Schema.FieldType attributesType =
                Schema.FieldType.map(CalciteUtils.VARCHAR.withNullable(false), CalciteUtils.VARCHAR);
        if (!fieldPresent(schema, ATTRIBUTES_FIELD, attributesType)) {
            return false;
        }

        if (!schema.hasField(PAYLOAD_FIELD)) {
            return false;
        }
        // Only the top-level type name of the payload is checked here, not its nested fields.
        Schema.TypeName payloadTypeName = schema.getField(PAYLOAD_FIELD).getType().getTypeName();
        return Schema.TypeName.ROW.equals(payloadTypeName);
    }

    /**
     * Tells whether the schema has a field with the given name whose type is equivalent to the
     * expected one, compared with {@code Schema.EquivalenceNullablePolicy.IGNORE}.
     *
     * @param schema schema to look in
     * @param fieldName name of the field to look up
     * @param expectedType type the field is compared against
     * @return {@code true} if the field exists and its type is equivalent
     */
    private boolean fieldPresent(Schema schema, String fieldName, Schema.FieldType expectedType) {
        if (!schema.hasField(fieldName)) {
            return false;
        }
        Schema.FieldType declaredType = schema.getField(fieldName).getType();
        return expectedType.equivalent(declaredType, Schema.EquivalenceNullablePolicy.IGNORE);
    }

    /**
     * Checks the dead letter queue setting. A {@code null} value is accepted as-is; an empty
     * string is rejected.
     *
     * @param deadLetterQueue configured dead letter queue topic, possibly {@code null}
     * @throws IllegalArgumentException if the value is an empty string
     */
    private void validateDlq(String deadLetterQueue) {
        if (deadLetterQueue == null) {
            return;
        }
        if (deadLetterQueue.isEmpty()) {
            throw new IllegalArgumentException("Dead letter queue topic name is not specified");
        }
    }
}
