/*
 * Decompiled with CFR 0.152.
 */
package at.grahsl.kafka.connect.mongodb.writemodel.strategy;

import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.conversions.Bson;

public class MonotonicWritesDefaultStrategy
implements WriteModelStrategy {
    public static final String FIELD_KAFKA_COORDS = "_kafkaCoords";
    public static final String FIELD_TOPIC = "_topic";
    public static final String FIELD_PARTITION = "_partition";
    public static final String FIELD_OFFSET = "_offset";
    private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);

    @Override
    public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
        throw new DataException("error: the write model strategy " + MonotonicWritesDefaultStrategy.class.getName() + " needs the SinkRecord's data and thus cannot work on the SinkDocument param alone. please use the provided method overloading for this.");
    }

    @Override
    public WriteModel<BsonDocument> createWriteModel(SinkDocument document, SinkRecord record) {
        BsonDocument vd = document.getValueDoc().orElseThrow(() -> new DataException("error: cannot build the WriteModel since the value document was missing unexpectedly"));
        vd.append(FIELD_KAFKA_COORDS, (BsonValue)new BsonDocument(FIELD_TOPIC, (BsonValue)new BsonString(record.topic())).append(FIELD_PARTITION, (BsonValue)new BsonInt32(record.kafkaPartition().intValue())).append(FIELD_OFFSET, (BsonValue)new BsonInt64(record.kafkaOffset())));
        ArrayList<BsonDocument> conditionalUpdatePipeline = new ArrayList<BsonDocument>();
        conditionalUpdatePipeline.add(new BsonDocument("$replaceRoot", (BsonValue)new BsonDocument("newRoot", (BsonValue)new BsonDocument("$cond", (BsonValue)new BsonDocument("if", (BsonValue)new BsonDocument("$and", (BsonValue)new BsonArray(Arrays.asList(new BsonDocument("$eq", (BsonValue)new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._topic"), new BsonString(record.topic())))), new BsonDocument("$eq", (BsonValue)new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._partition"), new BsonInt32(record.kafkaPartition().intValue())))), new BsonDocument("$gte", (BsonValue)new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._offset"), new BsonInt64(record.kafkaOffset())))))))).append("then", (BsonValue)new BsonString("$$ROOT")).append("else", (BsonValue)vd)))));
        return new UpdateOneModel((Bson)new BsonDocument("_id", vd.get((Object)"_id")), conditionalUpdatePipeline, UPDATE_OPTIONS);
    }
}

