/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.util.JSON;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import org.bson.conversions.Bson;

/**
 * NiFi processor that writes the content of each incoming FlowFile to a MongoDB collection.
 *
 * <p>The FlowFile content is decoded with the configured character set and parsed as JSON.
 * Depending on the {@code Mode} property the parsed document is either inserted, or used to
 * update an existing document that is located through the {@code Update Query Key} field.
 * Updates either replace the matched document as a whole or apply a document of update
 * operators, as selected by the {@code Update Mode} property.
 *
 * <p>FlowFiles that are written are routed to {@code success}; any exception raised while
 * reading, parsing or writing routes the FlowFile to {@code failure} and yields the processor.
 */
@EventDriven
@Tags(value={"mongodb", "insert", "update", "write", "put"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Writes the contents of a FlowFile to MongoDB")
public class PutMongo
extends AbstractMongoProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();

    /** Value of the {@code Mode} property selecting an insert. */
    static final String MODE_INSERT = "insert";
    /** Value of the {@code Mode} property selecting an update. */
    static final String MODE_UPDATE = "update";

    /** Update mode in which the FlowFile content replaces the matched document. */
    static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document");
    /** Update mode in which the FlowFile content is a document of update operators. */
    static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled");

    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
            .name("Mode")
            .description("Indicates whether the processor should insert or update content")
            .required(true)
            .allowableValues(MODE_INSERT, MODE_UPDATE)
            .defaultValue(MODE_INSERT)
            .build();
    static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder()
            .name("Upsert")
            .description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, otherwise it is ignored")
            .required(true)
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .defaultValue("false")
            .build();
    static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
            .name("Update Query Key")
            .description("Key name used to build the update query criteria; this property is valid only when using update mode, otherwise it is ignored")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("_id")
            .build();
    static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
            .displayName("Update Mode")
            .name("put-mongo-update-mode")
            .required(true)
            .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
            .defaultValue(UPDATE_WITH_DOC.getValue())
            .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement or specify a document that contains update operators like $set and $unset")
            .build();
    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
            .name("Character Set")
            .description("The Character Set in which the data is encoded")
            .required(true)
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue("UTF-8")
            .build();

    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return an unmodifiable set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the properties this processor supports.
     *
     * @return an unmodifiable list of the inherited descriptors followed by the
     *         properties declared by this class
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Takes one FlowFile from the session and writes its content to MongoDB.
     *
     * <p>Returns immediately when no FlowFile is available. Otherwise the content is read fully
     * into memory, parsed, and inserted or applied as an update according to the configured
     * properties. On success a provenance SEND event is reported and the FlowFile is routed to
     * {@link #REL_SUCCESS}; on any exception it is routed to {@link #REL_FAILURE} and the
     * processor yields.
     *
     * @param context provides the configured property values
     * @param session the session the FlowFile is taken from and transferred within
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = this.getLogger();
        Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        String mode = context.getProperty(MODE).getValue();
        String updateMode = context.getProperty(UPDATE_MODE).getValue();
        WriteConcern writeConcern = this.getWriteConcern(context);
        MongoCollection<Document> collection = this.getCollection(context, flowFile).withWriteConcern(writeConcern);

        // Decide once how the content is handled so that the parser chosen below always
        // matches the type the write branch casts to.
        boolean insertMode = MODE_INSERT.equalsIgnoreCase(mode);
        boolean replaceWholeDocument = UPDATE_WITH_DOC.getValue().equals(updateMode);

        try {
            final byte[] content = new byte[(int)flowFile.getSize()];
            session.read(flowFile, new InputStreamCallback(){

                public void process(InputStream in) throws IOException {
                    StreamUtils.fillBuffer(in, content, true);
                }
            });
            String json = new String(content, charset);

            // Inserts and whole-document replacements need a Document; operator updates are
            // parsed with JSON.parse and handled as a BasicDBObject further down.
            Object doc = insertMode || replaceWholeDocument ? Document.parse(json) : JSON.parse(json);

            if (insertMode) {
                collection.insertOne((Document)doc);
                logger.info("inserted {} into MongoDB", new Object[]{flowFile});
            } else {
                boolean upsert = context.getProperty(UPSERT).asBoolean();
                String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
                // The value stored under the configured key identifies the document to update.
                Document query = new Document(updateKey, ((Map<?, ?>)doc).get(updateKey));
                if (replaceWholeDocument) {
                    collection.replaceOne((Bson)query, (Document)doc, new UpdateOptions().upsert(upsert));
                } else {
                    BasicDBObject update = (BasicDBObject)doc;
                    // The lookup field is only there to build the query; it must not remain next
                    // to the update operators. Strip the configured key rather than a fixed
                    // "_id" so that a non-default Update Query Key works as well.
                    update.remove(updateKey);
                    collection.updateOne((Bson)query, (Bson)update, new UpdateOptions().upsert(upsert));
                }
                logger.info("updated {} into MongoDB", new Object[]{flowFile});
            }
            session.getProvenanceReporter().send(flowFile, this.getURI(context));
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (Exception e) {
            logger.error("Failed to insert {} into MongoDB due to {}", new Object[]{flowFile, e}, (Throwable)e);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    /**
     * Maps the configured write concern name to the matching {@link WriteConcern} constant.
     *
     * @param context provides the value of the write concern property
     * @return the matching write concern; {@link WriteConcern#ACKNOWLEDGED} when the value is
     *         unset or not one of the recognised names
     */
    @Override
    protected WriteConcern getWriteConcern(ProcessContext context) {
        String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
        if (writeConcernProperty == null) {
            // Switching on a null string would throw; treat it like any unrecognised value.
            return WriteConcern.ACKNOWLEDGED;
        }
        switch (writeConcernProperty) {
            case "UNACKNOWLEDGED": {
                return WriteConcern.UNACKNOWLEDGED;
            }
            case "FSYNCED": {
                return WriteConcern.FSYNCED;
            }
            case "JOURNALED": {
                return WriteConcern.JOURNALED;
            }
            case "REPLICA_ACKNOWLEDGED": {
                return WriteConcern.REPLICA_ACKNOWLEDGED;
            }
            case "MAJORITY": {
                return WriteConcern.MAJORITY;
            }
            case "ACKNOWLEDGED":
            default: {
                return WriteConcern.ACKNOWLEDGED;
            }
        }
    }

    static {
        // Inherited descriptors come first, followed by the properties specific to this processor.
        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(MODE);
        _propertyDescriptors.add(UPSERT);
        _propertyDescriptors.add(UPDATE_QUERY_KEY);
        _propertyDescriptors.add(UPDATE_MODE);
        _propertyDescriptors.add(WRITE_CONCERN);
        _propertyDescriptors.add(CHARACTER_SET);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

        Set<Relationship> _relationships = new HashSet<>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

