/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.ignite.cache;

import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.ignite.cache.AbstractIgniteCacheProcessor;
import org.apache.nifi.processors.ignite.cache.GetIgniteCache;
import org.apache.nifi.stream.io.StreamUtils;

@EventDriven
@SupportsBatching
@Tags(value={"Ignite", "insert", "update", "stream", "write", "put", "cache", "key"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Stream the contents of a FlowFile to Ignite Cache using DataStreamer. The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key and the byte array of the FlowFile as the value of the cache entry value.  Both the string key and a  non-empty byte array value are required otherwise the FlowFile is transferred to the failure relation. Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message  can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
@WritesAttributes(value={@WritesAttribute(attribute="ignite.cache.batch.flow.file.total.count", description="The total number of FlowFile in the batch"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.item.number", description="The item number of FlowFile in the batch"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.successful.number", description="The successful FlowFile item number"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.successful.count", description="The number of successful FlowFiles"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.failed.number", description="The failed FlowFile item number"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.failed.count", description="The total number of failed FlowFiles in the batch"), @WritesAttribute(attribute="ignite.cache.batch.flow.file.failed.reason", description="The failed reason attribute key")})
@SeeAlso(value={GetIgniteCache.class})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutIgniteCache
extends AbstractIgniteCacheProcessor {
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().displayName("Batch Size For Entries").name("batch-size-for-entries").description("Batch size for entries (1-500).").defaultValue("250").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)500L, (boolean)true)).sensitive(false).build();
    public static final PropertyDescriptor DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS = new PropertyDescriptor.Builder().displayName("Data Streamer Per Node Parallel Operations").name("data-streamer-per-node-parallel-operations").description("Data streamer per node parallelism").defaultValue("5").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)10L, (boolean)true)).sensitive(false).build();
    public static final PropertyDescriptor DATA_STREAMER_PER_NODE_BUFFER_SIZE = new PropertyDescriptor.Builder().displayName("Data Streamer Per Node Buffer Size").name("data-streamer-per-node-buffer-size").description("Data streamer per node buffer size (1-500).").defaultValue("250").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)500L, (boolean)true)).sensitive(false).build();
    public static final PropertyDescriptor DATA_STREAMER_AUTO_FLUSH_FREQUENCY = new PropertyDescriptor.Builder().displayName("Data Streamer Auto Flush Frequency in millis").name("data-streamer-auto-flush-frequency-in-millis").description("Data streamer flush interval in millis seconds").defaultValue("10").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)100L, (boolean)true)).sensitive(false).build();
    public static final PropertyDescriptor DATA_STREAMER_ALLOW_OVERRIDE = new PropertyDescriptor.Builder().displayName("Data Streamer Allow Override").name("data-streamer-allow-override").description("Whether to override values already in the cache").defaultValue("false").required(true).allowableValues(new AllowableValue[]{new AllowableValue("true"), new AllowableValue("false")}).sensitive(false).build();
    public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count";
    public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number";
    public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT = "ignite.cache.batch.flow.file.successful.count";
    public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER = "ignite.cache.batch.flow.file.successful.number";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_COUNT = "ignite.cache.batch.flow.file.failed.count";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER = "ignite.cache.batch.flow.file.failed.number";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_FILE_SIZE = "ignite.cache.batch.flow.file.failed.size";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.batch.flow.file.failed.reason";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero";
    protected static final List<PropertyDescriptor> descriptors = Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, BATCH_SIZE, IGNITE_CACHE_ENTRY_KEY, DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS, DATA_STREAMER_PER_NODE_BUFFER_SIZE, DATA_STREAMER_AUTO_FLUSH_FREQUENCY, DATA_STREAMER_ALLOW_OVERRIDE);
    private transient IgniteDataStreamer<String, byte[]> igniteDataStreamer;

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnStopped
    public final void closeIgniteDataStreamer() {
        if (this.igniteDataStreamer != null) {
            this.getLogger().info("Closing ignite data streamer");
            this.igniteDataStreamer.flush();
            this.igniteDataStreamer = null;
        }
    }

    @OnShutdown
    public final void closeIgniteDataStreamerAndCache() {
        this.closeIgniteDataStreamer();
        super.closeIgniteCache();
    }

    protected IgniteDataStreamer<String, byte[]> getIgniteDataStreamer() {
        return this.igniteDataStreamer;
    }

    @OnScheduled
    public final void initializeIgniteDataStreamer(ProcessContext context) throws ProcessException {
        super.initializeIgniteCache(context);
        if (this.getIgniteDataStreamer() != null) {
            return;
        }
        this.getLogger().info("Creating Ignite Datastreamer");
        try {
            int perNodeParallelOperations = context.getProperty(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS).asInteger();
            int perNodeBufferSize = context.getProperty(DATA_STREAMER_PER_NODE_BUFFER_SIZE).asInteger();
            int autoFlushFrequency = context.getProperty(DATA_STREAMER_AUTO_FLUSH_FREQUENCY).asInteger();
            boolean allowOverride = context.getProperty(DATA_STREAMER_ALLOW_OVERRIDE).asBoolean();
            this.igniteDataStreamer = this.getIgnite().dataStreamer(this.getIgniteCache().getName());
            this.igniteDataStreamer.perNodeBufferSize(perNodeBufferSize);
            this.igniteDataStreamer.perNodeParallelOperations(perNodeParallelOperations);
            this.igniteDataStreamer.autoFlushFrequency((long)autoFlushFrequency);
            this.igniteDataStreamer.allowOverwrite(allowOverride);
        }
        catch (Exception e) {
            this.getLogger().error("Failed to schedule PutIgnite due to {}", new Object[]{e}, (Throwable)e);
            throw new ProcessException((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        String key;
        List<FlowFile> failedFlowFiles;
        List<FlowFile> successfulFlowFiles;
        List flowFiles;
        block14: {
            int batchSize = context.getProperty(BATCH_SIZE).asInteger();
            flowFiles = session.get(batchSize);
            if (flowFiles.isEmpty()) {
                return;
            }
            ArrayList<AbstractMap.SimpleEntry<String, byte[]>> cacheItems = new ArrayList<AbstractMap.SimpleEntry<String, byte[]>>();
            successfulFlowFiles = new ArrayList<FlowFile>();
            failedFlowFiles = new ArrayList<FlowFile>();
            try {
                for (int i = 0; i < flowFiles.size(); ++i) {
                    FlowFile flowFile = null;
                    try {
                        flowFile = (FlowFile)flowFiles.get(i);
                        key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                        if (this.isFailedFlowFile(flowFile, key)) {
                            failedFlowFiles.add(flowFile);
                            continue;
                        }
                        final byte[] byteArray = new byte[(int)flowFile.getSize()];
                        session.read(flowFile, new InputStreamCallback(){

                            public void process(InputStream in) throws IOException {
                                StreamUtils.fillBuffer((InputStream)in, (byte[])byteArray, (boolean)true);
                            }
                        });
                        cacheItems.add(new AbstractMap.SimpleEntry<String, byte[]>(key, byteArray));
                        successfulFlowFiles.add(flowFile);
                        continue;
                    }
                    catch (Exception e) {
                        this.getLogger().error("Failed to insert {} into IgniteDB due to {}", new Object[]{flowFile, e}, (Throwable)e);
                        session.transfer(flowFile, REL_FAILURE);
                        context.yield();
                    }
                }
                if (cacheItems.isEmpty()) break block14;
            }
            catch (Throwable throwable) {
                if (!cacheItems.isEmpty()) {
                    IgniteFuture futures = this.igniteDataStreamer.addData(cacheItems);
                    Object result = futures.get();
                    this.getLogger().debug("Result {} of addData", new Object[]{result});
                }
                if (!successfulFlowFiles.isEmpty()) {
                    successfulFlowFiles = this.updateSuccessfulFlowFileAttributes(flowFiles, successfulFlowFiles, session);
                    session.transfer(successfulFlowFiles, REL_SUCCESS);
                    for (FlowFile flowFile : successfulFlowFiles) {
                        String key2 = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                        session.getProvenanceReporter().send(flowFile, "ignite://cache/" + this.getIgniteCache().getName() + "/" + key2);
                    }
                }
                if (!failedFlowFiles.isEmpty()) {
                    failedFlowFiles = this.updateFailedFlowFileAttributes(flowFiles, failedFlowFiles, session, context);
                    session.transfer(failedFlowFiles, REL_FAILURE);
                }
                throw throwable;
            }
            IgniteFuture futures = this.igniteDataStreamer.addData(cacheItems);
            Object result = futures.get();
            this.getLogger().debug("Result {} of addData", new Object[]{result});
        }
        if (!successfulFlowFiles.isEmpty()) {
            successfulFlowFiles = this.updateSuccessfulFlowFileAttributes(flowFiles, successfulFlowFiles, session);
            session.transfer(successfulFlowFiles, REL_SUCCESS);
            for (FlowFile flowFile : successfulFlowFiles) {
                key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                session.getProvenanceReporter().send(flowFile, "ignite://cache/" + this.getIgniteCache().getName() + "/" + key);
            }
        }
        if (!failedFlowFiles.isEmpty()) {
            failedFlowFiles = this.updateFailedFlowFileAttributes(flowFiles, failedFlowFiles, session, context);
            session.transfer(failedFlowFiles, REL_FAILURE);
        }
    }

    private boolean isFailedFlowFile(FlowFile flowFile, String key) {
        if (StringUtils.isEmpty((CharSequence)key)) {
            return true;
        }
        return flowFile.getSize() == 0L;
    }

    protected List<FlowFile> updateSuccessfulFlowFileAttributes(List<FlowFile> flowFiles, List<FlowFile> successfulFlowFiles, ProcessSession session) {
        int flowFileCount = flowFiles.size();
        int flowFileSuccessful = successfulFlowFiles.size();
        ArrayList<FlowFile> updatedSuccessfulFlowFiles = new ArrayList<FlowFile>();
        for (int i = 0; i < flowFileSuccessful; ++i) {
            FlowFile flowFile = successfulFlowFiles.get(i);
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, Integer.toString(i));
            attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(flowFileCount));
            attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(flowFiles.indexOf(flowFile)));
            attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, Integer.toString(flowFileSuccessful));
            flowFile = session.putAllAttributes(flowFile, attributes);
            updatedSuccessfulFlowFiles.add(flowFile);
        }
        return updatedSuccessfulFlowFiles;
    }

    protected List<FlowFile> updateFailedFlowFileAttributes(List<FlowFile> flowFiles, List<FlowFile> failedFlowFiles, ProcessSession session, ProcessContext context) {
        int flowFileCount = flowFiles.size();
        int flowFileFailed = failedFlowFiles.size();
        ArrayList<FlowFile> updatedFailedFlowFiles = new ArrayList<FlowFile>();
        for (int i = 0; i < flowFileFailed; ++i) {
            FlowFile flowFile = failedFlowFiles.get(i);
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, Integer.toString(i));
            attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(flowFileCount));
            attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(flowFiles.indexOf(flowFile)));
            attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, Integer.toString(flowFileFailed));
            String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty((CharSequence)key)) {
                attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE);
            } else if (flowFile.getSize() == 0L) {
                attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE);
            } else {
                throw new ProcessException("Unknown reason for failing file: " + flowFile);
            }
            flowFile = session.putAllAttributes(flowFile, attributes);
            updatedFailedFlowFiles.add(flowFile);
        }
        return updatedFailedFlowFiles;
    }
}

