/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.mongodb;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.mongodb.impl.MongoUtilities;
import com.hazelcast.jet.mongodb.impl.ReadMongoP;
import com.hazelcast.jet.mongodb.impl.ReadMongoParams;
import com.hazelcast.jet.sql.impl.connector.mongodb.ConversionsFromBson;
import com.hazelcast.jet.sql.impl.connector.mongodb.MongoTable;
import com.hazelcast.jet.sql.impl.connector.mongodb.PlaceholderReplacer;
import com.hazelcast.jet.sql.impl.connector.mongodb.ProjectionData;
import com.hazelcast.shaded.com.google.common.base.Preconditions;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.bson.BsonDateTime;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;

public class SelectProcessorSupplier
implements ProcessorSupplier {
    private transient SupplierEx<? extends MongoClient> clientSupplier;
    private final String databaseName;
    private final String collectionName;
    private final boolean stream;
    private final FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider;
    private final Document predicate;
    private final List<ProjectionData> projection;
    private final String[] externalNames;
    private final Long startAt;
    private final String connectionString;
    private final String dataConnectionName;
    private transient ExpressionEvalContext evalContext;
    private final boolean forceMongoParallelismOne;

    SelectProcessorSupplier(MongoTable table, Document predicate, List<ProjectionData> projection, BsonTimestamp startAt, boolean stream, FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider) {
        Preconditions.checkArgument(projection != null && !projection.isEmpty(), "projection cannot be empty");
        this.predicate = predicate;
        this.projection = projection;
        this.connectionString = table.connectionString;
        this.dataConnectionName = table.dataConnectionName;
        this.databaseName = table.databaseName;
        this.collectionName = table.collectionName;
        this.startAt = startAt == null ? null : Long.valueOf(startAt.getValue());
        this.stream = stream;
        this.eventTimePolicyProvider = eventTimePolicyProvider;
        this.forceMongoParallelismOne = table.isForceMongoParallelismOne();
        this.externalNames = table.externalNames();
    }

    SelectProcessorSupplier(MongoTable table, Document predicate, List<ProjectionData> projection, BsonTimestamp startAt, FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider) {
        this(table, predicate, projection, startAt, true, eventTimePolicyProvider);
    }

    SelectProcessorSupplier(MongoTable table, Document predicate, List<ProjectionData> projection) {
        this(table, predicate, projection, null, false, null);
    }

    public void init(@Nonnull ProcessorSupplier.Context context) {
        if (this.connectionString != null) {
            this.clientSupplier = (SupplierEx & Serializable)() -> MongoClients.create((String)this.connectionString);
        }
        this.evalContext = ExpressionEvalContext.from(context);
    }

    @Nonnull
    public Collection<? extends Processor> get(int count) {
        ArrayList<Bson> aggregates = new ArrayList<Bson>();
        if (this.predicate != null) {
            Document filterWithParams = PlaceholderReplacer.replacePlaceholders(this.predicate, this.evalContext, (Object[])null, this.externalNames, false);
            aggregates.add(Aggregates.match((Bson)filterWithParams.toBsonDocument()));
        }
        Bson proj = Projections.fields(this.projection.stream().map(p -> p.projectionExpr).collect(Collectors.toList()));
        List projectedNames = this.projection.stream().map(p -> p.externalName).collect(Collectors.toList());
        if (!projectedNames.contains("_id") && !this.stream) {
            aggregates.add(Aggregates.project((Bson)Projections.fields((Bson[])new Bson[]{Projections.excludeId(), proj})));
        } else {
            aggregates.add(Aggregates.project((Bson)proj));
        }
        ArrayList<ReadMongoP> processors = new ArrayList<ReadMongoP>();
        EventTimePolicy eventTimePolicy = this.eventTimePolicyProvider == null ? EventTimePolicy.noEventTime() : (EventTimePolicy)this.eventTimePolicyProvider.apply((Object)this.evalContext);
        SupplierEx<? extends MongoClient> clientSupplierEx = this.clientSupplier;
        for (int i = 0; i < count; ++i) {
            ReadMongoP processor = new ReadMongoP(new ReadMongoParams(this.stream).setClientSupplier(clientSupplierEx).setDataConnectionRef(this.dataConnectionName).setAggregates(aggregates).setDatabaseName(this.databaseName).setCollectionName(this.collectionName).setMapItemFn(this::convertDocToRow).setMapStreamFn(this::convertStreamDocToRow).setStartAtTimestamp(this.startAt == null ? null : new BsonTimestamp(this.startAt.longValue())).setEventTimePolicy(eventTimePolicy).setNonDistributed(this.forceMongoParallelismOne));
            processors.add(processor);
        }
        return processors;
    }

    private JetSqlRow convertDocToRow(Document doc) {
        Object[] row = new Object[this.projection.size()];
        for (ProjectionData entry : this.projection) {
            Object fromDoc = doc.get((Object)entry.externalName);
            int index = entry.index;
            if (entry.type != null) {
                row[index] = ConversionsFromBson.convertFromBson(fromDoc, entry.type);
                continue;
            }
            row[index] = fromDoc;
        }
        return new JetSqlRow((SerializationService)this.evalContext.getSerializationService(), row);
    }

    private JetSqlRow convertStreamDocToRow(ChangeStreamDocument<Document> changeStreamDocument, Long ts) {
        Document doc = (Document)changeStreamDocument.getFullDocument();
        Objects.requireNonNull(doc, "Document is empty");
        Object[] row = new Object[this.projection.size()];
        for (ProjectionData entry : this.projection) {
            Object fromDoc = doc.get((Object)entry.externalName.replaceFirst("fullDocument.", ""));
            int index = entry.index;
            if (entry.type != null) {
                row[index] = ConversionsFromBson.convertFromBson(fromDoc, entry.type);
                continue;
            }
            row[index] = fromDoc;
        }
        this.addIfInProjection(changeStreamDocument.getOperationType().getValue(), "operationType", row);
        this.addIfInProjection(changeStreamDocument.getResumeToken().toString(), "resumeToken", row);
        this.addIfInProjection(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()), "ts", row);
        this.addIfInProjection(MongoUtilities.bsonDateTimeToLocalDateTime((BsonDateTime)changeStreamDocument.getWallTime()), "wallTime", row);
        this.addIfInProjection(MongoUtilities.bsonTimestampToLocalDateTime((BsonTimestamp)changeStreamDocument.getClusterTime()), "clusterTime", row);
        return new JetSqlRow((SerializationService)this.evalContext.getSerializationService(), row);
    }

    private void addIfInProjection(Object value, String field, Object[] row) {
        int index = this.indexInProjection(field);
        if (index == -1) {
            return;
        }
        row[index] = value;
    }

    private int indexInProjection(String columnName) {
        return this.projection.stream().filter(p -> p.externalName.equals(columnName)).map(p -> p.index).findAny().orElse(-1);
    }
}

