/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlSnapshotChangeEventSourceMetrics;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot change event source for the MySQL connector, built on top of
 * {@link RelationalSnapshotChangeEventSource}. It lists tables, acquires and releases
 * global or table-level read locks, reads the binlog position and collects the schema
 * change events that are dispatched during the snapshot.
 */
public class MySqlSnapshotChangeEventSource
extends RelationalSnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);
    private final MySqlConnectorConfig connectorConfig;
    private final MySqlConnection connection;
    // Clock time (ms) at which the global read lock was acquired; -1 means "not held".
    private long globalLockAcquiredAt = -1L;
    // Clock time (ms) at which the table read locks were acquired; -1 means "not held".
    private long tableLockAcquiredAt = -1L;
    // Table filters taken from the connector configuration in the constructor.
    private final RelationalTableFilters filters;
    private final MySqlSnapshotChangeEventSourceMetrics metrics;
    private final MySqlDatabaseSchema databaseSchema;
    // Schema change events accumulated by addSchemaEvent() and dispatched later.
    private final List<SchemaChangeEvent> schemaEvents = new ArrayList<SchemaChangeEvent>();
    // Tables whose schema is captured only after the table locks have been released.
    private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
    // Receives, in postSnapshot(), the function that rewrites a record to mark the end of the snapshot.
    private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;

    /**
     * Creates the snapshot source and stores the collaborators it needs later on.
     *
     * @param connectorConfig connector configuration; also the source of the table filters
     * @param connection connection used for all snapshot queries and lock statements
     * @param schema database schema that parses DDL and holds table definitions
     * @param dispatcher dispatcher handed to the superclass
     * @param clock clock handed to the superclass
     * @param metrics snapshot metrics, notified when locks are acquired and released
     * @param lastEventProcessor consumer that receives the record-rewriting function in {@code postSnapshot()}
     */
    public MySqlSnapshotChangeEventSource(
            MySqlConnectorConfig connectorConfig,
            MySqlConnection connection,
            MySqlDatabaseSchema schema,
            EventDispatcher<MySqlPartition, TableId> dispatcher,
            Clock clock,
            MySqlSnapshotChangeEventSourceMetrics metrics,
            BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor) {
        super(connectorConfig, connection, schema, dispatcher, clock, metrics);
        this.lastEventProcessor = lastEventProcessor;
        this.databaseSchema = schema;
        this.metrics = metrics;
        this.connection = connection;
        this.connectorConfig = connectorConfig;
        this.filters = connectorConfig.getTableFilters();
    }

    /**
     * Decides whether schema and/or data have to be snapshotted.
     * <p>
     * When a previous offset exists and its snapshot is no longer running, data is never
     * snapshotted and the schema is snapshotted only if the database schema reports that
     * storage initialization was executed. Otherwise both flags come from the configured
     * snapshot mode.
     *
     * @param partition the partition being snapshotted (not used here)
     * @param previousOffset offset from an earlier run, or {@code null}
     * @return the task describing what to snapshot
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) {
        final boolean snapshotCompleted = previousOffset != null && !previousOffset.isSnapshotRunning();
        if (snapshotCompleted) {
            LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
            final boolean schemaNeeded = this.databaseSchema.isStorageInitializationExecuted();
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(schemaNeeded, false);
        }

        LOGGER.info("No previous offset has been found");
        final boolean includeData = this.connectorConfig.getSnapshotMode().includeData();
        LOGGER.info(includeData
                ? "According to the connector configuration both schema and data will be snapshotted"
                : "According to the connector configuration only schema will be snapshotted");
        final boolean includeSchema = this.connectorConfig.getSnapshotMode().includeSchema();
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(includeSchema, includeData);
    }

    /**
     * Builds the MySQL-specific snapshot context for the given partition.
     *
     * @param partition the partition to snapshot
     * @return a fresh {@code MySqlSnapshotContext}
     * @throws Exception if the context cannot be created
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<MySqlPartition, MySqlOffsetContext> prepare(MySqlPartition partition) throws Exception {
        final MySqlSnapshotContext context = new MySqlSnapshotContext(partition);
        return context;
    }

    /**
     * Hook invoked by the superclass; intentionally a no-op in this implementation.
     */
    @Override
    protected void connectionCreated(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) throws Exception {
    }

    /**
     * Collects the ids of all base tables in every database the connection can read.
     * <p>
     * Databases are listed with {@code SHOW DATABASES}; for each one the base tables are
     * listed with {@code SHOW FULL TABLES}. A database whose tables cannot be listed is
     * logged and skipped instead of failing the snapshot.
     *
     * @param ctx the snapshot context (not used here)
     * @return ids of all tables found; catalog is the database name and schema is {@code null}
     * @throws Exception if listing the databases fails
     */
    @Override
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx) throws Exception {
        LOGGER.info("Read list of available databases");
        final List<String> databaseNames = new ArrayList<>();
        this.connection.query("SHOW DATABASES", rs -> {
            while (rs.next()) {
                databaseNames.add(rs.getString(1));
            }
        });
        LOGGER.info("\t list of available databases is: {}", databaseNames);

        LOGGER.info("Read list of available tables in each database");
        final Set<TableId> tableIds = new HashSet<>();
        final Set<String> readableDatabaseNames = new HashSet<>();
        for (String dbName : databaseNames) {
            try {
                // The statement is built by concatenation; the name comes from SHOW DATABASES above.
                this.connection.query("SHOW FULL TABLES IN " + this.quote(dbName) + " where Table_Type = 'BASE TABLE'", rs -> {
                    while (rs.next()) {
                        tableIds.add(new TableId(dbName, null, rs.getString(1)));
                    }
                });
                readableDatabaseNames.add(dbName);
            }
            catch (SQLException e) {
                // Best effort: an unreadable database is skipped rather than aborting the snapshot.
                LOGGER.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
            }
        }

        // Only used for the log line below; the returned set is not filtered here.
        final Set<String> includedDatabaseNames = readableDatabaseNames.stream()
                .filter(this.filters.databaseFilter())
                .collect(Collectors.toSet());
        LOGGER.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);
        return tableIds;
    }

    /**
     * Prepares the session for a consistent snapshot and, when configured, tries to take
     * the global read lock.
     * <p>
     * The connection is switched to REPEATABLE READ and the session lock timeouts are set
     * from the configured snapshot lock timeout. If the locking mode uses locking and the
     * global lock is enabled, the global lock statement is executed; a failure to obtain
     * it is logged and tolerated so that table-level locks can be used later.
     *
     * @param sourceContext the running change event source context (not used here)
     * @param snapshotContext the snapshot context (not used here)
     * @throws SQLException if setting the isolation level or {@code lock_wait_timeout} fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) throws SQLException, InterruptedException {
        this.connection.connection().setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ);

        final long lockTimeoutSeconds = this.connectorConfig.snapshotLockTimeout().getSeconds();
        this.connection.executeWithoutCommitting("SET SESSION lock_wait_timeout=" + lockTimeoutSeconds);
        try {
            this.connection.executeWithoutCommitting("SET SESSION innodb_lock_wait_timeout=" + lockTimeoutSeconds);
        }
        catch (SQLException e) {
            // Not fatal: the snapshot proceeds without the InnoDB-specific timeout.
            LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
        }

        if (this.connectorConfig.getSnapshotLockingMode().usesLocking() && this.connectorConfig.useGlobalLock()) {
            try {
                this.globalLock();
                this.metrics.globalLockAcquired();
            }
            catch (SQLException e) {
                // Deliberately tolerated: the snapshot falls back to table read locks.
                LOGGER.info("Unable to flush and acquire global read lock, will use table read locks after reading table names");
                assert !this.isGloballyLocked();
            }
            if (this.connectorConfig.getSnapshotLockingMode().flushResetsIsolationLevel()) {
                // Restore the isolation level when the locking mode reports that the flush reset it.
                this.connection.executeWithoutCommitting("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            }
        }
    }

    /**
     * Releases locks once the schema has been read, but only in minimal locking mode.
     * <p>
     * A held global lock is released. Table locks are kept (with a warning) because they
     * must stay in place until all tables have been read.
     *
     * @param snapshotContext the snapshot context (not used here)
     * @throws SQLException if releasing the global lock fails
     */
    @Override
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) throws SQLException {
        final boolean minimalLocking = this.connectorConfig.getSnapshotLockingMode().usesMinimalLocking();
        if (!minimalLocking) {
            return;
        }
        if (this.isGloballyLocked()) {
            this.globalUnlock();
        }
        if (this.isTablesLocked()) {
            LOGGER.warn("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.");
        }
    }

    /**
     * Releases any lock still held after the data has been read and, if table locks were
     * in use, captures the schema of the tables whose capture was delayed.
     * <p>
     * For delayed tables the collected schema events are rebuilt and dispatched, skipping
     * events of non-captured databases when only captured tables are stored. Finally the
     * snapshot context's tables are refreshed from the database schema.
     *
     * @param snapshotContext the snapshot context whose offset, partition and tables are updated
     * @throws Exception if unlocking, reading table definitions or dispatching fails
     */
    @Override
    protected void releaseDataSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) throws Exception {
        if (this.isGloballyLocked()) {
            this.globalUnlock();
        }
        if (!this.isTablesLocked()) {
            return;
        }

        this.tableUnlock();
        if (this.delayedSchemaSnapshotTables.isEmpty()) {
            return;
        }

        // Second phase: start from a clean list and read only the delayed tables.
        this.schemaEvents.clear();
        this.createSchemaEventsForTables(snapshotContext, this.delayedSchemaSnapshotTables, false);

        for (SchemaChangeEvent schemaEvent : this.schemaEvents) {
            final boolean nonCapturedDatabase = this.databaseSchema.storeOnlyCapturedTables()
                    && schemaEvent.getDatabase() != null
                    && schemaEvent.getDatabase().length() > 0
                    && !this.connectorConfig.getTableFilters().databaseFilter().test(schemaEvent.getDatabase());
            if (nonCapturedDatabase) {
                LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", schemaEvent);
            }
            else {
                LOGGER.debug("Processing schema event {}", schemaEvent);
                final TableId tableId = schemaEvent.getTables().isEmpty() ? null : schemaEvent.getTables().iterator().next().id();
                snapshotContext.offset.event(tableId, this.getClock().currentTime());
                this.dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, receiver -> receiver.schemaChangeEvent(schemaEvent));
            }
        }

        // Make the freshly parsed table definitions visible to the snapshot context.
        this.databaseSchema.tableIds().forEach(id -> snapshotContext.tables.overwriteTable(this.databaseSchema.tableFor(id)));
    }

    /**
     * Establishes the offset the snapshot starts from.
     * <p>
     * Nothing is done while the locking mode requires a lock but neither the global nor
     * the table lock is held; {@code readTableStructure} calls this method again after it
     * has taken table locks. With a previous offset, that offset is reused. Otherwise a
     * new offset is created from the binlog file name, position and (when the server
     * returns a fifth column) GTID set reported by {@code SHOW MASTER STATUS}.
     *
     * @param ctx the snapshot context whose {@code offset} is set
     * @param previousOffset offset from an earlier run, or {@code null}
     * @throws Exception if the query fails
     * @throws DebeziumException if {@code SHOW MASTER STATUS} returns no row
     */
    @Override
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx, MySqlOffsetContext previousOffset) throws Exception {
        if (!this.isGloballyLocked() && !this.isTablesLocked() && this.connectorConfig.getSnapshotLockingMode().usesLocking()) {
            return;
        }
        if (previousOffset != null) {
            ctx.offset = previousOffset;
            this.tryStartingSnapshot(ctx);
            return;
        }

        final MySqlOffsetContext offsetContext = MySqlOffsetContext.initial(this.connectorConfig);
        ctx.offset = offsetContext;
        LOGGER.info("Read binlog position of MySQL primary server");
        final String showMasterStmt = "SHOW MASTER STATUS";
        this.connection.query(showMasterStmt, rs -> {
            if (!rs.next()) {
                throw new DebeziumException("Cannot read the binlog filename and position via '" + showMasterStmt
                        + "'. Make sure your server is correctly configured");
            }
            final String binlogFilename = rs.getString(1);
            final long binlogPosition = rs.getLong(2);
            offsetContext.setBinlogStartPoint(binlogFilename, binlogPosition);
            if (rs.getMetaData().getColumnCount() > 4) {
                // The GTID set is read from the fifth column when it is present.
                final String gtidSet = rs.getString(5);
                offsetContext.setCompletedGtidSet(gtidSet);
                LOGGER.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, gtidSet);
            }
            else {
                LOGGER.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
            }
        });
        this.tryStartingSnapshot(ctx);
    }

    /**
     * Parses a DDL statement through the database schema and appends the resulting schema
     * change events to {@code schemaEvents}.
     *
     * @param snapshotContext supplies the partition and offset passed to the parser
     * @param database name of the database the DDL applies to
     * @param ddl the DDL statement to parse
     */
    private void addSchemaEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, String database, String ddl) {
        final Collection<? extends SchemaChangeEvent> parsedEvents = this.databaseSchema.parseSnapshotDdl(
                snapshotContext.partition,
                ddl,
                database,
                snapshotContext.offset,
                this.clock.currentTimeAsInstant());
        this.schemaEvents.addAll(parsedEvents);
    }

    /**
     * Collects the schema change events that describe the structure of the tables to capture.
     * <p>
     * If a two-phase schema snapshot is needed, the captured tables are locked first, the
     * snapshot offset is determined, and the remaining schema-only tables are recorded for
     * delayed capture. Then, for the selected tables, events are generated for the charset
     * variables statement, {@code DROP TABLE}, and per database
     * {@code DROP DATABASE} / {@code CREATE DATABASE} / {@code USE} followed by the
     * {@code SHOW CREATE TABLE} output of each table.
     *
     * @param sourceContext checked between steps to stop when the source is no longer running
     * @param snapshotContext the snapshot context providing the captured table sets
     * @param offsetContext previous offset passed on to {@code determineSnapshotOffset}
     * @throws InterruptedException if the source stops running while events are produced
     * @throws Exception if locking or any query fails
     */
    @Override
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, MySqlOffsetContext offsetContext) throws Exception {
        if (this.twoPhaseSchemaSnapshot()) {
            // No global lock: lock the captured tables now, postpone the schema of the others.
            this.tableLock(snapshotContext);
            this.determineSnapshotOffset(snapshotContext, offsetContext);
            LOGGER.info("Table level locking is in place, the schema will be capture in two phases, now capturing: {}", snapshotContext.capturedTables);
            this.delayedSchemaSnapshotTables = Collect.minus(snapshotContext.capturedSchemaTables, snapshotContext.capturedTables);
            LOGGER.info("Tables for delayed schema capture: {}", this.delayedSchemaSnapshotTables);
        }

        final Set<TableId> capturedSchemaTables;
        if (this.databaseSchema.storeOnlyCapturedTables()) {
            capturedSchemaTables = snapshotContext.capturedTables;
            LOGGER.info("Only captured tables schema should be captured, capturing: {}", capturedSchemaTables);
        }
        else {
            capturedSchemaTables = snapshotContext.capturedSchemaTables;
            LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
        }

        // Group tables by database, keeping the order in which databases are first seen.
        final Map<String, List<TableId>> tablesToRead = capturedSchemaTables.stream()
                .collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList()));
        final Set<String> databases = tablesToRead.keySet();

        this.addSchemaEvent(snapshotContext, "", this.connection.setStatementFor(this.connection.readMySqlCharsetSystemVariables()));

        for (TableId tableId : capturedSchemaTables) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while emitting initial DROP TABLE events");
            }
            this.addSchemaEvent(snapshotContext, tableId.catalog(), "DROP TABLE IF EXISTS " + this.quote(tableId));
        }

        final Map<String, MySqlConnection.DatabaseLocales> databaseCharsets = this.connection.readDatabaseCollations();
        for (String database : databases) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + databases);
            }
            LOGGER.info("Reading structure of database '{}'", database);
            this.addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + this.quote(database));

            final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + this.quote(database));
            final MySqlConnection.DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
            if (defaultDatabaseLocales != null) {
                defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
            }
            this.addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
            this.addSchemaEvent(snapshotContext, database, "USE " + this.quote(database));
            this.createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true);
        }
    }

    /**
     * Adds a schema event for each given table from the output of {@code SHOW CREATE TABLE}.
     *
     * @param snapshotContext the snapshot context passed on to {@code addSchemaEvent}
     * @param tablesToRead tables whose definition should be read
     * @param firstPhase when {@code true}, tables scheduled for delayed schema capture are skipped
     * @throws SQLException if a query fails
     */
    void createSchemaEventsForTables(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, Collection<TableId> tablesToRead, boolean firstPhase) throws SQLException {
        for (TableId tableId : tablesToRead) {
            final boolean deferred = firstPhase && this.delayedSchemaSnapshotTables.contains(tableId);
            if (!deferred) {
                final String showCreateTable = "SHOW CREATE TABLE " + this.quote(tableId);
                this.connection.query(showCreateTable, rs -> {
                    if (rs.next()) {
                        // The second column is the one passed on as the DDL to parse.
                        this.addSchemaEvent(snapshotContext, tableId.catalog(), rs.getString(2));
                    }
                });
            }
        }
    }

    /**
     * @return {@code true} when the locking mode uses locking but the global lock is not held
     */
    private boolean twoPhaseSchemaSnapshot() {
        if (!this.connectorConfig.getSnapshotLockingMode().usesLocking()) {
            return false;
        }
        return !this.isGloballyLocked();
    }

    /**
     * Builds the snapshot "create table" schema change event for a table.
     *
     * @param snapshotContext supplies the partition, offset and catalog name
     * @param table the table the event describes
     * @return the schema change event
     * @throws SQLException declared by the overridden method
     */
    @Override
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, Table table) throws SQLException {
        final SchemaChangeEvent createEvent = SchemaChangeEvent.ofSnapshotCreate(
                snapshotContext.partition,
                snapshotContext.offset,
                snapshotContext.catalogName,
                table);
        return createEvent;
    }

    /**
     * Completion hook invoked by the superclass; intentionally a no-op in this implementation.
     */
    @Override
    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) {
    }

    /**
     * Builds the {@code SELECT} statement used to read all rows of a table.
     *
     * @param snapshotContext the snapshot context (not used here)
     * @param tableId table to read; its catalog and table name are wrapped in backticks
     * @param columns column expressions, joined with {@code ", "}
     * @return the statement, always present
     */
    @Override
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, TableId tableId, List<String> columns) {
        final String columnList = String.join(", ", columns);
        final String select = String.format("SELECT %s FROM `%s`.`%s`", columnList, tableId.catalog(), tableId.table());
        return Optional.of(select);
    }

    /**
     * @return {@code true} while a global lock acquisition time is recorded (anything other than -1)
     */
    private boolean isGloballyLocked() {
        final boolean notHeld = this.globalLockAcquiredAt == -1L;
        return !notHeld;
    }

    /**
     * @return {@code true} while a table lock acquisition time is recorded (anything other than -1)
     */
    private boolean isTablesLocked() {
        final boolean notHeld = this.tableLockAcquiredAt == -1L;
        return !notHeld;
    }

    /**
     * Executes the lock statement of the configured locking mode and records when the
     * global lock was acquired.
     *
     * @throws SQLException if the lock statement fails; the acquisition time is then left untouched
     */
    private void globalLock() throws SQLException {
        LOGGER.info("Flush and obtain global read lock to prevent writes to database");
        final String lockStatement = this.connectorConfig.getSnapshotLockingMode().getLockStatement();
        this.connection.executeWithoutCommitting(lockStatement);
        this.globalLockAcquiredAt = this.clock.currentTimeInMillis();
    }

    /**
     * Releases the global read lock with {@code UNLOCK TABLES}, notifies the metrics,
     * logs how long the lock was held and marks the lock as not held.
     *
     * @throws SQLException if the unlock statement fails
     */
    private void globalUnlock() throws SQLException {
        LOGGER.info("Releasing global read lock to enable MySQL writes");
        this.connection.executeWithoutCommitting("UNLOCK TABLES");
        final long releasedAt = this.clock.currentTimeInMillis();
        this.metrics.globalLockReleased();
        final long heldMillis = releasedAt - this.globalLockAcquiredAt;
        LOGGER.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(heldMillis));
        this.globalLockAcquiredAt = -1L;
    }

    /**
     * Takes read locks on the captured tables and records when they were acquired.
     * <p>
     * Requires the {@code LOCK TABLES} privilege. When there are no captured tables no
     * statement is executed, but the acquisition time and metrics are still updated.
     *
     * @param snapshotContext supplies the set of captured tables to lock
     * @throws DebeziumException if the user lacks the {@code LOCK TABLES} privilege
     * @throws SQLException if the privilege check or the lock statement fails
     */
    private void tableLock(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) throws SQLException {
        final boolean mayLockTables = this.connection.userHasPrivileges("LOCK TABLES");
        if (!mayLockTables) {
            throw new DebeziumException("User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.");
        }

        LOGGER.info("Flush and obtain read lock for {} tables (preventing writes)", snapshotContext.capturedTables);
        if (!snapshotContext.capturedTables.isEmpty()) {
            final String tableList = snapshotContext.capturedTables.stream()
                    .map(tid -> this.quote(tid))
                    .collect(Collectors.joining(","));
            final String flushStatement = "FLUSH TABLES " + tableList + " WITH READ LOCK";
            this.connection.executeWithoutCommitting(flushStatement);
        }

        this.tableLockAcquiredAt = this.clock.currentTimeInMillis();
        // The table lock is reported through the same metric call as the global lock.
        this.metrics.globalLockAcquired();
    }

    /**
     * Releases the table read locks with {@code UNLOCK TABLES}, notifies the metrics,
     * logs how long the locks were held and marks them as not held.
     *
     * @throws SQLException if the unlock statement fails
     */
    private void tableUnlock() throws SQLException {
        LOGGER.info("Releasing table read lock to enable MySQL writes");
        this.connection.executeWithoutCommitting("UNLOCK TABLES");
        final long releasedAt = this.clock.currentTimeInMillis();
        this.metrics.globalLockReleased();
        final long heldMillis = releasedAt - this.tableLockAcquiredAt;
        LOGGER.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(heldMillis));
        this.tableLockAcquiredAt = -1L;
    }

    /**
     * Wraps a database or table name in backticks. Backticks inside the name are not escaped.
     *
     * @param dbOrTableName the name to wrap
     * @return the name surrounded by backticks
     */
    private String quote(String dbOrTableName) {
        final StringBuilder quoted = new StringBuilder();
        quoted.append('`').append(dbOrTableName).append('`');
        return quoted.toString();
    }

    /**
     * Builds the backtick-quoted, dot-separated name of a table from its catalog and table name.
     *
     * @param id the table id
     * @return the quoted catalog and table name joined by a dot
     */
    private String quote(TableId id) {
        final String quotedCatalog = this.quote(id.catalog());
        final String quotedTable = this.quote(id.table());
        return quotedCatalog + "." + quotedTable;
    }

    /**
     * Returns the estimated size of the table as reported by the connection.
     *
     * @param tableId the table to look up
     * @return the estimate returned by {@code getEstimatedTableSize}
     */
    @Override
    protected OptionalLong rowCountForTable(TableId tableId) {
        final OptionalLong estimatedRows = this.connection.getEstimatedTableSize(tableId);
        return estimatedRows;
    }

    /**
     * Chooses the statement used to read a table's rows.
     * <p>
     * The large-result-set statement is used only when a row count is known, the
     * configured large-table threshold is non-zero, and the row count exceeds it; in all
     * other cases the superclass implementation is used.
     *
     * @param rowCount row count of the table, if known
     * @return the statement to read the table with
     * @throws SQLException if the statement cannot be created
     */
    @Override
    protected Statement readTableStatement(OptionalLong rowCount) throws SQLException {
        final long largeTableThreshold = this.connectorConfig.rowCountForLargeTable();
        final boolean largeTable = rowCount.isPresent()
                && largeTableThreshold != 0L
                && rowCount.getAsLong() > largeTableThreshold;
        if (largeTable) {
            return this.createStatementWithLargeResultSet();
        }
        return super.readTableStatement(rowCount);
    }

    /**
     * Creates a forward-only, read-only statement whose fetch size is the configured
     * snapshot fetch size.
     *
     * @return the new statement; the caller is responsible for closing it
     * @throws SQLException if the statement cannot be created or configured
     */
    private Statement createStatementWithLargeResultSet() throws SQLException {
        final int fetchSize = this.connectorConfig.getSnapshotFetchSize();
        final Statement stmt = this.connection.connection().createStatement(
                java.sql.ResultSet.TYPE_FORWARD_ONLY,
                java.sql.ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(fetchSize);
        return stmt;
    }

    /**
     * Dispatches the schema change events collected so far and refreshes the snapshot
     * context's tables from the database schema.
     * <p>
     * Events the database schema asks to skip are not dispatched. Processing stops with an
     * {@link InterruptedException} as soon as the source is no longer running.
     *
     * @param sourceContext checked before each event to detect a stop request
     * @param snapshotContext the snapshot context whose offset, partition and tables are used
     * @param snapshottingTask the snapshotting task (not used here)
     * @throws InterruptedException if the source stops running while events are processed
     * @throws Exception if starting the snapshot or dispatching fails
     */
    @Override
    protected void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        this.tryStartingSnapshot(snapshotContext);

        for (SchemaChangeEvent schemaEvent : this.schemaEvents) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while processing event " + schemaEvent);
            }
            if (!this.databaseSchema.skipSchemaChangeEvent(schemaEvent)) {
                LOGGER.debug("Processing schema event {}", schemaEvent);
                final TableId tableId = schemaEvent.getTables().isEmpty() ? null : schemaEvent.getTables().iterator().next().id();
                snapshotContext.offset.event(tableId, this.getClock().currentTime());
                this.dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, receiver -> receiver.schemaChangeEvent(schemaEvent));
            }
        }

        // Make the parsed table definitions visible to the snapshot context.
        this.databaseSchema.tableIds().forEach(id -> snapshotContext.tables.overwriteTable(this.databaseSchema.tableFor(id)));
    }

    /**
     * Hands the last-event processor a function that rewrites a record to mark the end of
     * the snapshot, then delegates to the superclass.
     * <p>
     * The function removes the {@code "snapshot"} key from the record's source offset and
     * sets the {@code "snapshot"} field of the record's {@code "source"} struct to the
     * lower-cased name of {@code SnapshotRecord.LAST}.
     *
     * @throws InterruptedException if the processor or the superclass is interrupted
     */
    @Override
    protected void postSnapshot() throws InterruptedException {
        final Function<SourceRecord, SourceRecord> markAsLast = record -> {
            record.sourceOffset().remove("snapshot");
            final Struct source = ((Struct) record.value()).getStruct("source");
            source.put("snapshot", SnapshotRecord.LAST.toString().toLowerCase());
            return record;
        };
        this.lastEventProcessor.accept(markAsLast);
        super.postSnapshot();
    }

    /**
     * Snapshot context for MySQL; adds no state of its own and passes an empty string as
     * the second constructor argument of the relational snapshot context.
     */
    private static class MySqlSnapshotContext
    extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> {
        /**
         * @param partition the partition being snapshotted
         * @throws SQLException declared by the superclass constructor
         */
        MySqlSnapshotContext(MySqlPartition partition) throws SQLException {
            super(partition, "");
        }
    }
}

