package io.debezium.pipeline;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/EventDispatcher.class */
/**
 * Routes change events for data collections of type {@code T}.
 *
 * <p>Data change events for collections accepted by the configured filter are converted into
 * {@link SourceRecord}s and placed on the {@link ChangeEventQueue}. Schema change events for
 * accepted collections are applied to the {@link HistorizedDatabaseSchema}, which is only
 * available when the schema passed to the constructor is historized.
 *
 * @param <T> the type of data collection identifier handled by this dispatcher
 */
public class EventDispatcher<T extends DataCollectionId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);

    private final TopicSelector<T> topicSelector;
    private final DatabaseSchema<T> schema;
    /** The same object as {@link #schema} when it is historized; {@code null} otherwise. */
    private final HistorizedDatabaseSchema<T> historizedSchema;
    private final ChangeEventQueue<Object> queue;
    private final DataCollectionFilters.DataCollectionFilter<T> filter;

    /**
     * Receives the change records produced by a {@link ChangeRecordEmitter} for one data
     * collection and enqueues them as {@link SourceRecord}-based events.
     */
    private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver {
        private final T dataCollectionId;
        private final ChangeEventCreator changeEventCreator;
        private final DataCollectionSchema dataCollectionSchema;

        private ChangeRecordReceiver(T t, ChangeEventCreator changeEventCreator, DataCollectionSchema dataCollectionSchema) {
            this.dataCollectionId = t;
            this.changeEventCreator = changeEventCreator;
            this.dataCollectionSchema = dataCollectionSchema;
        }

        /**
         * Builds a source record from the given key and value and enqueues it. For
         * {@link Envelope.Operation#DELETE} a second record with the same key but a
         * {@code null} value and value schema is enqueued right after it.
         *
         * @param operation the operation that caused the change
         * @param obj the record key; must not be {@code null}
         * @param struct the record value; must not be {@code null}
         * @param offsetContext supplies the source partition and offset of the record
         * @throws NullPointerException if {@code obj} or {@code struct} is {@code null}
         * @throws InterruptedException declared by the queue's {@code enqueue} call
         */
        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext) throws InterruptedException {
            Objects.requireNonNull(obj, "key must not be null");
            Objects.requireNonNull(struct, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, obj);

            Schema keySchema = this.dataCollectionSchema.keySchema();
            String topicName = EventDispatcher.this.topicSelector.topicNameFor(this.dataCollectionId);
            SourceRecord sourceRecord = new SourceRecord(
                    offsetContext.getPartition(),
                    offsetContext.getOffset(),
                    topicName,
                    null,
                    keySchema,
                    obj,
                    this.dataCollectionSchema.getEnvelopeSchema().schema(),
                    struct);
            EventDispatcher.this.queue.enqueue(this.changeEventCreator.createDataChangeEvent(sourceRecord));

            if (operation == Envelope.Operation.DELETE) {
                // Follow every delete with a null-valued record for the same key
                // (a "tombstone"; presumably consumed by Kafka log compaction - confirm).
                SourceRecord tombstone = sourceRecord.newRecord(
                        sourceRecord.topic(),
                        sourceRecord.kafkaPartition(),
                        sourceRecord.keySchema(),
                        sourceRecord.key(),
                        (Schema) null,
                        (Object) null,
                        sourceRecord.timestamp());
                EventDispatcher.this.queue.enqueue(this.changeEventCreator.createDataChangeEvent(tombstone));
            }
        }
    }

    /**
     * Receives schema change events from a {@link SchemaChangeEventEmitter} and applies them
     * to the historized schema.
     */
    private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        /**
         * Applies the given event to the historized schema.
         *
         * @param schemaChangeEvent the schema change to apply
         * @throws IllegalStateException if the dispatcher was created with a schema that is
         *         not a {@link HistorizedDatabaseSchema}
         * @throws InterruptedException declared by {@code applySchemaChange}
         */
        @Override // io.debezium.pipeline.spi.SchemaChangeEventEmitter.Receiver
        public void schemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws InterruptedException {
            HistorizedDatabaseSchema<T> target = EventDispatcher.this.historizedSchema;
            if (target == null) {
                // Fail with a descriptive error instead of an anonymous NullPointerException.
                throw new IllegalStateException("Cannot apply schema change event: database schema is not historized");
            }
            target.applySchemaChange(schemaChangeEvent);
        }
    }

    /**
     * Creates a dispatcher.
     *
     * @param topicSelector resolves the topic name for a data collection
     * @param databaseSchema provides the schema of each data collection; schema change events
     *        can only be applied if it is a {@link HistorizedDatabaseSchema}
     * @param changeEventQueue the queue that receives the created data change events
     * @param dataCollectionFilter decides which data collections are dispatched
     */
    public EventDispatcher(TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<Object> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter) {
        this.topicSelector = topicSelector;
        this.schema = databaseSchema;
        this.historizedSchema = databaseSchema instanceof HistorizedDatabaseSchema
                ? (HistorizedDatabaseSchema<T>) databaseSchema
                : null;
        this.queue = changeEventQueue;
        this.filter = dataCollectionFilter;
    }

    /**
     * Dispatches the data change records of one data collection, unless the collection is
     * excluded by the filter. The supplier is only invoked for included collections that
     * have a registered schema.
     *
     * @param t the data collection the change belongs to
     * @param supplier provides the emitter that produces the change records
     * @param changeEventCreator wraps each source record into the event that is enqueued
     * @throws IllegalArgumentException if no schema is registered for {@code t}
     * @throws InterruptedException propagated from the emitter or the queue
     */
    public void dispatchDataChangeEvent(T t, Supplier<ChangeRecordEmitter> supplier, ChangeEventCreator changeEventCreator) throws InterruptedException {
        if (!this.filter.isIncluded(t)) {
            LOGGER.trace("Skipping data change event for {}", t);
            return;
        }
        DataCollectionSchema schemaFor = this.schema.schemaFor(t);
        if (schemaFor == null) {
            throw new IllegalArgumentException("No metadata registered for captured table " + t);
        }
        supplier.get().emitChangeRecords(schemaFor, new ChangeRecordReceiver(t, changeEventCreator, schemaFor));
    }

    /**
     * Dispatches the schema change events of one data collection, unless the collection is
     * excluded by the filter. The supplier is only invoked for included collections.
     *
     * @param t the data collection whose schema changed
     * @param supplier provides the emitter that produces the schema change events
     * @throws InterruptedException propagated from the emitter or the schema
     */
    public void dispatchSchemaChangeEvent(T t, Supplier<SchemaChangeEventEmitter> supplier) throws InterruptedException {
        if (!this.filter.isIncluded(t)) {
            LOGGER.trace("Skipping schema change event for {}", t);
            return;
        }
        supplier.get().emitSchemaChangeEvent(new SchemaChangeEventReceiver());
    }
}
