package io.debezium.connector.mysql;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.ChainedReader;
import io.debezium.connector.mysql.Filters;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorTask.class */
public final class MySqlConnectorTask extends BaseSourceTask {
    // Logger named after the runtime class of this task instance.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Task context created (and started) by start(); read by poll(), stop() and completeReaders().
    private volatile MySqlTaskContext taskContext;
    // JDBC context obtained from taskContext in start(); used for the server-side binlog/GTID queries.
    private volatile MySqlJdbcContext connectionContext;
    // Chain of readers built at the end of start(); poll() returns null while this is still unset.
    private volatile ChainedReader readers;

    /* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorTask$ServerIdGenerator.class */
    /**
     * Hands out server ids derived from a configured base id and a fixed offset.
     * <p>
     * Each call to {@link #getNextServerId()} advances an internal counter, so successive
     * calls return {@code configuredServerId + offset}, {@code configuredServerId + 2 * offset},
     * and so on. The counter is a plain {@code int} field updated without any synchronization.
     */
    public class ServerIdGenerator {
        private final long configuredServerId;
        private final long offset;
        private int counter = 0;

        private ServerIdGenerator(long configuredServerId, long offset) {
            this.configuredServerId = configuredServerId;
            this.offset = offset;
        }

        /**
         * Advances the counter and computes the next derived server id.
         *
         * @return the configured server id plus the (already incremented) counter times the offset
         */
        public long getNextServerId() {
            final long multiplier = ++this.counter;
            return this.configuredServerId + (multiplier * this.offset);
        }

        /**
         * @return the base server id this generator was created with
         */
        public long getConfiguredServerId() {
            return this.configuredServerId;
        }
    }

