package io.debezium.connector.mongodb;

import com.mongodb.CursorType;
import com.mongodb.ServerAddress;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.class */
public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    /** Class logger; assigned in the static initializer at the end of the class. */
    private static final Logger LOGGER;
    /** Message prefix used to recognise an authorization failure when connecting to a primary. */
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    /** Name of the oplog document field holding the operation code. */
    private static final String OPERATION_FIELD = "op";
    /** Name of the oplog document field holding the operation's object/payload document. */
    private static final String OBJECT_FIELD = "o";
    /** Operation code that, together with an {@code applyOps} payload, marks a transactional oplog entry. */
    private static final String OPERATION_CONTROL = "c";
    /** Key inside the {@code o} document that holds the list of changes of a transaction. */
    private static final String TX_OPS = "applyOps";
    /** Connector configuration; read for the cursor max-await time and passed to offset initialization. */
    private final MongoDbConnectorConfig connectorConfig;
    /** Dispatcher used for data change, heartbeat, transaction and connector events. */
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    /** Receives any throwable that terminates streaming or event dispatch. */
    private final ErrorHandler errorHandler;
    /** Clock handed to the pause metronome and to the record emitters. */
    private final Clock clock;
    /** Connection context obtained from the task context; used to reach replica set primaries. */
    private final ConnectionContext connectionContext;
    /** Replica sets this source streams from. */
    private final ReplicaSets replicaSets;
    /** Task context providing filters, capture mode, server name and connector configuration. */
    private final MongoDbTaskContext taskContext;
    /** Decompiler rendering of the compiler-generated assertion flag; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource$ReplicaSetOplogContext.class */
    /**
     * Per-replica-set state used while streaming: the partition, the offset context, the primary
     * connection and the replica set itself, plus the position of a transactional oplog entry that
     * was only partially processed before the last stop (timestamp and number of changes already
     * handled).
     */
    public class ReplicaSetOplogContext {
        private final ReplicaSet replicaSet;
        private final ReplicaSetPartition partition;
        private final ReplicaSetOffsetContext offset;
        private final ConnectionContext.MongoPrimary primary;

        /** Timestamp of the partially processed transactional entry, or {@code null} when there is none. */
        private BsonTimestamp incompleteEventTimestamp;
        /** Number of changes of that entry that were already processed; zero until set explicitly. */
        private long incompleteTxOrder;

        ReplicaSetOplogContext(ReplicaSetPartition replicaSetPartition, ReplicaSetOffsetContext replicaSetOffsetContext, ConnectionContext.MongoPrimary mongoPrimary, ReplicaSet replicaSet) {
            this.replicaSet = replicaSet;
            this.primary = mongoPrimary;
            this.offset = replicaSetOffsetContext;
            this.partition = replicaSetPartition;
        }

        /** @return the partition of the replica set being streamed */
        ReplicaSetPartition getPartition() {
            return partition;
        }

        /** @return the offset context of the replica set being streamed */
        ReplicaSetOffsetContext getOffset() {
            return offset;
        }

        /** @return the primary connection this context was created with */
        ConnectionContext.MongoPrimary getPrimary() {
            return primary;
        }

        /** @return the name reported by the underlying replica set */
        String getReplicaSetName() {
            return replicaSet.replicaSetName();
        }

        /** @return the timestamp of the partially processed transactional entry, or {@code null} */
        BsonTimestamp getIncompleteEventTimestamp() {
            return incompleteEventTimestamp;
        }

        /** @param timestamp the new incomplete-entry timestamp; {@code null} clears it */
        public void setIncompleteEventTimestamp(BsonTimestamp timestamp) {
            incompleteEventTimestamp = timestamp;
        }

        /** @return how many changes of the incomplete transactional entry were already processed */
        public long getIncompleteTxOrder() {
            return incompleteTxOrder;
        }

        /** @param txOrder how many changes of the incomplete transactional entry were already processed */
        public void setIncompleteTxOrder(long txOrder) {
            incompleteTxOrder = txOrder;
        }
    }

    /**
     * Creates the streaming source.
     *
     * @param mongoDbConnectorConfig connector configuration
     * @param mongoDbTaskContext task context; also the origin of the connection context
     * @param replicaSets replica sets to stream from
     * @param eventDispatcher dispatcher that receives the produced events
     * @param errorHandler handler notified of streaming failures
     * @param clock clock used for pauses and record emitters
     */
    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig mongoDbConnectorConfig, MongoDbTaskContext mongoDbTaskContext, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> eventDispatcher, ErrorHandler errorHandler, Clock clock) {
        // Configuration and contexts.
        this.connectorConfig = mongoDbConnectorConfig;
        this.taskContext = mongoDbTaskContext;
        this.connectionContext = mongoDbTaskContext.getConnectionContext();
        this.replicaSets = replicaSets;

        // Collaborators.
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
    }

    /**
     * Streams changes from every valid replica set until streaming ends, then shuts down the
     * connection context.
     *
     * <p>A single replica set is streamed on the calling thread; several are streamed on one
     * thread each. When no offset context is supplied, one is created via
     * {@link #initializeOffsets}.
     *
     * @param changeEventSourceContext context queried for the running state
     * @param mongoDbPartition partition used for connector events and offset initialization
     * @param mongoDbOffsetContext previously stored offsets, or {@code null} to initialize new ones
     * @throws InterruptedException declared by the interface contract
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, MongoDbOffsetContext mongoDbOffsetContext) throws InterruptedException {
        final List<ReplicaSet> targets = this.replicaSets.validReplicaSets();
        final MongoDbOffsetContext offsets = mongoDbOffsetContext != null
                ? mongoDbOffsetContext
                : initializeOffsets(this.connectorConfig, mongoDbPartition, this.replicaSets);
        try {
            final int targetCount = targets.size();
            if (targetCount == 1) {
                streamChangesForReplicaSet(changeEventSourceContext, mongoDbPartition, targets.get(0), offsets);
            } else if (targetCount > 1) {
                streamChangesForReplicaSets(changeEventSourceContext, mongoDbPartition, targets, offsets);
            }
            // With no valid replica set there is nothing to stream.
        } finally {
            this.taskContext.getConnectionContext().shutdown();
        }
    }

    /**
     * Connects to the primary of one replica set and streams its changes, using change streams or
     * the oplog depending on the task's capture mode.
     *
     * <p>Any failure is logged and handed to the error handler instead of being rethrown. The
     * primary connection, when one was established, is stopped exactly once on every exit path.
     *
     * @param changeEventSourceContext context queried for the running state
     * @param mongoDbPartition partition used for connector events
     * @param replicaSet replica set to stream from
     * @param mongoDbOffsetContext offsets to resume from
     */
    private void streamChangesForReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, ReplicaSet replicaSet, MongoDbOffsetContext mongoDbOffsetContext) {
        ConnectionContext.MongoPrimary primaryClient = null;
        try {
            primaryClient = establishConnectionToPrimary(mongoDbPartition, replicaSet);
            if (primaryClient != null) {
                // The lambda cannot capture the non-final local, so hand it over through a reference.
                final AtomicReference<ConnectionContext.MongoPrimary> primaryReference = new AtomicReference<>(primaryClient);
                primaryClient.execute("read from oplog on '" + replicaSet + "'", mongoClient -> {
                    if (this.taskContext.getCaptureMode().isChangeStreams()) {
                        readChangeStream(mongoClient, primaryReference.get(), replicaSet, changeEventSourceContext, mongoDbOffsetContext);
                    } else {
                        readOplog(mongoClient, primaryReference.get(), replicaSet, changeEventSourceContext, mongoDbOffsetContext);
                    }
                });
            }
        } catch (Throwable t) {
            LOGGER.error("Streaming for replica set {} failed", replicaSet.replicaSetName(), t);
            this.errorHandler.setProducerThrowable(t);
        } finally {
            // Single cleanup point: avoids stopping the primary a second time when stop() itself fails.
            if (primaryClient != null) {
                primaryClient.stop();
            }
        }
    }

    /**
     * Streams several replica sets concurrently, one pool thread per replica set, and waits until
     * every streaming task has finished.
     *
     * <p>If the wait is interrupted, the interrupt flag is restored. The pool is shut down in
     * either case; {@code shutdown()} does not cancel tasks that are still running.
     *
     * @param changeEventSourceContext context queried for the running state
     * @param mongoDbPartition partition used for connector events
     * @param list replica sets to stream from
     * @param mongoDbOffsetContext offsets to resume from
     */
    private void streamChangesForReplicaSets(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, List<ReplicaSet> list, MongoDbOffsetContext mongoDbOffsetContext) {
        final int threadCount = list.size();
        final ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-streaming", threadCount);
        final CountDownLatch remaining = new CountDownLatch(threadCount);
        LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", threadCount, list);

        for (ReplicaSet replicaSet : list) {
            executor.submit(() -> {
                try {
                    streamChangesForReplicaSet(changeEventSourceContext, mongoDbPartition, replicaSet, mongoDbOffsetContext);
                } finally {
                    // Count down on success and failure alike so the waiting thread is released.
                    remaining.countDown();
                }
            });
        }

        try {
            remaining.await();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
    }

    /**
     * Obtains a connection to the primary of the given replica set, installing an error callback
     * that always fails with a {@link ConnectException}.
     *
     * <p>For errors whose message starts with the authorization-failure prefix the exception is
     * thrown directly. For every other error a {@link DisconnectEvent} is dispatched and the error
     * is logged before the exception is thrown.
     *
     * @param mongoDbPartition partition the disconnect event is dispatched for
     * @param replicaSet replica set whose primary is wanted
     * @return whatever {@code primaryFor} returns; callers in this class check it for {@code null}
     */
    private ConnectionContext.MongoPrimary establishConnectionToPrimary(MongoDbPartition mongoDbPartition, ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (description, error) -> {
            final String message = error.getMessage();
            final boolean authorizationFailure = message != null && message.startsWith(AUTHORIZATION_FAILURE_MESSAGE);
            if (!authorizationFailure) {
                this.dispatcher.dispatchConnectorEvent(mongoDbPartition, new DisconnectEvent());
                LOGGER.error("Error while attempting to {}: {}", description, message, error);
            }
            throw new ConnectException("Error while attempting to " + description, error);
        });
    }

    /**
     * Tails {@code local.oplog.rs} of the given replica set from the last recorded offset and
     * hands every entry to {@link #handleOplogEvent}.
     *
     * <p>If the last recorded event was transactional, reading resumes at that entry (inclusive)
     * and the oplog context is primed so the already processed changes are skipped. Otherwise
     * reading resumes strictly after the recorded timestamp. Entries carrying {@code fromMigrate}
     * are always excluded, and the skipped-operations filter is appended when one is configured.
     *
     * <p>The loop ends when the context stops running, when an event handler asks to stop, or when
     * the thread is interrupted. On interruption the interrupt flag is restored. The cursor is
     * closed on every exit path.
     *
     * @param mongoClient client connected to the primary
     * @param mongoPrimary primary connection stored in the oplog context
     * @param replicaSet replica set being read
     * @param changeEventSourceContext context queried for the running state
     * @param mongoDbOffsetContext offsets to resume from
     * @throws DebeziumException if the resume position is not available in the oplog
     */
    private void readOplog(MongoClient mongoClient, ConnectionContext.MongoPrimary mongoPrimary, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbOffsetContext mongoDbOffsetContext) {
        final ReplicaSetPartition rsPartition = mongoDbOffsetContext.getReplicaSetPartition(replicaSet);
        final ReplicaSetOffsetContext rsOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
        final BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
        final OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
        final ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(mongoClient);
        LOGGER.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart);

        final MongoCollection<Document> oplog = mongoClient.getDatabase("local").getCollection("oplog.rs");
        if (!isStartPositionInOplog(oplogStart, oplog)) {
            throw new DebeziumException("Failed to find starting position '" + oplogStart + "' in oplog");
        }

        final ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, mongoPrimary, replicaSet);
        Bson filter;
        if (txOrder.isPresent()) {
            LOGGER.info("The last event processed was transactional, resuming at the oplog event '{}', expecting to skip '{}' events", oplogStart, txOrder.getAsLong());
            // Inclusive bound: the partially processed transactional entry must be read again.
            filter = com.mongodb.client.model.Filters.and(
                    com.mongodb.client.model.Filters.gte("ts", oplogStart),
                    com.mongodb.client.model.Filters.exists("fromMigrate", false));
            oplogContext.setIncompleteEventTimestamp(oplogStart);
            oplogContext.setIncompleteTxOrder(txOrder.getAsLong());
        } else {
            LOGGER.info("The last event processed was not transactional, resuming at the oplog event after '{}'", oplogStart);
            filter = com.mongodb.client.model.Filters.and(
                    com.mongodb.client.model.Filters.gt("ts", oplogStart),
                    com.mongodb.client.model.Filters.exists("fromMigrate", false));
        }

        final Bson skippedOperationsFilter = getOplogSkippedOperationsFilter();
        if (skippedOperationsFilter != null) {
            filter = com.mongodb.client.model.Filters.and(filter, skippedOperationsFilter);
        }

        FindIterable<Document> results = oplog.find(filter)
                .sort(new Document("$natural", 1))
                .oplogReplay(true)
                .cursorType(CursorType.TailableAwait)
                .noCursorTimeout(true);
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            results = results.maxAwaitTime(this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }

        try (MongoCursor<Document> cursor = results.iterator()) {
            final Metronome pause = Metronome.sleeper(Duration.ofMillis(500L), this.clock);
            while (changeEventSourceContext.isRunning()) {
                // tryNext() returns null when nothing is available, so the running flag is re-checked regularly.
                final Document event = cursor.tryNext();
                if (event == null) {
                    try {
                        pause.pause();
                    } catch (InterruptedException e) {
                        // Same handling as the heartbeat path below: keep the interrupt and stop reading.
                        LOGGER.info("Replicator thread is interrupted");
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }

                if (!handleOplogEvent(primaryAddress, event, event, 0L, oplogContext)) {
                    // The handler asked to stop reading.
                    return;
                }

                try {
                    this.dispatcher.dispatchHeartbeatEvent(oplogContext.getPartition(), oplogContext.getOffset());
                } catch (InterruptedException e) {
                    LOGGER.info("Replicator thread is interrupted");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Lists the change stream {@code operationType} values that should be captured.
     *
     * <p>Despite the method name, the result contains the operation types that are NOT configured
     * as skipped: {@code insert} for create, {@code update} and {@code replace} for update, and
     * {@code delete} for delete.
     *
     * @return the operation types to keep, in the order insert, update, replace, delete
     */
    private List<String> getChangeStreamSkippedOperationsFilter() {
        final EnumSet<Envelope.Operation> skipped = this.taskContext.getConnectorConfig().getSkippedOperations();
        final boolean keepCreates = !skipped.contains(Envelope.Operation.CREATE);
        final boolean keepUpdates = !skipped.contains(Envelope.Operation.UPDATE);
        final boolean keepDeletes = !skipped.contains(Envelope.Operation.DELETE);

        final List<String> operationTypes = new ArrayList<>();
        if (keepCreates) {
            operationTypes.add("insert");
        }
        if (keepUpdates) {
            // An update surfaces either as an in-place update or as a full replacement.
            operationTypes.add("update");
            operationTypes.add("replace");
        }
        if (keepDeletes) {
            operationTypes.add("delete");
        }
        return operationTypes;
    }

    /**
     * Opens a change stream on the given client and dispatches every received change that passes
     * the database and collection filters, followed by a heartbeat.
     *
     * <p>The stream is restricted to the operation types that are not configured as skipped. When
     * a resume token is recorded the stream resumes after it. Otherwise, when the recorded
     * timestamp has a positive time, the stream starts at that operation time and events whose
     * {@code clusterTime} equals the recorded timestamp are filtered out.
     *
     * <p>The loop ends when the context stops running, when dispatching a data change fails (the
     * failure is passed to the error handler), or when the thread is interrupted. On interruption
     * the interrupt flag is restored. The cursor is closed on every exit path.
     *
     * @param mongoClient client connected to the primary
     * @param mongoPrimary primary connection stored in the oplog context
     * @param replicaSet replica set being read
     * @param changeEventSourceContext context queried for the running state
     * @param mongoDbOffsetContext offsets to resume from
     */
    private void readChangeStream(MongoClient mongoClient, ConnectionContext.MongoPrimary mongoPrimary, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbOffsetContext mongoDbOffsetContext) {
        final ReplicaSetPartition rsPartition = mongoDbOffsetContext.getReplicaSetPartition(replicaSet);
        final ReplicaSetOffsetContext rsOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
        final BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
        final OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
        final ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, mongoPrimary, replicaSet);
        LOGGER.info("Reading change stream for '{}' primary {} starting at {}", replicaSet, MongoUtil.getPrimaryAddress(mongoClient), oplogStart);

        Bson filter = com.mongodb.client.model.Filters.in("operationType", getChangeStreamSkippedOperationsFilter());
        if (rsOffsetContext.lastResumeToken() == null) {
            // Without a resume token the stream may start at the recorded time; drop events at exactly that time.
            filter = com.mongodb.client.model.Filters.and(filter, com.mongodb.client.model.Filters.ne("clusterTime", oplogStart));
        }

        final ChangeStreamIterable<Document> stream = mongoClient.watch(Arrays.asList(Aggregates.match(filter)));
        if (this.taskContext.getCaptureMode().isFullUpdate()) {
            stream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        if (rsOffsetContext.lastResumeToken() != null) {
            LOGGER.info("Resuming streaming from token '{}'", rsOffsetContext.lastResumeToken());
            final BsonDocument resumeToken = new BsonDocument();
            resumeToken.put("_data", new BsonString(rsOffsetContext.lastResumeToken()));
            stream.resumeAfter(resumeToken);
        } else if (oplogStart.getTime() > 0) {
            LOGGER.info("Resume token not available, starting streaming from time '{}'", oplogStart);
            stream.startAtOperationTime(oplogStart);
        }
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            stream.maxAwaitTime(this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }

        try (MongoCursor<ChangeStreamDocument<Document>> cursor = stream.iterator()) {
            final Metronome pause = Metronome.sleeper(Duration.ofMillis(500L), this.clock);
            while (changeEventSourceContext.isRunning()) {
                // tryNext() returns null when nothing is available, so the running flag is re-checked regularly.
                final ChangeStreamDocument<Document> event = cursor.tryNext();
                if (event == null) {
                    try {
                        pause.pause();
                    } catch (InterruptedException e) {
                        // Same handling as the heartbeat path below: keep the interrupt and stop reading.
                        LOGGER.info("Replicator thread is interrupted");
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }

                LOGGER.trace("Arrived Change Stream event: {}", event);
                if (this.taskContext.filters().databaseFilter().test(event.getDatabaseName())) {
                    oplogContext.getOffset().changeStreamEvent(event, txOrder);
                    // Result intentionally unused; the call is kept exactly as in the original code.
                    oplogContext.getOffset().getOffset();
                    final CollectionId collectionId = new CollectionId(replicaSet.replicaSetName(), event.getNamespace().getDatabaseName(), event.getNamespace().getCollectionName());
                    if (this.taskContext.filters().collectionFilter().test(collectionId)) {
                        try {
                            this.dispatcher.dispatchDataChangeEvent(oplogContext.getPartition(), collectionId, new MongoDbChangeStreamChangeRecordEmitter(oplogContext.getPartition(), oplogContext.getOffset(), this.clock, event));
                        } catch (Exception e) {
                            this.errorHandler.setProducerThrowable(e);
                            return;
                        }
                    }
                } else {
                    LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", event.getDatabaseName());
                }

                try {
                    this.dispatcher.dispatchHeartbeatEvent(oplogContext.getPartition(), oplogContext.getOffset());
                } catch (InterruptedException e) {
                    LOGGER.info("Replicator thread is interrupted");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Checks whether the given start position is still covered by the oplog, i.e. whether the
     * first document returned by an unfiltered query has a {@code ts} not later than the start
     * position.
     *
     * <p>The cursor opened for the check is closed before returning, so it is not left open on
     * the server.
     *
     * @param bsonTimestamp position streaming should resume from
     * @param mongoCollection the oplog collection
     * @return {@code false} if the collection yields no document, the first document has no
     *         {@code ts}, or its {@code ts} is later than the start position; {@code true} otherwise
     */
    private boolean isStartPositionInOplog(BsonTimestamp bsonTimestamp, MongoCollection<Document> mongoCollection) {
        try (MongoCursor<Document> cursor = mongoCollection.find().iterator()) {
            if (!cursor.hasNext()) {
                return false;
            }
            final BsonTimestamp firstTimestamp = cursor.next().get("ts", BsonTimestamp.class);
            return firstTimestamp != null && firstTimestamp.compareTo(bsonTimestamp) <= 0;
        }
    }

    /**
     * Builds the oplog query filter that excludes the operations configured as skipped.
     *
     * <p>Each skipped operation contributes an {@code op != code} clause and the clauses are
     * joined with AND. Joining them with OR would be satisfied by every document as soon as two
     * different operations are skipped, because no {@code op} value equals two codes at once.
     *
     * @return the combined filter, or {@code null} when no operation is skipped
     */
    private Bson getOplogSkippedOperationsFilter() {
        final EnumSet<Envelope.Operation> skipped = this.taskContext.getConnectorConfig().getSkippedOperations();
        if (skipped.isEmpty()) {
            return null;
        }
        Bson combined = null;
        for (Envelope.Operation operation : skipped) {
            final Bson notThisOperation = com.mongodb.client.model.Filters.ne(OPERATION_FIELD, operation.code());
            combined = combined == null
                    ? notThisOperation
                    : com.mongodb.client.model.Filters.and(combined, notThisOperation);
        }
        return combined;
    }

    /**
     * Processes one oplog entry, or one change nested inside a transactional entry.
     *
     * <ul>
     * <li>Entries without an {@code o} document are skipped.</li>
     * <li>Entries without a namespace are skipped; a "new primary" message additionally triggers
     *     a {@link PrimaryElectionEvent}.</li>
     * <li>Transactional entries ({@code applyOps}) are expanded and each nested change is handled
     *     recursively with an increasing transaction order. If the entry is the one that was only
     *     partially processed before the last stop, the changes already processed are skipped and
     *     no transaction start/commit events are dispatched.</li>
     * <li>Other entries are dispatched as data change events when their operation is valid and
     *     they pass the database and collection filters.</li>
     * </ul>
     *
     * @param serverAddress address of the primary the oplog is read from
     * @param document the entry (or nested change) to process
     * @param document2 the enclosing top-level oplog entry; equal to {@code document} for top-level calls
     * @param j order of this change within its transaction; {@code 0} for top-level calls
     * @param replicaSetOplogContext per-replica-set streaming state
     * @return {@code false} when reading should stop, {@code true} to continue
     * @throws ConnectException if dispatching a transaction boundary event is interrupted
     */
    private boolean handleOplogEvent(ServerAddress serverAddress, Document document, Document document2, long j, ReplicaSetOplogContext replicaSetOplogContext) {
        final String namespace = document.getString(SourceInfo.NAMESPACE);
        final Document object = document.get(OBJECT_FIELD, Document.class);
        if (object == null) {
            // Guarded so the JSON rendering is only paid for when it will be logged.
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Missing 'o' field in event, so skipping {}", document.toJson());
            }
            return true;
        }

        if (namespace == null || namespace.isEmpty()) {
            if ("new primary".equals(object.getString("msg"))) {
                final AtomicReference<ServerAddress> currentPrimary = new AtomicReference<>();
                try {
                    replicaSetOplogContext.getPrimary().executeBlocking("conn", mongoClient -> {
                        currentPrimary.set(MongoUtil.getPrimaryAddress(mongoClient));
                    });
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt is only logged here, not restored — confirm this is intended.
                    LOGGER.error("Get current primary executeBlocking", e);
                }
                final ServerAddress newPrimary = currentPrimary.get();
                if (newPrimary == null || newPrimary.equals(serverAddress)) {
                    LOGGER.info("Found new primary event in oplog, current {} is new primary. Continue to process oplog event.", serverAddress);
                } else {
                    LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", serverAddress, newPrimary);
                }
                this.dispatcher.dispatchConnectorEvent(replicaSetOplogContext.getPartition(), new PrimaryElectionEvent(newPrimary));
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping event with no namespace: {}", document.toJson());
            }
            return true;
        }

        final List<Document> txChanges = transactionChanges(document);
        if (!txChanges.isEmpty()) {
            long txOrder = j;
            if (replicaSetOplogContext.getIncompleteEventTimestamp() != null) {
                // Resuming: only the changes after the recorded transaction order are processed.
                if (replicaSetOplogContext.getIncompleteEventTimestamp().equals(SourceInfo.extractEventTimestamp(document))) {
                    for (Document change : txChanges) {
                        txOrder++;
                        if (txOrder <= replicaSetOplogContext.getIncompleteTxOrder()) {
                            LOGGER.debug("Skipping record as it is expected to be already processed: {}", change);
                        } else if (!handleOplogEvent(serverAddress, change, document, txOrder, replicaSetOplogContext)) {
                            return false;
                        }
                    }
                }
                replicaSetOplogContext.setIncompleteEventTimestamp(null);
                return true;
            }
            try {
                this.dispatcher.dispatchTransactionStartedEvent(replicaSetOplogContext.getPartition(), getTransactionId(document), replicaSetOplogContext.getOffset());
                for (Document change : txChanges) {
                    // Each nested change gets the next order number within the transaction.
                    txOrder++;
                    if (!handleOplogEvent(serverAddress, change, document, txOrder, replicaSetOplogContext)) {
                        return false;
                    }
                }
                this.dispatcher.dispatchTransactionCommittedEvent(replicaSetOplogContext.getPartition(), replicaSetOplogContext.getOffset());
                return true;
            } catch (InterruptedException e) {
                LOGGER.error("Streaming transaction changes for replica set '{}' was interrupted", replicaSetOplogContext.getReplicaSetName());
                throw new ConnectException("Streaming of transaction changes was interrupted for replica set " + replicaSetOplogContext.getReplicaSetName(), e);
            }
        }

        final String operation = document.getString(OPERATION_FIELD);
        if (!MongoDbChangeSnapshotOplogRecordEmitter.isValidOperation(operation)) {
            LOGGER.debug("Skipping event with \"op={}\"", operation);
            return true;
        }

        // The namespace has the form "<database>.<collection>"; anything without a usable prefix is ignored.
        final int delimiter = namespace.indexOf('.');
        if (delimiter <= 0) {
            return true;
        }
        // Decompiled form of an assert: the collection part must not be empty.
        if (!$assertionsDisabled && delimiter + 1 >= namespace.length()) {
            throw new AssertionError();
        }
        final String databaseName = namespace.substring(0, delimiter);
        final String collectionName = namespace.substring(delimiter + 1);
        if ("$cmd".equals(collectionName)) {
            LOGGER.debug("Skipping database command event: {}", document.toJson());
            return true;
        }
        if (!this.taskContext.filters().databaseFilter().test(databaseName)) {
            LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", databaseName);
            return true;
        }

        replicaSetOplogContext.getOffset().oplogEvent(document, document2, Long.valueOf(j));
        // Result intentionally unused; the call is kept exactly as in the original code.
        replicaSetOplogContext.getOffset().getOffset();
        final CollectionId collectionId = new CollectionId(replicaSetOplogContext.getReplicaSetName(), databaseName, collectionName);
        if (!this.taskContext.filters().collectionFilter().test(collectionId)) {
            return true;
        }
        try {
            return this.dispatcher.dispatchDataChangeEvent(replicaSetOplogContext.getPartition(), collectionId, new MongoDbChangeSnapshotOplogRecordEmitter(replicaSetOplogContext.getPartition(), replicaSetOplogContext.getOffset(), this.clock, document, false));
        } catch (Exception e) {
            this.errorHandler.setProducerThrowable(e);
            return false;
        }
    }

    /**
     * Extracts the changes bundled in a transactional oplog entry.
     *
     * @param document the oplog entry to inspect
     * @return the {@code applyOps} list when the entry's operation is the control code and its
     *         {@code o} document contains that key; an empty list otherwise
     */
    @SuppressWarnings("unchecked")
    private List<Document> transactionChanges(Document document) {
        final String operation = document.getString(OPERATION_FIELD);
        final Document object = document.get(OBJECT_FIELD, Document.class);
        if (!OPERATION_CONTROL.equals(operation)) {
            return Collections.emptyList();
        }
        if (object == null || !object.containsKey(TX_OPS)) {
            return Collections.emptyList();
        }
        // The element type is not checked at runtime; the raw list is returned as-is.
        return (List<Document>) object.get(TX_OPS, List.class);
    }

    /**
     * Builds a fresh offset context by asking the primary of every replica set for an oplog entry
     * via {@code MongoUtil.getOplogEntry(client, -1, LOGGER)}.
     *
     * <p>Replica sets for which no primary connection could be obtained contribute no position.
     * Each primary connection is stopped exactly once, whether or not the lookup succeeds.
     *
     * @param mongoDbConnectorConfig configuration used to create the source info
     * @param mongoDbPartition partition used for connector events raised while connecting
     * @param replicaSets replica sets to determine positions for
     * @return a new offset context seeded with the collected positions
     */
    protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig mongoDbConnectorConfig, MongoDbPartition mongoDbPartition, ReplicaSets replicaSets) {
        final LinkedHashMap<ReplicaSet, Document> positions = new LinkedHashMap<>();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
            final ConnectionContext.MongoPrimary primaryClient = establishConnectionToPrimary(mongoDbPartition, replicaSet);
            if (primaryClient != null) {
                try {
                    primaryClient.execute("get oplog position", mongoClient -> {
                        positions.put(replicaSet, MongoUtil.getOplogEntry(mongoClient, -1, LOGGER));
                    });
                } finally {
                    // Single cleanup point: avoids stopping the client a second time when stop() itself fails.
                    LOGGER.info("Stopping primary client");
                    primaryClient.stop();
                }
            }
        });
        return new MongoDbOffsetContext(new SourceInfo(mongoDbConnectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext(false), positions);
    }

    /**
     * Derives the transaction identifier of an oplog entry.
     *
     * @param document the transactional oplog entry
     * @return the entry's operation id rendered as a decimal string when it is present and
     *         non-zero; otherwise the session transaction id computed by {@code MongoUtil}
     */
    private static String getTransactionId(Document document) {
        final Long operationId = document.getLong(SourceInfo.OPERATION_ID);
        if (operationId != null && operationId != 0L) {
            return Long.toString(operationId);
        }
        return MongoUtil.getOplogSessionTransactionId(document);
    }

    // Class initializer as rendered by the decompiler: records whether assertions are disabled
    // for this class (backing the decompiled assert check in handleOplogEvent) and creates the
    // class logger.
    static {
        $assertionsDisabled = !MongoDbStreamingChangeEventSource.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    }
}
