package io.debezium.transforms.outbox;

import io.debezium.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Incubating
/* loaded from: input_file:io/debezium/transforms/outbox/EventRouter.class */
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class);
    private static final String ENVELOPE_EVENT_TYPE = "eventType";
    private static final String ENVELOPE_PAYLOAD = "payload";
    private static final String RECORD_ENVELOPE_SCHEMA_NAME_SUFFIX = ".Envelope";
    private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior;
    private String fieldEventId;
    private String fieldEventKey;
    private String fieldEventType;
    private String fieldEventTimestamp;
    private String fieldPayload;
    private String fieldPayloadId;
    private String fieldSchemaVersion;
    private String routeByField;
    private boolean routeTombstoneOnEmptyPayload;
    private List<EventRouterConfigDefinition.AdditionalField> additionalFields;
    private Schema defaultValueSchema;
    private final ExtractField<R> afterExtractor = new ExtractField.Value();
    private final RegexRouter<R> regexRouter = new RegexRouter<>();
    private final Map<Integer, Schema> versionedValueSchema = new HashMap();

    public R apply(R r) {
        Struct struct;
        Schema schema;
        if (r.value() == null) {
            LOGGER.debug("Tombstone message ignored. Message key: \"{}\"", r.key());
            return null;
        }
        if (r.valueSchema() == null || r.valueSchema().name() == null || !r.valueSchema().name().endsWith(RECORD_ENVELOPE_SCHEMA_NAME_SUFFIX)) {
            LOGGER.debug("Message without Debezium CDC Envelope ignored. Message key: \"{}\"", r.key());
            return null;
        }
        Struct requireStruct = Requirements.requireStruct(r.value(), "Detect Debezium Operation");
        String string = requireStruct.getString(Envelope.FieldName.OPERATION);
        if (string.equals(Envelope.Operation.DELETE.code())) {
            LOGGER.info("Delete message {} ignored", r.key());
            return null;
        }
        if (string.equals(Envelope.Operation.UPDATE.code())) {
            handleUnexpectedOperation(r);
            return null;
        }
        ConnectRecord apply = this.afterExtractor.apply(r);
        Struct requireStruct2 = Requirements.requireStruct(apply.value(), "Read Outbox Event");
        Schema valueSchema = apply.valueSchema();
        Long int64 = this.fieldEventTimestamp == null ? requireStruct.getInt64("ts_ms") : requireStruct2.getInt64(this.fieldEventTimestamp);
        Object obj = requireStruct2.get(this.fieldEventId);
        Object obj2 = requireStruct2.get(this.fieldEventType);
        Object obj3 = requireStruct2.get(this.fieldPayload);
        Object obj4 = requireStruct2.get(this.fieldPayloadId);
        Headers headers = r.headers();
        headers.add("id", obj, valueSchema.field(this.fieldEventId).schema());
        Schema valueSchema2 = this.fieldSchemaVersion == null ? getValueSchema(valueSchema) : getValueSchema(valueSchema, requireStruct2.getInt32(this.fieldSchemaVersion));
        Struct put = new Struct(valueSchema2).put(ENVELOPE_EVENT_TYPE, obj2).put(ENVELOPE_PAYLOAD, obj3);
        this.additionalFields.forEach(additionalField -> {
            switch (additionalField.getPlacement()) {
                case ENVELOPE:
                    put.put(additionalField.getAlias(), requireStruct2.get(additionalField.getField()));
                    return;
                case HEADER:
                    headers.add(additionalField.getAlias(), requireStruct2.get(additionalField.getField()), valueSchema.field(additionalField.getField()).schema());
                    return;
                default:
                    return;
            }
        });
        if ((obj3 == null || obj3.toString().trim().isEmpty()) && this.routeTombstoneOnEmptyPayload) {
            struct = null;
            schema = null;
        } else {
            struct = put;
            schema = valueSchema2;
        }
        return (R) this.regexRouter.apply(r.newRecord(requireStruct2.getString(this.routeByField).toLowerCase(), (Integer) null, Schema.STRING_SCHEMA, defineRecordKey(requireStruct2, obj4), schema, struct, int64, headers));
    }

    private Object defineRecordKey(Struct struct, Object obj) {
        Object obj2 = null;
        if (this.fieldEventKey != null) {
            obj2 = struct.get(this.fieldEventKey);
        }
        return obj2 != null ? obj2 : obj;
    }

    private void handleUnexpectedOperation(R r) {
        switch (this.invalidOperationBehavior) {
            case SKIP_AND_WARN:
                LOGGER.warn("Unexpected update message received {} and ignored", r.key());
                return;
            case SKIP_AND_ERROR:
                LOGGER.error("Unexpected update message received {} and ignored", r.key());
                return;
            case FATAL:
                throw new IllegalStateException(String.format("Unexpected update message received %s, fail.", r.key()));
            default:
                return;
        }
    }

    public ConfigDef config() {
        return EventRouterConfigDefinition.configDef();
    }

    public void close() {
    }

    public void configure(Map<String, ?> map) {
        Configuration from = Configuration.from(map);
        Field.Set of = Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS);
        Logger logger = LOGGER;
        logger.getClass();
        if (!from.validateAndRecord(of, logger::error)) {
            throw new ConnectException("Unable to validate config.");
        }
        this.invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(from.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR));
        this.fieldEventId = from.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
        this.fieldEventKey = from.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
        this.fieldEventType = from.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE);
        this.fieldEventTimestamp = from.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
        this.fieldPayload = from.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
        this.fieldPayloadId = from.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);
        this.fieldSchemaVersion = from.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
        this.routeByField = from.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
        this.routeTombstoneOnEmptyPayload = from.getBoolean(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD);
        HashMap hashMap = new HashMap();
        hashMap.put("regex", from.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
        hashMap.put("replacement", from.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT));
        this.regexRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("field", Envelope.FieldName.AFTER);
        this.afterExtractor.configure(hashMap2);
        this.additionalFields = EventRouterConfigDefinition.parseAdditionalFieldsConfig(from);
    }

    private Schema getValueSchema(Schema schema) {
        if (this.defaultValueSchema == null) {
            this.defaultValueSchema = getSchemaBuilder(schema).build();
        }
        return this.defaultValueSchema;
    }

    private Schema getValueSchema(Schema schema, Integer num) {
        if (!this.versionedValueSchema.containsKey(num)) {
            this.versionedValueSchema.put(num, getSchemaBuilder(schema).version(num).build());
        }
        return this.versionedValueSchema.get(num);
    }

    private SchemaBuilder getSchemaBuilder(Schema schema) {
        SchemaBuilder struct = SchemaBuilder.struct();
        struct.field(ENVELOPE_EVENT_TYPE, schema.field(this.fieldEventType).schema()).field(ENVELOPE_PAYLOAD, schema.field(this.fieldPayload).schema());
        this.additionalFields.forEach(additionalField -> {
            if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) {
                struct.field(additionalField.getAlias(), schema.field(additionalField.getField()).schema());
            }
        });
        return struct;
    }
}
