package io.debezium.relational;

import io.debezium.config.ConfigurationDefaults;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource.class */
public abstract class HistorizedRelationalSnapshotChangeEventSource implements SnapshotChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(HistorizedRelationalSnapshotChangeEventSource.class);
    /** Minimum time between two progress log lines while scanning a single table (10 seconds). */
    private static final Duration LOG_INTERVAL = Duration.ofMillis(10000);
    /** Connector configuration; supplies the snapshot delay, the table filters and the fetch size. */
    private final RelationalDatabaseConnectorConfig connectorConfig;
    /** Offset handed in at construction time; may be {@code null} (it is null-checked before use). */
    private final OffsetContext previousOffset;
    /** Connection wrapper used both for the snapshot transaction and for the table scan statements. */
    private final JdbcConnection jdbcConnection;
    /** Schema that receives the schema change events produced for the captured tables. */
    private final HistorizedRelationalDatabaseSchema schema;
    /** Dispatcher used to emit snapshot row events and the closing heartbeat. */
    private final EventDispatcher<TableId> dispatcher;
    /** Clock used for measuring table scan durations and for the progress log timer. */
    private final Clock clock;
    /** Listener notified about snapshot start, progress, completion and abortion. */
    private final SnapshotProgressListener snapshotProgressListener;

    /* loaded from: input_file:io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource$SnapshotContext.class */
    /**
     * Mutable holder for the state shared between the individual snapshot steps.
     */
    public static class SnapshotContext implements AutoCloseable {
        /** Catalog name given at construction time. */
        public final String catalogName;
        /** Table definitions looked up by the schema and data snapshot steps. */
        public final Tables tables;
        /** Ids of the tables selected for capture; assigned while the snapshot runs. */
        public Set<TableId> capturedTables;
        /** Offset of the snapshot; read by the snapshot steps once it has been determined. */
        public OffsetContext offset;

        /**
         * Creates a context for the given catalog with an empty set of table definitions.
         *
         * @param catalogName name of the catalog being snapshotted
         * @throws SQLException never thrown here; declared so that subclasses may throw it
         */
        public SnapshotContext(String catalogName) throws SQLException {
            this.catalogName = catalogName;
            this.tables = new Tables();
        }

        /**
         * Does nothing; subclasses may override to release resources.
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource$SnapshottingTask.class */
    /**
     * Immutable description of what a snapshot run has to do: capture the schema, the data,
     * both, or neither.
     */
    public static class SnapshottingTask {
        private final boolean snapshotSchema;
        private final boolean snapshotData;

        /**
         * @param snapshotSchema whether the schema of the captured tables is to be snapshotted
         * @param snapshotData whether the rows of the captured tables are to be snapshotted
         */
        public SnapshottingTask(boolean snapshotSchema, boolean snapshotData) {
            this.snapshotSchema = snapshotSchema;
            this.snapshotData = snapshotData;
        }

        /**
         * @return {@code true} if table data is to be snapshotted
         */
        public boolean snapshotData() {
            return snapshotData;
        }

        /**
         * @return {@code true} if the table schema is to be snapshotted
         */
        public boolean snapshotSchema() {
            return snapshotSchema;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("SnapshottingTask [snapshotSchema=");
            text.append(snapshotSchema);
            text.append(", snapshotData=");
            text.append(snapshotData);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Stores the collaborators needed to run a snapshot; no work is performed here.
     *
     * @param connectorConfig connector configuration (snapshot delay, table filters, fetch size)
     * @param previousOffset offset of an earlier run, passed to {@link #getSnapshottingTask(OffsetContext)}
     * @param jdbcConnection connection wrapper used for the snapshot transaction and table scans
     * @param schema schema receiving the schema change events of the captured tables
     * @param dispatcher dispatcher for snapshot row events and the heartbeat
     * @param clock clock used for scan timing and progress logging
     * @param snapshotProgressListener listener notified about the snapshot life cycle
     */
    public HistorizedRelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig, OffsetContext previousOffset, JdbcConnection jdbcConnection, HistorizedRelationalDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
        this.connectorConfig = connectorConfig;
        this.previousOffset = previousOffset;
        this.jdbcConnection = jdbcConnection;
        this.schema = schema;
        this.dispatcher = dispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
    }

    /**
     * Runs the snapshot as a fixed sequence of steps inside a single JDBC transaction.
     *
     * <p>If the task returned by {@link #getSnapshottingTask(OffsetContext)} requests neither a
     * schema nor a data snapshot, the previous offset is returned immediately. Otherwise the
     * configured snapshot delay is awaited, the context is prepared, and steps 1 to 7 run in
     * order. The transaction is always rolled back and {@link #complete(SnapshotContext)} is
     * always invoked afterwards, whether the snapshot succeeded or failed.
     *
     * @param changeEventSourceContext context used to detect that the connector is stopping
     * @return the snapshot result carrying the offset to continue from
     * @throws InterruptedException if the snapshot was interrupted before it completed
     * @throws RuntimeException if preparing the context or any snapshot step fails; checked
     *         exceptions and errors are wrapped, runtime exceptions are rethrown unchanged
     */
    @Override // io.debezium.pipeline.source.spi.SnapshotChangeEventSource
    public SnapshotResult execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        SnapshottingTask snapshottingTask = getSnapshottingTask(this.previousOffset);
        if (!snapshottingTask.snapshotSchema() && !snapshottingTask.snapshotData()) {
            LOGGER.debug("Skipping snapshotting");
            return SnapshotResult.completed(this.previousOffset);
        }
        delaySnapshotIfNeeded(changeEventSourceContext);

        // Only a failure of prepare() is an initialization failure; keep this catch narrow so
        // that errors of the snapshot itself are neither mislabelled nor wrapped a second time.
        final SnapshotContext snapshotContext;
        try {
            snapshotContext = prepare(changeEventSourceContext);
        } catch (Exception e) {
            LOGGER.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }

        Connection connection = null;
        try {
            LOGGER.info("Snapshot step 1 - Preparing");
            this.snapshotProgressListener.snapshotStarted();
            if (this.previousOffset != null && this.previousOffset.isSnapshotRunning()) {
                LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
            }
            connection = this.jdbcConnection.connection();
            connection.setAutoCommit(false);
            connectionCreated(snapshotContext);

            LOGGER.info("Snapshot step 2 - Determining captured tables");
            determineCapturedTables(snapshotContext);
            this.snapshotProgressListener.monitoredTablesDetermined(snapshotContext.capturedTables);

            LOGGER.info("Snapshot step 3 - Locking captured tables");
            if (snapshottingTask.snapshotSchema()) {
                lockTablesForSchemaSnapshot(changeEventSourceContext, snapshotContext);
            }

            LOGGER.info("Snapshot step 4 - Determining snapshot offset");
            determineSnapshotOffset(snapshotContext);

            LOGGER.info("Snapshot step 5 - Reading structure of captured tables");
            readTableStructure(changeEventSourceContext, snapshotContext);

            if (snapshottingTask.snapshotSchema()) {
                LOGGER.info("Snapshot step 6 - Persisting schema history");
                createSchemaChangeEventsForTables(changeEventSourceContext, snapshotContext);
                // The locks are only needed until the schema has been captured.
                releaseSchemaSnapshotLocks(snapshotContext);
            } else {
                LOGGER.info("Snapshot step 6 - Skipping persisting of schema history");
            }

            if (snapshottingTask.snapshotData()) {
                LOGGER.info("Snapshot step 7 - Snapshotting data");
                createDataEvents(changeEventSourceContext, snapshotContext);
            } else {
                LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
                // No data events were emitted, so mark the snapshot as completed on the offset directly.
                snapshotContext.offset.preSnapshotCompletion();
                snapshotContext.offset.postSnapshotCompletion();
            }

            this.dispatcher.dispatchHeartbeatEvent(snapshotContext.offset);
            this.snapshotProgressListener.snapshotCompleted();
            return SnapshotResult.completed(snapshotContext.offset);
        } catch (InterruptedException e) {
            LOGGER.warn("Snapshot was interrupted before completion");
            this.snapshotProgressListener.snapshotAborted();
            throw e;
        } catch (RuntimeException e) {
            this.snapshotProgressListener.snapshotAborted();
            throw e;
        } catch (Throwable t) {
            this.snapshotProgressListener.snapshotAborted();
            throw new RuntimeException(t);
        } finally {
            rollbackTransaction(connection);
            LOGGER.info("Snapshot step 8 - Finalizing");
            complete(snapshotContext);
        }
    }

    /**
     * Decides what the snapshot has to capture. Called first by {@code execute}; when the
     * returned task requests neither schema nor data, snapshotting is skipped entirely.
     *
     * @param offsetContext the offset passed to the constructor; may be {@code null}
     * @return the task describing whether schema and/or data are to be snapshotted
     */
    protected abstract SnapshottingTask getSnapshottingTask(OffsetContext offsetContext);

    /**
     * Blocks until the configured snapshot delay has elapsed, logging the remaining time on
     * every wake-up. Returns immediately when the delay is zero or negative.
     *
     * @param changeEventSourceContext context polled to detect that the connector is stopping
     * @throws InterruptedException if the connector stops running while waiting
     */
    private void delaySnapshotIfNeeded(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        final Duration delay = this.connectorConfig.getSnapshotDelay();
        final boolean delayRequested = !delay.isZero() && !delay.isNegative();
        if (delayRequested) {
            final Threads.Timer delayTimer = Threads.timer(Clock.SYSTEM, delay);
            final Metronome metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
            while (!delayTimer.expired()) {
                if (!changeEventSourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while awaiting initial snapshot delay");
                }
                final long secondsLeft = delayTimer.remaining().getSeconds();
                LOGGER.info("The connector will wait for {}s before proceeding", secondsLeft);
                metronome.pause();
            }
        }
    }

    /**
     * Creates the context object shared by all snapshot steps. Called by {@code execute} after
     * the optional snapshot delay and before the JDBC connection is obtained.
     *
     * @param changeEventSourceContext the context {@code execute} was invoked with
     * @return the context passed to every subsequent snapshot step
     * @throws Exception if the context cannot be created
     */
    protected abstract SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws Exception;

    /**
     * Hook invoked by {@code execute} right after the JDBC connection has been obtained and
     * auto-commit has been switched off. The default implementation does nothing.
     *
     * @param snapshotContext the context of the running snapshot
     * @throws Exception if an overriding implementation fails
     */
    protected void connectionCreated(SnapshotContext snapshotContext) throws Exception {
    }

    /**
     * Filters all known table ids through the connector's data collection filter and stores the
     * accepted ones in {@code snapshotContext.capturedTables}.
     *
     * @param snapshotContext the context whose {@code capturedTables} field is assigned
     * @throws Exception if the table ids cannot be obtained
     */
    private void determineCapturedTables(SnapshotContext snapshotContext) throws Exception {
        Set<TableId> allTableIds = getAllTableIds(snapshotContext);
        // Typed set instead of a raw HashSet so the assignment below is checked by the compiler.
        Set<TableId> capturedTables = new HashSet<>();
        for (TableId tableId : allTableIds) {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
                LOGGER.trace("Adding table {} to the list of captured tables", tableId);
                capturedTables.add(tableId);
            } else {
                LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", tableId);
            }
        }
        snapshotContext.capturedTables = capturedTables;
    }

    /**
     * Returns the ids of all tables that are candidates for capture. The result is filtered
     * through the connector's table filters by {@code determineCapturedTables}.
     *
     * @param snapshotContext the context of the running snapshot
     * @return the candidate table ids
     * @throws Exception if the table ids cannot be determined
     */
    protected abstract Set<TableId> getAllTableIds(SnapshotContext snapshotContext) throws Exception;

    /**
     * Invoked in snapshot step 3, and only when the snapshotting task requests a schema snapshot.
     *
     * @param changeEventSourceContext the context {@code execute} was invoked with
     * @param snapshotContext the context of the running snapshot; {@code capturedTables} has been set
     * @throws Exception if locking fails
     */
    protected abstract void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotContext snapshotContext) throws Exception;

    /**
     * Invoked in snapshot step 4. The later steps dereference {@code snapshotContext.offset},
     * so implementations are presumably expected to assign it here (confirm against subclasses).
     *
     * @param snapshotContext the context of the running snapshot
     * @throws Exception if the offset cannot be determined
     */
    protected abstract void determineSnapshotOffset(SnapshotContext snapshotContext) throws Exception;

    /**
     * Invoked in snapshot step 5. The later steps look up every captured table in
     * {@code snapshotContext.tables}, so implementations are presumably expected to populate
     * it here (confirm against subclasses).
     *
     * @param changeEventSourceContext the context {@code execute} was invoked with
     * @param snapshotContext the context of the running snapshot
     * @throws Exception if the table structure cannot be read
     */
    protected abstract void readTableStructure(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotContext snapshotContext) throws Exception;

    /**
     * Invoked in snapshot step 6 right after the schema change events have been applied, and
     * only when the snapshotting task requests a schema snapshot.
     *
     * @param snapshotContext the context of the running snapshot
     * @throws Exception if releasing the locks fails
     */
    protected abstract void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws Exception;

    /**
     * Applies one schema change event per captured table to the schema, checking before each
     * table that the connector is still running.
     *
     * @param changeEventSourceContext context polled to detect that the connector is stopping
     * @param snapshotContext context providing the captured table ids and their definitions
     * @throws InterruptedException if the connector stops running between two tables
     * @throws Exception if a schema change event cannot be created
     */
    private void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotContext snapshotContext) throws Exception {
        for (TableId capturedTableId : snapshotContext.capturedTables) {
            final boolean stillRunning = changeEventSourceContext.isRunning();
            if (!stillRunning) {
                throw new InterruptedException("Interrupted while capturing schema of table " + capturedTableId);
            }
            LOGGER.debug("Capturing structure of table {}", capturedTableId);
            final Table capturedTable = snapshotContext.tables.forTable(capturedTableId);
            final SchemaChangeEvent createTableEvent = getCreateTableEvent(snapshotContext, capturedTable);
            this.schema.applySchemaChange(createTableEvent);
        }
    }

    /**
     * Builds the schema change event for one captured table; the result is applied to the
     * schema by {@code createSchemaChangeEventsForTables}.
     *
     * @param snapshotContext the context of the running snapshot
     * @param table the table definition looked up from {@code snapshotContext.tables}
     * @return the event to apply to the schema
     * @throws Exception if the event cannot be created
     */
    protected abstract SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, Table table) throws Exception;

    /**
     * Emits snapshot events for the rows of every captured table, bracketed by the snapshot
     * life-cycle callbacks on the offset.
     *
     * @param changeEventSourceContext context polled to detect that the connector is stopping
     * @param snapshotContext context providing the captured tables and the snapshot offset
     * @throws InterruptedException if the connector stops running before all tables were read
     */
    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotContext snapshotContext) throws InterruptedException {
        final EventDispatcher.SnapshotReceiver receiver = this.dispatcher.getSnapshotChangeEventReceiver();

        snapshotContext.offset.preSnapshotStart();

        for (TableId capturedTableId : snapshotContext.capturedTables) {
            if (!changeEventSourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while snapshotting table " + capturedTableId);
            }
            LOGGER.debug("Snapshotting table {}", capturedTableId);
            final Table capturedTable = snapshotContext.tables.forTable(capturedTableId);
            createDataEventsForTable(changeEventSourceContext, snapshotContext, receiver, capturedTable);
        }

        // Completion is signalled on the offset both before and after the receiver is told to finish.
        snapshotContext.offset.preSnapshotCompletion();
        receiver.completeSnapshot();
        snapshotContext.offset.postSnapshotCompletion();
    }

    /**
     * Scans one table with the select statement returned by
     * {@link #getSnapshotSelect(SnapshotContext, TableId)} and dispatches a snapshot event for
     * every row. Progress is logged and reported to the listener whenever the scan log timer
     * expires. The statement and result set are always closed; a failure while closing is
     * attached as a suppressed exception instead of replacing the original one.
     *
     * @param changeEventSourceContext context polled to detect that the connector is stopping
     * @param snapshotContext the context of the running snapshot
     * @param snapshotReceiver receiver the row events are dispatched to
     * @param table the table to export
     * @throws InterruptedException if the connector stops running during the scan
     * @throws ConnectException if reading the table fails with an {@link SQLException}
     */
    private void createDataEventsForTable(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException {
        long exportStart = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data from table '{}'", table.id());
        String selectStatement = getSnapshotSelect(snapshotContext, table.id());
        LOGGER.info("\t For table '{}' using select statement: '{}'", table.id(), selectStatement);

        try (Statement statement = readTableStatement();
                ResultSet rs = statement.executeQuery(selectStatement)) {
            Column[] columns = getColumnsForResultSet(table, rs);
            int numColumns = table.columns().size();
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();

            while (rs.next()) {
                if (!changeEventSourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting table " + table.id());
                }
                rows++;
                Object[] row = new Object[numColumns];
                for (int i = 0; i < numColumns; i++) {
                    // JDBC column indexes are 1-based.
                    row[i] = getColumnValue(rs, i + 1, columns[i]);
                }
                if (logTimer.expired()) {
                    LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
                    this.snapshotProgressListener.rowsScanned(table.id(), rows);
                    // Restart the timer so the next progress line appears one interval from now.
                    logTimer = getTableScanLogTimer();
                }
                this.dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(snapshotContext, row), snapshotReceiver);
            }

            LOGGER.info("\t Finished exporting {} records for table '{}'; total duration '{}'", rows, table.id(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
            this.snapshotProgressListener.tableSnapshotCompleted(table.id(), rows);
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }

    /**
     * Creates a fresh timer that expires after {@code LOG_INTERVAL}, measured with this
     * source's clock.
     *
     * @return a newly started timer for throttling table scan progress logging
     */
    private Threads.Timer getTableScanLogTimer() {
        final Threads.Timer progressTimer = Threads.timer(this.clock, LOG_INTERVAL);
        return progressTimer;
    }

    /**
     * Creates the emitter for one scanned row; the result is handed to the dispatcher's
     * {@code dispatchSnapshotEvent} by {@code createDataEventsForTable}.
     *
     * @param snapshotContext the context of the running snapshot
     * @param objArr the row's column values, one array slot per table column, in result set order
     * @return the emitter producing the change record for this row
     */
    protected abstract ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] objArr);

    /**
     * Returns the SQL statement used to read all rows of the given table; it is executed
     * verbatim via {@code Statement.executeQuery} by {@code createDataEventsForTable}.
     *
     * @param snapshotContext the context of the running snapshot
     * @param tableId the id of the table to read
     * @return the select statement to execute
     */
    protected abstract String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId);

    /**
     * Resolves, for each column of the result set, the table column with the same name.
     *
     * @param table the table whose columns are looked up by name
     * @param resultSet the result set whose metadata supplies the column names
     * @return one entry per result set column, in result set order
     * @throws SQLException if the result set metadata cannot be read
     */
    private Column[] getColumnsForResultSet(Table table, ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metadata = resultSet.getMetaData();
        final int columnCount = metadata.getColumnCount();
        final Column[] columns = new Column[columnCount];
        // JDBC numbers columns from 1, the array from 0.
        for (int jdbcIndex = 1; jdbcIndex <= columnCount; jdbcIndex++) {
            final String columnName = metadata.getColumnName(jdbcIndex);
            columns[jdbcIndex - 1] = table.columnWithName(columnName);
        }
        return columns;
    }

    /**
     * Reads the value at the given 1-based index of the current row via
     * {@link ResultSet#getObject(int)}. The column definition is currently not consulted.
     *
     * @param rs the result set positioned on the row to read
     * @param columnIndex the 1-based JDBC column index
     * @param column the matching table column (unused)
     * @return the column value as returned by the driver
     * @throws SQLException if the value cannot be read
     */
    private Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
        final Object value = rs.getObject(columnIndex);
        return value;
    }

    /**
     * Creates the statement used for scanning a table, configured with the connector's
     * snapshot fetch size. The caller owns the returned statement and must close it.
     *
     * @return a new statement with the snapshot fetch size applied
     * @throws SQLException if the statement cannot be created or configured
     */
    private Statement readTableStatement() throws SQLException {
        int snapshotFetchSize = this.connectorConfig.getSnapshotFetchSize();
        Statement statement = this.jdbcConnection.connection().createStatement();
        try {
            statement.setFetchSize(snapshotFetchSize);
        } catch (SQLException | RuntimeException e) {
            // The caller never receives the statement in this case, so close it here to avoid a leak.
            try {
                statement.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return statement;
    }

    /**
     * Finalization hook invoked by {@code execute} after the snapshot transaction has been
     * rolled back, both when the snapshot succeeded and when it failed.
     *
     * @param snapshotContext the context of the snapshot that just ended
     */
    protected abstract void complete(SnapshotContext snapshotContext);

    /**
     * Rolls back the current transaction on the given connection; does nothing when no
     * connection was obtained.
     *
     * @param connection the snapshot connection, or {@code null}
     * @throws RuntimeException wrapping the {@link SQLException} if the rollback fails
     */
    private void rollbackTransaction(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.rollback();
        } catch (SQLException rollbackFailure) {
            throw new RuntimeException(rollbackFailure);
        }
    }

    /**
     * Returns the clock supplied at construction time.
     *
     * <p>NOTE(review): the decompiler reported that this accessor was originally
     * {@code protected}; it is left {@code public} here to match the decompiled output.
     *
     * @return the clock used by this snapshot source
     */
    public Clock getClock() {
        return this.clock;
    }
}
