package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Array;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.util.PGmoney;

@ThreadSafe
/* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducer.class */
public class RecordsSnapshotProducer extends RecordsProducer {
    private static final String CONTEXT_NAME = "records-snapshot-producer";
    private final ExecutorService executorService;
    private final Optional<RecordsStreamProducer> streamProducer;
    private final AtomicReference<SourceRecord> currentRecord;
    private final Snapshotter snapshotter;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RecordsSnapshotProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo, Snapshotter snapshotter) {
        super(postgresTaskContext, sourceInfo);
        this.executorService = Threads.newSingleThreadExecutor(PostgresConnector.class, postgresTaskContext.config().getLogicalName(), CONTEXT_NAME);
        this.currentRecord = new AtomicReference<>();
        this.snapshotter = snapshotter;
        if (snapshotter.shouldStream()) {
            this.streamProducer = Optional.of(new RecordsStreamProducer(postgresTaskContext, sourceInfo));
        } else {
            this.streamProducer = Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void start(BlockingConsumer<ChangeEvent> blockingConsumer, Consumer<Throwable> consumer) {
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            CompletableFuture.runAsync(this::delaySnapshotIfNeeded, this.executorService).thenRunAsync(() -> {
                takeSnapshot(blockingConsumer);
            }, (Executor) this.executorService).thenRunAsync(() -> {
                startStreaming(blockingConsumer, consumer);
            }, (Executor) this.executorService).exceptionally(th -> {
                this.logger.error("unexpected exception", th.getCause() != null ? th.getCause() : th);
                stop();
                consumer.accept(th);
                return null;
            });
            configureLoggingContext.restore();
        } catch (Throwable th2) {
            configureLoggingContext.restore();
            throw th2;
        }
    }

    private void delaySnapshotIfNeeded() {
        Duration snapshotDelay = this.taskContext.getConfig().getSnapshotDelay();
        if (snapshotDelay.isZero() || snapshotDelay.isNegative()) {
            return;
        }
        Threads.Timer timer = Threads.timer(Clock.SYSTEM, snapshotDelay);
        Metronome parker = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        while (!timer.expired()) {
            try {
                this.logger.info("The connector will wait for {}s before proceeding", Long.valueOf(timer.remaining().getSeconds()));
                parker.pause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.debug("Interrupted while awaiting initial snapshot delay");
                return;
            }
        }
    }

    private void startStreaming(BlockingConsumer<ChangeEvent> blockingConsumer, Consumer<Throwable> consumer) {
        try {
            this.streamProducer.ifPresent(recordsStreamProducer -> {
                if (this.sourceInfo.lsn() != null) {
                    this.logger.info("Snapshot finished, continuing streaming changes from {}", ReplicationConnection.format(this.sourceInfo.lsn().longValue()));
                }
                recordsStreamProducer.start(blockingConsumer, consumer);
            });
        } finally {
            cleanup();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void commit(long j) {
        this.streamProducer.ifPresent(recordsStreamProducer -> {
            recordsStreamProducer.commit(j);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void stop() {
        try {
            this.streamProducer.ifPresent((v0) -> {
                v0.stop();
            });
        } finally {
            cleanup();
        }
    }

    private void cleanup() {
        this.currentRecord.set(null);
        this.executorService.shutdownNow();
    }

    /* JADX WARN: Failed to calculate best type for var: r17v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x0347: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:45:0x0347 */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x034c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:47:0x034c */
    /* JADX WARN: Type inference failed for: r17v2, types: [io.debezium.connector.postgresql.connection.PostgresConnection] */
    /* JADX WARN: Type inference failed for: r18v0, types: [java.lang.Throwable] */
    private void takeSnapshot(BlockingConsumer<ChangeEvent> blockingConsumer) {
        ?? r17;
        ?? r18;
        if (this.executorService.isShutdown()) {
            this.logger.info("Not taking snapshot as this task has been cancelled already");
            return;
        }
        long currentTimeInMillis = clock().currentTimeInMillis();
        try {
            try {
                PostgresConnection createConnection = this.taskContext.createConnection();
                Throwable th = null;
                Connection connection = createConnection.connection();
                String lineSeparator = System.lineSeparator();
                this.logger.info("Step 0: disabling autocommit");
                createConnection.setAutoCommit(false);
                long snapshotLockTimeoutMillis = this.taskContext.config().snapshotLockTimeoutMillis();
                this.logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'", createConnection.database(), createConnection.username());
                StringBuilder sb = new StringBuilder("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
                createConnection.executeWithoutCommitting(new String[]{sb.toString()});
                sb.delete(0, sb.length());
                PostgresSchema schema = schema();
                schema.refresh(createConnection, false);
                this.logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock", Double.valueOf(snapshotLockTimeoutMillis / 1000.0d));
                sb.append("SET lock_timeout = ").append(snapshotLockTimeoutMillis).append(";").append(lineSeparator);
                schema.tableIds().forEach(tableId -> {
                    sb.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN SHARE UPDATE EXCLUSIVE MODE;").append(lineSeparator);
                });
                createConnection.executeWithoutCommitting(new String[]{sb.toString()});
                schema.refresh(createConnection, false);
                long currentXLogLocation = createConnection.currentXLogLocation();
                long longValue = createConnection.currentTransactionId().longValue();
                this.logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(currentXLogLocation), Long.valueOf(longValue));
                this.sourceInfo.startSnapshot();
                this.sourceInfo.update(Long.valueOf(currentXLogLocation), clock().currentTime(), Long.valueOf(longValue), null, this.sourceInfo.xmin());
                this.logger.info("Step 3: reading and exporting the contents of each table");
                AtomicInteger atomicInteger = new AtomicInteger(0);
                for (TableId tableId2 : schema.tableIds()) {
                    long currentTimeInMillis2 = clock().currentTimeInMillis();
                    this.logger.info("\t exporting data from table '{}'", tableId2);
                    try {
                        Optional<String> buildSnapshotQuery = this.snapshotter.buildSnapshotQuery(tableId2);
                        if (buildSnapshotQuery.isPresent()) {
                            this.logger.info("For table '{}' using select statement: '{}'", tableId2, buildSnapshotQuery);
                            createConnection.queryWithBlockingConsumer(buildSnapshotQuery.get(), this::readTableStatement, resultSet -> {
                                readTable(tableId2, resultSet, blockingConsumer, atomicInteger);
                            });
                            this.logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", new Object[]{Integer.valueOf(atomicInteger.get()), tableId2, Strings.duration(clock().currentTimeInMillis() - currentTimeInMillis2)});
                            atomicInteger.set(0);
                        } else {
                            this.logger.warn("For table '{}' the select statement was not provided, skipping table", tableId2);
                        }
                    } catch (SQLException e) {
                        throw new ConnectException(e);
                    }
                }
                this.logger.info("Step 4: committing transaction '{}'", Long.valueOf(longValue));
                connection.commit();
                SourceRecord sourceRecord = this.currentRecord.get();
                if (sourceRecord != null) {
                    this.logger.info("Step 5: sending the last snapshot record");
                    this.sourceInfo.markLastSnapshotRecord();
                    changeSourceToLastSnapshotRecord(sourceRecord);
                    this.currentRecord.set(new SourceRecord(sourceRecord.sourcePartition(), this.sourceInfo.offset(), sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), sourceRecord.valueSchema(), sourceRecord.value()));
                    sendCurrentRecord(blockingConsumer);
                }
                this.sourceInfo.completeSnapshot();
                this.logger.info("Snapshot completed in '{}'", Strings.duration(clock().currentTimeInMillis() - currentTimeInMillis));
                Heartbeat.create(this.taskContext.config().getConfig(), this.taskContext.topicSelector().getHeartbeatTopic(), this.taskContext.config().getLogicalName()).forcedBeat(this.sourceInfo.partition(), this.sourceInfo.offset(), sourceRecord2 -> {
                    blockingConsumer.accept(new ChangeEvent(sourceRecord2, this.sourceInfo.lsn()));
                });
                if (createConnection != null) {
                    if (0 != 0) {
                        try {
                            createConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConnection.close();
                    }
                }
            } catch (Throwable th3) {
                if (r17 != 0) {
                    if (r18 != 0) {
                        try {
                            r17.close();
                        } catch (Throwable th4) {
                            r18.addSuppressed(th4);
                        }
                    } else {
                        r17.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException e2) {
            Thread.interrupted();
            rollbackTransaction(null);
            this.logger.warn("Snapshot aborted after '{}'", Strings.duration(clock().currentTimeInMillis() - currentTimeInMillis));
        } catch (SQLException e3) {
            rollbackTransaction(null);
            throw new ConnectException(e3);
        }
    }

    private void changeSourceToLastSnapshotRecord(SourceRecord sourceRecord) {
        Struct struct = (Struct) ((Struct) sourceRecord.value()).get("source");
        if (struct.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
            struct.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
        }
    }

    private void rollbackTransaction(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                this.logger.error("Cannot rollback snapshot transaction", e);
            }
        }
    }

    private Statement readTableStatement(Connection connection) throws SQLException {
        int rowsFetchSize = this.taskContext.config().rowsFetchSize();
        Statement createStatement = connection.createStatement();
        createStatement.setFetchSize(rowsFetchSize);
        return createStatement;
    }

    private void readTable(TableId tableId, ResultSet resultSet, BlockingConsumer<ChangeEvent> blockingConsumer, AtomicInteger atomicInteger) throws SQLException, InterruptedException {
        Table tableFor = schema().tableFor(tableId);
        if (!$assertionsDisabled && tableFor == null) {
            throw new AssertionError();
        }
        int size = tableFor.columns().size();
        Object[] objArr = new Object[size];
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            atomicInteger.incrementAndGet();
            sendCurrentRecord(blockingConsumer);
            int i = 0;
            int i2 = 1;
            while (i != size) {
                objArr[i] = valueForColumn(resultSet, i2, metaData);
                i++;
                i2++;
            }
            generateReadRecord(tableId, objArr);
        }
    }

    private Object valueForColumn(ResultSet resultSet, int i, ResultSetMetaData resultSetMetaData) throws SQLException {
        try {
            String columnTypeName = resultSetMetaData.getColumnTypeName(i);
            PostgresType postgresType = this.taskContext.schema().getTypeRegistry().get(columnTypeName);
            this.logger.trace("Type of incoming data is: " + String.valueOf(postgresType.getOid()));
            this.logger.trace("ColumnTypeName is: " + columnTypeName);
            this.logger.trace("Type toString: " + postgresType.toString());
            if (postgresType.isArrayType()) {
                Array array = resultSet.getArray(i);
                if (array == null) {
                    return null;
                }
                return Arrays.asList((Object[]) array.getArray());
            }
            switch (postgresType.getOid()) {
                case 790:
                    return Double.valueOf(new PGmoney(resultSet.getString(i)).val);
                case 1560:
                    return resultSet.getString(i);
                case 1700:
                    String string = resultSet.getString(i);
                    if (string == null) {
                        return string;
                    }
                    Optional<SpecialValueDecimal> specialValue = PostgresValueConverter.toSpecialValue(string);
                    return specialValue.isPresent() ? specialValue.get() : new SpecialValueDecimal(resultSet.getBigDecimal(i));
                default:
                    Object object = resultSet.getObject(i);
                    if (object != null) {
                        this.logger.trace("rs getobject returns class: {}; rs getObject toString is: {}", object.getClass(), object.toString());
                    }
                    return object;
            }
        } catch (SQLException e) {
            return resultSet.getObject(i);
        }
    }

    protected void generateReadRecord(TableId tableId, Object[] objArr) {
        this.currentRecord.set(null);
        if (objArr.length == 0) {
            return;
        }
        this.logger.trace("tableId value is: {}", tableId.toString());
        TableSchema schemaFor = schema().schemaFor(tableId);
        if (!$assertionsDisabled && schemaFor == null) {
            throw new AssertionError();
        }
        Object keyFromColumnData = schemaFor.keyFromColumnData(objArr);
        Struct valueFromColumnData = schemaFor.valueFromColumnData(objArr);
        if (keyFromColumnData == null || valueFromColumnData == null) {
            this.logger.trace("key: {}; value: {}; One is null", String.valueOf(keyFromColumnData), String.valueOf(valueFromColumnData));
            return;
        }
        Schema keySchema = schemaFor.keySchema();
        this.sourceInfo.update(Long.valueOf(clock().currentTimeInMicros()), tableId);
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String str = topicSelector().topicNameFor(tableId);
        Envelope envelopeSchema = schemaFor.getEnvelopeSchema();
        this.currentRecord.set(new SourceRecord(partition, offset, str, (Integer) null, keySchema, keyFromColumnData, envelopeSchema.schema(), envelopeSchema.read(valueFromColumnData, this.sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis()))));
    }

    private void sendCurrentRecord(BlockingConsumer<ChangeEvent> blockingConsumer) throws InterruptedException {
        SourceRecord sourceRecord = this.currentRecord.get();
        if (sourceRecord == null) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending read event '{}'", sourceRecord);
        }
        blockingConsumer.accept(new ChangeEvent(sourceRecord, this.sourceInfo.lsn()));
    }

    static {
        $assertionsDisabled = !RecordsSnapshotProducer.class.desiredAssertionStatus();
    }
}