    /**
     * Reports the version of this task.
     *
     * @return whatever {@code Module.version()} returns
     */
    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        final String moduleVersion = Module.version();
        return moduleVersion;
    }

    /**
     * Starts the task: decides whether to begin with a snapshot or with the binlog, builds the
     * corresponding chain of readers, and starts that chain.
     * <p>
     * The decision is made as follows:
     * <ul>
     * <li>Stored offsets exist and the database history exists: the history is loaded. If the
     * previous run stopped during a snapshot, a new snapshot is started (or the start fails when
     * snapshots are never allowed). Otherwise the binlog is used, unless the required binlog
     * position is no longer available, in which case a snapshot is taken if the configuration
     * allows one when needed.</li>
     * <li>Stored offsets exist but the database history is missing: only allowed in
     * {@code SCHEMA_ONLY_RECOVERY} snapshot mode, and only when the binlog position is still
     * available.</li>
     * <li>No stored offsets: a snapshot is taken unless snapshots are never allowed, in which
     * case reading starts from the beginning of the binlog.</li>
     * </ul>
     * If anything fails, {@link #stop()} is invoked before the failure is rethrown so that
     * whatever was already started gets cleaned up.
     *
     * @param configuration the connector configuration
     * @throws ConnectException if the task cannot be started; any other failure is wrapped in a
     *             {@code ConnectException}
     */
    @Override // io.debezium.connector.common.BaseSourceTask
    public synchronized void start(Configuration configuration) {
        final String serverName = configuration.getString(MySqlConnectorConfig.SERVER_NAME);
        final LoggingContext.PreviousContext previousLoggingContext = LoggingContext.forConnector(Module.contextName(), serverName, ConnectMetricsRegistry.TASK_TAG_NAME);
        try {
            boolean startWithSnapshot = false;
            final Map<String, ?> offsets = getRestartOffset(this.context.offsetStorageReader().offset(Collect.hashMapOf(SourceInfo.SERVER_PARTITION_KEY, serverName)));
            final SourceInfo source;
            if (offsets != null) {
                // Offsets that carry filter information are resumed with the filters recorded in them.
                final Filters filters = SourceInfo.offsetsHaveFilterInfo(offsets) ? getOldFilters(offsets, configuration) : getAllFilters(configuration);
                this.taskContext = createAndStartTaskContext(configuration, filters);
                this.connectionContext = this.taskContext.getConnectionContext();
                source = this.taskContext.source();
                source.setOffset(offsets);
                this.logger.info("Found existing offset: {}", offsets);
                if (this.taskContext.historyExists()) {
                    this.taskContext.loadHistory(source);
                    if (source.isSnapshotInEffect()) {
                        // The previous run stopped in the middle of a snapshot.
                        if (this.taskContext.isSnapshotNeverAllowed()) {
                            throw new ConnectException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
                        }
                        startWithSnapshot = true;
                        this.logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
                    } else if (!isBinlogAvailable()) {
                        // The recorded binlog position is gone; fall back to a snapshot if permitted.
                        if (!this.taskContext.isSnapshotAllowedWhenNeeded()) {
                            throw new ConnectException("The connector is trying to read binlog starting at " + source + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
                        }
                        startWithSnapshot = true;
                    }
                } else {
                    if (!this.taskContext.isSchemaOnlyRecoverySnapshot()) {
                        throw new ConnectException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to " + MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                    }
                    startWithSnapshot = true;
                    if (!isBinlogAvailable()) {
                        throw new ConnectException("The connector is trying to read binlog starting at " + source + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
                    }
                    this.logger.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.", MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                    this.taskContext.initializeHistoryStorage();
                }
            } else {
                // No recorded offsets at all.
                this.taskContext = createAndStartTaskContext(configuration, getAllFilters(configuration));
                this.taskContext.initializeHistoryStorage();
                this.connectionContext = this.taskContext.getConnectionContext();
                source = this.taskContext.source();
                if (this.taskContext.isSnapshotNeverAllowed()) {
                    this.logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
                    source.setBinlogStartPoint("", 0L);
                    this.taskContext.initializeHistory();
                    // Warn when the first available binlog file suggests that earlier files are gone.
                    final String earliestBinlogFilename = earliestBinlogFilename();
                    if (earliestBinlogFilename == null) {
                        this.logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
                    } else if (!earliestBinlogFilename.endsWith("00001")) {
                        this.logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
                    }
                } else {
                    startWithSnapshot = true;
                    this.logger.info("Found no existing offset, so preparing to perform a snapshot");
                }
            }

            // Not snapshotting, no GTIDs recorded so far, but the server has GTID mode enabled:
            // record an empty completed GTID set.
            if (!startWithSnapshot && source.gtidSet() == null && this.connectionContext.isGtidModeEnabled()) {
                source.setCompletedGtidSet("");
            }

            final boolean rowBinlogEnabled = isRowBinlogEnabled();
            final ChainedReader.Builder chainBuilder = new ChainedReader.Builder();
            if (startWithSnapshot) {
                final SnapshotReader snapshotReader = new SnapshotReader("snapshot", this.taskContext);
                snapshotReader.generateInsertEvents();
                if (!this.taskContext.getConnectorConfig().getSnapshotDelay().isZero()) {
                    // A configured snapshot delay is realized as a blocking reader ahead of the snapshot.
                    chainBuilder.addReader(new TimedBlockingReader("timed-blocker", this.taskContext.getConnectorConfig().getSnapshotDelay()));
                }
                chainBuilder.addReader(snapshotReader);
                if (this.taskContext.isInitialSnapshotOnly()) {
                    this.logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
                    chainBuilder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
                    chainBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
                } else {
                    if (!rowBinlogEnabled) {
                        throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is required for this connector to work properly. Change the MySQL configuration to use a row-level binlog and restart the connector.");
                    }
                    chainBuilder.addReader(new BinlogReader("binlog", this.taskContext, null));
                }
            } else {
                if (!source.hasFilterInfo()) {
                    source.maybeSetFilterDataFromConfig(configuration);
                }
                if (!rowBinlogEnabled) {
                    throw new ConnectException("The MySQL server does not appear to be using a full row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
                }
                if (!newTablesInConfig()) {
                    chainBuilder.addReader(new BinlogReader("binlog", this.taskContext, null));
                } else if (this.taskContext.getConnectorConfig().getSnapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
                    final ServerIdGenerator serverIdGenerator = new ServerIdGenerator(configuration.getLong(MySqlConnectorConfig.SERVER_ID), configuration.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET));
                    final ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(configuration, this.taskContext, getNewFilters(offsets, configuration), serverIdGenerator);
                    // A second task context, built with the full set of filters, backs the binlog
                    // reader that takes over once the parallel snapshot has been reconciled.
                    final MySqlTaskContext unifiedTaskContext = createAndStartTaskContext(configuration, getAllFilters(configuration));
                    unifiedTaskContext.source().completeSnapshot();
                    final BinlogReader unifiedBinlogReader = new BinlogReader("binlog", unifiedTaskContext, null, serverIdGenerator.getConfiguredServerId());
                    final ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);
                    chainBuilder.addReader(parallelSnapshotReader);
                    chainBuilder.addReader(reconcilingBinlogReader);
                    chainBuilder.addReader(unifiedBinlogReader);
                    unifiedBinlogReader.uponCompletion(unifiedTaskContext::shutdown);
                }
                // NOTE(review): when new tables are configured but the snapshot-new-tables mode is
                // not PARALLEL, no reader is added to the chain at all - confirm this is intended.
            }

            this.readers = chainBuilder.build();
            this.readers.uponCompletion(this::completeReaders);
            this.readers.initialize();
            this.readers.start();
        } catch (Throwable failure) {
            // Clean up whatever was started; a failure during cleanup is logged, never propagated.
            try {
                stop();
            } catch (Throwable cleanupFailure) {
                this.logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", cleanupFailure);
            }
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while starting the connector", failure);
            }
            if (failure instanceof ConnectException) {
                throw (ConnectException) failure;
            }
            throw new ConnectException(failure);
        } finally {
            previousLoggingContext.restore();
        }
    }

    /**
     * Extracts the restart offset from a stored offset map.
     * <p>
     * Every entry whose key starts with {@code SourceInfo.RESTART_PREFIX} is copied into a new
     * map under the key with that prefix removed. When no such entry exists, the stored map
     * itself is returned unchanged.
     *
     * @param map the stored offsets; may be {@code null}
     * @return the un-prefixed restart entries, or {@code map} (possibly {@code null}) when there
     *         are none
     */
    private Map<String, ?> getRestartOffset(Map<String, ?> map) {
        if (map == null) {
            return null;
        }
        final Map<String, Object> restartOffset = new HashMap<>();
        final int prefixLength = SourceInfo.RESTART_PREFIX.length();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            final String key = entry.getKey();
            if (key.startsWith(SourceInfo.RESTART_PREFIX)) {
                restartOffset.put(key.substring(prefixLength), entry.getValue());
            }
        }
        return restartOffset.isEmpty() ? map : restartOffset;
    }

    /**
     * Builds a new task context for the given configuration and filters and starts it.
     *
     * @param configuration the connector configuration
     * @param filters the filters handed to the new context
     * @return the context, after its {@code start()} method has been called
     */
    private static MySqlTaskContext createAndStartTaskContext(Configuration configuration, Filters filters) {
        final MySqlTaskContext startedContext = new MySqlTaskContext(configuration, filters);
        startedContext.start();
        return startedContext;
    }

    /**
     * Determines whether the current configuration selects databases or tables that were not
     * selected by the filters recorded in the source offsets.
     * <p>
     * Whitelists are compared as "configured versus recorded" (a configured entry missing from
     * the recorded whitelist counts as new); blacklists are compared the other way round (a
     * recorded entry missing from the configured blacklist counts as new).
     *
     * @return {@code true} if any of the four filter lists indicates newly included elements;
     *         {@code false} otherwise, including when the source carries no filter information
     */
    private boolean newTablesInConfig() {
        final SourceInfo source = this.taskContext.source();
        final Configuration config = this.taskContext.config();
        if (!source.hasFilterInfo()) {
            return false;
        }
        return hasExclusiveElements(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST), source.getDatabaseWhitelist())
                || hasExclusiveElements(config.getString(MySqlConnectorConfig.TABLE_WHITELIST), source.getTableWhitelist())
                || hasExclusiveElements(source.getDatabaseBlacklist(), config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
                || hasExclusiveElements(source.getTableBlacklist(), config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
    }

    /**
     * Tells whether the first comma-separated list contains an element that is absent from the
     * second one.
     * <p>
     * Elements are separated by a comma with optional surrounding whitespace. The separator used
     * to be written as {@code "/s*,/s*"}, which only matches a comma wrapped in literal slashes,
     * so lists were never actually split and reordered or differently spaced lists were reported
     * as different.
     *
     * @param candidates the list whose elements are looked up; {@code null} or empty yields
     *            {@code false}
     * @param reference the list to look the elements up in; {@code null} or empty yields
     *            {@code true} (provided {@code candidates} is non-empty)
     * @return {@code true} if at least one element of {@code candidates} is not in
     *         {@code reference}
     */
    private static boolean hasExclusiveElements(String candidates, String reference) {
        if (candidates == null || candidates.isEmpty()) {
            return false;
        }
        if (reference == null || reference.isEmpty()) {
            return true;
        }
        final String separator = "\\s*,\\s*";
        // Trim first so that leading/trailing whitespace does not end up inside an element.
        final Set<String> known = Stream.of(reference.trim().split(separator)).collect(Collectors.toSet());
        return Stream.of(candidates.trim().split(separator)).anyMatch(element -> !known.contains(element));
    }

    /**
     * Builds filters from the configuration with every table matched by the offset-recorded
     * filters excluded.
     *
     * @param map the stored offsets the previous filters are read from
     * @param configuration the connector configuration
     * @return the filters produced by the builder after {@code excludeAllTables}
     */
    private static Filters getNewFilters(Map<String, ?> map, Configuration configuration) {
        final Filters.Builder filtersBuilder = new Filters.Builder(configuration);
        final Filters previousFilters = getOldFilters(map, configuration);
        return filtersBuilder.excludeAllTables(previousFilters).build();
    }

    /**
     * Builds filters whose settings are taken from the given stored offsets.
     *
     * @param map the stored offsets passed to {@code setFiltersFromOffsets}
     * @param configuration the connector configuration the builder is created with
     * @return the resulting filters
     */
    private static Filters getOldFilters(Map<String, ?> map, Configuration configuration) {
        final Filters.Builder filtersBuilder = new Filters.Builder(configuration);
        return filtersBuilder.setFiltersFromOffsets(map).build();
    }

    /**
     * Builds filters straight from the configuration, without any adjustment.
     *
     * @param configuration the connector configuration
     * @return the filters built from {@code configuration}
     */
    private static Filters getAllFilters(Configuration configuration) {
        final Filters.Builder filtersBuilder = new Filters.Builder(configuration);
        return filtersBuilder.build();
    }

    /**
     * Polls the reader chain for the next batch of records.
     * <p>
     * The logging context of the task is configured for the duration of the call and restored
     * afterwards, whether or not polling succeeds.
     *
     * @return the records returned by the reader chain, or {@code null} when no reader chain has
     *         been set up yet
     * @throws InterruptedException if the underlying poll is interrupted
     */
    @Override // org.apache.kafka.connect.source.SourceTask
    public List<SourceRecord> poll() throws InterruptedException {
        // Read the volatile field once so the null check and the call see the same instance.
        final ChainedReader activeReaders = this.readers;
        if (activeReaders == null) {
            return null;
        }
        final LoggingContext.PreviousContext previousLoggingContext = this.taskContext.configureLoggingContext(ConnectMetricsRegistry.TASK_TAG_NAME);
        try {
            this.logger.trace("Polling for events");
            return activeReaders.poll();
        } finally {
            previousLoggingContext.restore();
        }
    }

    /**
     * Stops and destroys the reader chain, if there is one.
     * <p>
     * Nothing happens when {@code context} is {@code null}. The logging context is only
     * configured (and later restored) when a task context is available.
     */
    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public synchronized void stop() {
        if (this.context == null) {
            return;
        }
        final MySqlTaskContext currentTaskContext = this.taskContext;
        final LoggingContext.PreviousContext previousLoggingContext = currentTaskContext == null
                ? null
                : currentTaskContext.configureLoggingContext(ConnectMetricsRegistry.TASK_TAG_NAME);
        try {
            this.logger.info("Stopping MySQL connector task");
            final ChainedReader activeReaders = this.readers;
            if (activeReaders != null) {
                activeReaders.stop();
                activeReaders.destroy();
            }
        } finally {
            if (previousLoggingContext != null) {
                previousLoggingContext.restore();
            }
        }
    }

    /**
     * Supplies the configuration fields of this connector.
     *
     * @return {@code MySqlConnectorConfig.ALL_FIELDS}
     */
    @Override // io.debezium.connector.common.BaseSourceTask
    protected Iterable<Field> getAllConfigurationFields() {
        final Iterable<Field> allFields = MySqlConnectorConfig.ALL_FIELDS;
        return allFields;
    }

    /**
     * Completion callback registered on the reader chain in {@code start()}: shuts down the task
     * context and clears {@code context}.
     * <p>
     * A failure while shutting down is logged and not propagated. The task context is read once
     * and null-checked before it is used for anything, so a missing context no longer raises a
     * {@code NullPointerException} before {@code context} is cleared.
     */
    protected void completeReaders() {
        final MySqlTaskContext currentTaskContext = this.taskContext;
        final LoggingContext.PreviousContext previousLoggingContext = currentTaskContext == null
                ? null
                : currentTaskContext.configureLoggingContext(ConnectMetricsRegistry.TASK_TAG_NAME);
        try {
            if (currentTaskContext != null) {
                currentTaskContext.shutdown();
            }
        } catch (Throwable th) {
            this.logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", th);
        } finally {
            // With context cleared, a later stop() returns without doing anything.
            this.context = null;
            this.logger.info("Connector task finished all work and is now shutdown");
            if (previousLoggingContext != null) {
                previousLoggingContext.restore();
            }
        }
    }

    /**
     * Checks whether the server still has what is needed to resume from the recorded source
     * position.
     * <p>
     * Without a recorded GTID set, the recorded binlog filename is looked up in the output of
     * {@code SHOW BINARY LOGS}; a missing or empty filename counts as available. With a recorded
     * GTID set, the set (restricted by the GTID source filter) must be contained in the GTID set
     * known to the server, and none of the GTIDs that still have to be replicated may have been
     * purged; an empty recorded GTID set counts as available.
     *
     * @return {@code true} if reading can resume from the recorded position, {@code false}
     *         otherwise
     * @throws ConnectException if listing the binary logs fails with an {@code SQLException}
     */
    protected boolean isBinlogAvailable() {
        final String recordedGtids = this.taskContext.source().gtidSet();
        if (recordedGtids == null) {
            // No GTIDs were recorded, so fall back to comparing binlog filenames.
            final String binlogFilename = this.taskContext.source().binlogFilename();
            if (binlogFilename == null || binlogFilename.equals("")) {
                return true;
            }
            final List<String> logNames = new ArrayList<>();
            try {
                this.logger.info("Step 0: Get all known binlogs from MySQL");
                this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                    while (rs.next()) {
                        logNames.add(rs.getString(1));
                    }
                });
            } catch (SQLException e) {
                throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
            }
            final boolean found = logNames.contains(binlogFilename);
            if (found) {
                this.logger.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
            } else if (this.logger.isInfoEnabled()) {
                this.logger.info("Connector requires binlog file '{}', but MySQL only has {}", binlogFilename, String.join(", ", logNames));
            }
            return found;
        }

        if (recordedGtids.trim().isEmpty()) {
            return true;
        }
        final String serverGtids = this.connectionContext.knownGtidSet();
        if (serverGtids == null || serverGtids.trim().isEmpty()) {
            this.logger.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
            return false;
        }
        // Only the GTID ranges accepted by the source filter are relevant for the comparison.
        final GtidSet requiredGtidSet = new GtidSet(recordedGtids).retainAll(this.taskContext.gtidSourceFilter());
        final GtidSet availableGtidSet = new GtidSet(serverGtids);
        if (!requiredGtidSet.isContainedWithin(availableGtidSet)) {
            this.logger.info("Connector last known GTIDs are {}, but MySQL has {}", requiredGtidSet, availableGtidSet);
            return false;
        }
        this.logger.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet, requiredGtidSet);
        final GtidSet gtidSetToReplicate = this.connectionContext.subtractGtidSet(availableGtidSet.retainAll(this.taskContext.gtidSourceFilter()), requiredGtidSet);
        final GtidSet nonPurgedGtidSetToReplicate = this.connectionContext.subtractGtidSet(gtidSetToReplicate, this.connectionContext.purgedGtidSet());
        this.logger.info("GTIDs known by the server but not processed yet {}, for replication are available only {}", gtidSetToReplicate, nonPurgedGtidSetToReplicate);
        if (gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
            return true;
        }
        this.logger.info("Some of the GTIDs needed to replicate have been already purged");
        return false;
    }

    /**
     * Looks up the first binlog file listed by {@code SHOW BINARY LOGS}.
     *
     * @return the name in the first row of the result, or {@code null} when the server lists no
     *         binary logs
     * @throws ConnectException if the query fails with an {@code SQLException}
     */
    protected String earliestBinlogFilename() {
        final List<String> logNames = new ArrayList<>();
        try {
            this.logger.info("Checking all known binlogs from MySQL");
            this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                while (rs.next()) {
                    logNames.add(rs.getString(1));
                }
            });
        } catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
        }
        return logNames.isEmpty() ? null : logNames.get(0);
    }

    /**
     * Checks whether the server writes a full row-level binlog, i.e. the global variable
     * {@code binlog_format} is {@code ROW} and {@code binlog_row_image} is {@code FULL}
     * (both compared case-insensitively).
     *
     * @return {@code true} only if both variables have the expected values
     * @throws ConnectException if either query fails with an {@code SQLException}
     */
    protected boolean isRowBinlogEnabled() {
        final String binlogFormat = readVariableValue("SHOW GLOBAL VARIABLES LIKE 'binlog_format'",
                "Unexpected error while connecting to MySQL and looking at BINLOG mode: ");
        this.logger.debug("binlog_format={}", binlogFormat);
        final String binlogRowImage = readVariableValue("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'",
                "Unexpected error while connecting to MySQL and looking at BINLOG row image mode: ");
        this.logger.debug("binlog_row_image={}", binlogRowImage);
        return "ROW".equalsIgnoreCase(binlogFormat) && "FULL".equalsIgnoreCase(binlogRowImage);
    }

    /**
     * Runs the given statement and returns the second column of its first row.
     *
     * @param sql the statement to execute
     * @param failureMessage the message of the {@code ConnectException} raised when the query
     *            fails
     * @return the value read, or the empty string when the result has no rows
     * @throws ConnectException if the query fails with an {@code SQLException}
     */
    private String readVariableValue(String sql, String failureMessage) {
        final AtomicReference<String> value = new AtomicReference<>("");
        try {
            this.connectionContext.jdbc().query(sql, rs -> {
                if (rs.next()) {
                    value.set(rs.getString(2));
                }
            });
        } catch (SQLException e) {
            throw new ConnectException(failureMessage, e);
        }
        return value.get();
    }
}
