package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/LogMinerAdapter.class */
public class LogMinerAdapter extends AbstractStreamingAdapter {
    private static final int GET_TRANSACTION_SCN_ATTEMPTS = 5;
    public static final String TYPE = "logminer";
    private static final Duration GET_TRANSACTION_SCN_PAUSE = Duration.ofSeconds(1);
    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerAdapter.class);

    public LogMinerAdapter(OracleConnectorConfig oracleConnectorConfig) {
        super(oracleConnectorConfig);
    }

    @Override // io.debezium.connector.oracle.StreamingAdapter
    public String getType() {
        return TYPE;
    }

    @Override // io.debezium.connector.oracle.StreamingAdapter
    public HistoryRecordComparator getHistoryRecordComparator() {
        return new HistoryRecordComparator() { // from class: io.debezium.connector.oracle.logminer.LogMinerAdapter.1
            @Override // io.debezium.relational.history.HistoryRecordComparator
            protected boolean isPositionAtOrBefore(Document document, Document document2) {
                return LogMinerAdapter.this.resolveScn(document).compareTo(LogMinerAdapter.this.resolveScn(document2)) < 1;
            }
        };
    }

    @Override // io.debezium.connector.oracle.StreamingAdapter
    public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
        return new LogMinerOracleOffsetContextLoader(this.connectorConfig);
    }

    @Override // io.debezium.connector.oracle.StreamingAdapter
    public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, OracleTaskContext oracleTaskContext, Configuration configuration, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        return new LogMinerStreamingChangeEventSource(this.connectorConfig, oracleConnection, eventDispatcher, errorHandler, clock, oracleDatabaseSchema, configuration, oracleStreamingChangeEventSourceMetrics);
    }

    @Override // io.debezium.connector.oracle.StreamingAdapter
    public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> relationalSnapshotContext, OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection) throws SQLException {
        Scn orElse = getLatestTableDdlScn(relationalSnapshotContext, oracleConnection).orElse(null);
        String transactionTableName = getTransactionTableName(oracleConnectorConfig);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Optional<Scn> currentScn = isPendingTransactionSkip(oracleConnectorConfig) ? getCurrentScn(orElse, oracleConnection) : getPendingTransactions(orElse, oracleConnection, linkedHashMap, transactionTableName);
        if (!currentScn.isPresent()) {
            throw new DebeziumException("Failed to resolve current SCN");
        }
        OracleConnection oracleConnection2 = new OracleConnection(oracleConnection.config(), () -> {
            return getClass().getClassLoader();
        }, false);
        try {
            oracleConnection2.setAutoCommit(false);
            if (!Strings.isNullOrEmpty(oracleConnectorConfig.getPdbName())) {
                oracleConnection2.resetSessionToCdb();
            }
            OracleOffsetContext determineSnapshotOffset = determineSnapshotOffset(oracleConnectorConfig, oracleConnection2, currentScn.get(), linkedHashMap, transactionTableName);
            oracleConnection2.close();
            return determineSnapshotOffset;
        } catch (Throwable th) {
            try {
                oracleConnection2.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private Optional<Scn> getCurrentScn(Scn scn, OracleConnection oracleConnection) throws SQLException {
        Scn scn2;
        do {
            scn2 = (Scn) oracleConnection.queryAndMap("SELECT CURRENT_SCN FROM V$DATABASE", resultSet -> {
                return resultSet.next() ? Scn.valueOf(resultSet.getString(1)) : Scn.NULL;
            });
        } while (areSameTimestamp(scn, scn2, oracleConnection));
        return Optional.ofNullable(scn2);
    }

    private Optional<Scn> getPendingTransactions(Scn scn, OracleConnection oracleConnection, Map<String, Scn> map, String str) throws SQLException {
        Scn scn2;
        String str2 = "SELECT d.CURRENT_SCN, t.XID, t.START_SCN FROM V$DATABASE d LEFT OUTER JOIN " + str + " t ON t.START_SCN < d.CURRENT_SCN ";
        do {
            scn2 = null;
            map.clear();
            try {
                Statement createStatement = oracleConnection.connection().createStatement();
                try {
                    ResultSet executeQuery = createStatement.executeQuery(str2);
                    while (executeQuery.next()) {
                        try {
                            if (scn2 == null) {
                                scn2 = Scn.valueOf(executeQuery.getString(1));
                            }
                            String string = executeQuery.getString(3);
                            if (!Strings.isNullOrEmpty(string)) {
                                map.put(HexConverter.convertToHexString(executeQuery.getBytes(2)), Scn.valueOf(string));
                            }
                        } finally {
                        }
                    }
                    if (executeQuery != null) {
                        executeQuery.close();
                    }
                    if (createStatement != null) {
                        createStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                LOGGER.warn("Could not query the {} view: {}", new Object[]{str, e.getMessage(), e});
                throw e;
            }
        } while (areSameTimestamp(scn, scn2, oracleConnection));
        for (Map.Entry<String, Scn> entry : map.entrySet()) {
            LOGGER.trace("\tPending Transaction '{}' started at SCN {}", entry.getKey(), entry.getValue());
        }
        return Optional.ofNullable(scn2);
    }

    private OracleOffsetContext determineSnapshotOffset(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, Scn scn, Map<String, Scn> map, String str) throws SQLException {
        if (isPendingTransactionSkip(oracleConnectorConfig)) {
            LOGGER.info("\tNo in-progress transactions will be captured.");
        } else if (isPendingTransactionViewOnly(oracleConnectorConfig)) {
            LOGGER.info("\tSkipping transaction logs for resolving snapshot offset, only using {}.", str);
        } else {
            LOGGER.info("\tConsulting {} and transaction logs for resolving snapshot offset.", str);
            getPendingTransactionsFromLogs(oracleConnection, scn, map);
        }
        if (!map.isEmpty()) {
            for (Map.Entry<String, Scn> entry : map.entrySet()) {
                LOGGER.info("\tFound in-progress transaction {}, starting at SCN {}", entry.getKey(), entry.getValue());
            }
        } else if (!isPendingTransactionSkip(oracleConnectorConfig)) {
            LOGGER.info("\tFound no in-progress transactions.");
        }
        return OracleOffsetContext.create().logicalName(oracleConnectorConfig).scn(scn).snapshotScn(scn).snapshotPendingTransactions(map).transactionContext(new TransactionContext()).incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext()).build();
    }

    private void addLogsToSession(List<LogFile> list, OracleConnection oracleConnection) throws SQLException {
        for (LogFile logFile : list) {
            LOGGER.debug("\tAdding log: {}", logFile.getFileName());
            oracleConnection.executeWithoutCommitting(SqlUtils.addLogFileStatement("DBMS_LOGMNR.ADDFILE", logFile.getFileName()));
        }
    }

    private void startSession(OracleConnection oracleConnection) throws SQLException {
        LOGGER.debug("\tStarting mining session");
        oracleConnection.executeWithoutCommitting("BEGIN sys.dbms_logmnr.start_logmnr(OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;");
    }

    private void stopSession(OracleConnection oracleConnection) throws SQLException {
        try {
            LOGGER.debug("\tStopping mining session");
            oracleConnection.executeWithoutCommitting("BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
        } catch (SQLException e) {
            if (!e.getMessage().toUpperCase().contains("ORA-01307")) {
                throw e;
            }
            LOGGER.debug("LogMiner mining session is already closed.");
        }
    }

    private Scn getOldestScnAvailableInLogs(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection) throws SQLException {
        return (Scn) oracleConnection.queryAndMap(SqlUtils.oldestFirstChangeQuery(oracleConnectorConfig.getLogMiningArchiveLogRetention(), oracleConnectorConfig.getLogMiningArchiveDestinationName()), resultSet -> {
            if (resultSet.next()) {
                String string = resultSet.getString(1);
                if (!Strings.isNullOrEmpty(string)) {
                    return Scn.valueOf(string);
                }
            }
            return Scn.NULL;
        });
    }

    private List<LogFile> getOrderedLogsFromScn(OracleConnectorConfig oracleConnectorConfig, Scn scn, OracleConnection oracleConnection) throws SQLException {
        return (List) LogMinerHelper.getLogFilesForOffsetScn(oracleConnection, scn, oracleConnectorConfig.getLogMiningArchiveLogRetention(), oracleConnectorConfig.isArchiveLogOnlyMode(), oracleConnectorConfig.getLogMiningArchiveDestinationName()).stream().sorted(Comparator.comparing((v0) -> {
            return v0.getSequence();
        })).collect(Collectors.toList());
    }

    private void getPendingTransactionsFromLogs(OracleConnection oracleConnection, Scn scn, Map<String, Scn> map) throws SQLException {
        List<LogFile> orderedLogsFromScn = getOrderedLogsFromScn(this.connectorConfig, getOldestScnAvailableInLogs(this.connectorConfig, oracleConnection), oracleConnection);
        if (orderedLogsFromScn.isEmpty()) {
            return;
        }
        try {
            try {
                addLogsToSession(getMostRecentLogFilesForSearch(orderedLogsFromScn), oracleConnection);
                startSession(oracleConnection);
                LOGGER.info("\tQuerying transaction logs, please wait...");
                oracleConnection.query("SELECT START_SCN, XID FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE=7 AND SCN >= " + scn + " AND START_SCN < " + scn, resultSet -> {
                    while (resultSet.next()) {
                        String convertToHexString = HexConverter.convertToHexString(resultSet.getBytes("XID"));
                        if (!Strings.isNullOrBlank(resultSet.getString("START_SCN"))) {
                            Scn valueOf = Scn.valueOf(resultSet.getString("START_SCN"));
                            if (!map.containsKey(convertToHexString)) {
                                LOGGER.info("\tTransaction '{}' started at SCN '{}'", convertToHexString, valueOf);
                                map.put(convertToHexString, valueOf);
                            }
                        }
                    }
                });
                stopSession(oracleConnection);
            } catch (Exception e) {
                throw new DebeziumException("Failed to resolve snapshot offset", e);
            }
        } catch (Throwable th) {
            stopSession(oracleConnection);
            throw th;
        }
    }

    private List<LogFile> getMostRecentLogFilesForSearch(List<LogFile> list) {
        HashMap hashMap = new HashMap();
        for (LogFile logFile : list) {
            if (!hashMap.containsKey(Integer.valueOf(logFile.getThread())) && logFile.isCurrent()) {
                hashMap.put(Integer.valueOf(logFile.getThread()), new ArrayList());
                ((List) hashMap.get(Integer.valueOf(logFile.getThread()))).add(logFile);
                list.stream().filter(logFile2 -> {
                    return logFile.getThread() == logFile2.getThread() && logFile.getSequence().compareTo(logFile2.getSequence()) > 0;
                }).max(Comparator.comparing((v0) -> {
                    return v0.getSequence();
                })).ifPresent(logFile3 -> {
                    ((List) hashMap.get(Integer.valueOf(logFile.getThread()))).add(logFile3);
                });
            }
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            arrayList.addAll((Collection) ((Map.Entry) it.next()).getValue());
        }
        return arrayList;
    }

    private boolean isPendingTransactionSkip(OracleConnectorConfig oracleConnectorConfig) {
        return oracleConnectorConfig.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.SKIP;
    }

    public boolean isPendingTransactionViewOnly(OracleConnectorConfig oracleConnectorConfig) {
        return oracleConnectorConfig.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY;
    }

    private static String getTransactionTableName(OracleConnectorConfig oracleConnectorConfig) {
        return (oracleConnectorConfig.getRacNodes() == null || oracleConnectorConfig.getRacNodes().isEmpty()) ? "V$TRANSACTION" : "GV$TRANSACTION";
    }
}
