/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.ObjectHolder;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"split", "text"})
@CapabilityDescription(value="Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines")
public class SplitText
extends AbstractProcessor {
    /** Attribute written on every split holding the number of lines counted for that split. */
    public static final String SPLIT_LINE_COUNT = "text.line.count";
    /** Attribute holding the identifier shared by all splits produced from one input FlowFile. */
    public static final String FRAGMENT_ID = "fragment.identifier";
    /** Attribute holding the one-based position of a split among its siblings. */
    public static final String FRAGMENT_INDEX = "fragment.index";
    /** Attribute holding the total number of splits produced from the input FlowFile. */
    public static final String FRAGMENT_COUNT = "fragment.count";
    /** Attribute holding the filename of the FlowFile the split was produced from. */
    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";

    /** Maximum number of lines placed in each split. */
    public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
            .name("Line Split Count")
            .description("The number of lines that will be added to each split file")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Number of leading lines treated as a header and repeated in every split. */
    public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder()
            .name("Header Line Count")
            .description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .defaultValue("0")
            .build();

    /** Whether the line delimiter of the last line of each split is dropped. */
    public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder()
            .name("Remove Trailing Newlines")
            .description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later")
            .required(true)
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    /** Destination of the input FlowFile after a successful split. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The original input file will be routed to this destination when it has been successfully split into 1 or more files")
            .build();

    /** Destination of the generated split FlowFiles. */
    public static final Relationship REL_SPLITS = new Relationship.Builder()
            .name("splits")
            .description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files")
            .build();

    /** Destination of the input FlowFile when it cannot be split. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
            .build();

    /** Unmodifiable list of supported properties; assigned in {@code init}. */
    private List<PropertyDescriptor> properties;
    /** Unmodifiable set of relationships; assigned in {@code init}. */
    private Set<Relationship> relationships;

    /**
     * Builds the unmodifiable property list and relationship set exposed by this processor.
     *
     * @param context the initialization context; not consulted here
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(3);
        Collections.addAll(descriptors, LINE_SPLIT_COUNT, HEADER_LINE_COUNT, REMOVE_TRAILING_NEWLINES);
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> destinations = new HashSet<Relationship>();
        Collections.addAll(destinations, REL_ORIGINAL, REL_SPLITS, REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(destinations);
    }

    /**
     * @return the unmodifiable set of relationships assigned in {@code init}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of property descriptors assigned in {@code init}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Copies up to {@code maxNumLines} lines from {@code in} to {@code out}.
     *
     * <p>Every line keeps its delimiter except, when {@code keepAllNewLines} is
     * {@code false}, the line at index {@code maxNumLines - 1}.
     *
     * <p>End of stream is detected by peeking one byte with mark/reset rather than by
     * looking at the byte count of the line just read. A blank line whose delimiter is
     * excluded legitimately contributes zero bytes; treating that as end of stream made
     * the caller believe the input was exhausted and drop everything after it.
     *
     * @param in              source stream; must support {@code mark}/{@code reset}
     * @param maxNumLines     maximum number of lines to copy
     * @param out             destination for the copied bytes
     * @param keepAllNewLines whether the delimiter of the final line is copied as well
     * @return the number of lines consumed from {@code in}, between 0 and {@code maxNumLines}
     * @throws IOException if reading, resetting or writing fails
     */
    private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException {
        int numLines = 0;
        for (int i = 0; i < maxNumLines; i++) {
            // Peek a single byte: only a genuine end of stream stops the copy.
            in.mark(1);
            if (in.read() == -1) {
                return numLines;
            }
            in.reset();

            final boolean includeLineDelimiter = keepAllNewLines || i != maxNumLines - 1;
            // The stream is not at its end, so this call consumes at least one byte; the
            // returned byte count may still be 0 for a blank line with its delimiter dropped.
            this.countBytesToSplitPoint(in, out, includeLineDelimiter);
            numLines++;
        }
        return numLines;
    }

    /**
     * Consumes one line from {@code in}, optionally copying it to {@code out}, and reports
     * how many of its bytes belong to the split.
     *
     * <p>A line ends at {@code \n}, at {@code \r\n}, at a {@code \r} followed by any other
     * byte, or at end of stream. When a lone {@code \r} is detected, the byte after it has
     * already been read; the stream is rolled back so that byte starts the next line. The
     * rollback happens before the byte is counted or copied, so it is neither written to
     * {@code out} twice nor included in the returned length.
     *
     * @param in                   source stream; must support {@code mark}/{@code reset}
     * @param out                  destination for the line's bytes, or {@code null} to only count
     * @param includeLineDelimiter whether the delimiter bytes are copied and counted
     * @return the number of bytes of this line that belong to the split; 0 at end of stream
     *         or for a blank line whose delimiter is excluded
     * @throws IOException if reading, resetting or writing fails
     */
    private long countBytesToSplitPoint(final InputStream in, final OutputStream out, final boolean includeLineDelimiter) throws IOException {
        int lastByte = -1;
        long bytesRead = 0L;

        while (true) {
            in.mark(1);
            final int nextByte = in.read();

            if (nextByte == -1) {
                // End of stream: a trailing CR is the delimiter of this final line.
                if (lastByte == '\r' && !includeLineDelimiter) {
                    return bytesRead - 1L;
                }
                return bytesRead;
            }

            if (lastByte == '\r' && nextByte != '\n') {
                // Lone CR: the byte just read starts the next line. Push it back before it
                // is counted or copied.
                in.reset();
                return includeLineDelimiter ? bytesRead : bytesRead - 1L;
            }

            bytesRead++;
            // Copy the byte unless it is a delimiter byte and delimiters are being dropped.
            if (out != null && (includeLineDelimiter || (nextByte != '\n' && nextByte != '\r'))) {
                out.write(nextByte);
            }

            if (nextByte == '\n') {
                if (includeLineDelimiter) {
                    return bytesRead;
                }
                // Drop "\r\n" (two bytes) or "\n" (one byte) from the reported length.
                return lastByte == '\r' ? bytesRead - 2L : bytesRead - 1L;
            }

            lastByte = nextByte;
        }
    }

    /**
     * Scans up to {@code numLines} lines of {@code in} without copying them and accumulates
     * their line and byte totals.
     *
     * <p>Each line is counted with its delimiter except, when {@code keepAllNewLines} is
     * {@code false}, the line at index {@code numLines - 1}. Scanning stops as soon as a
     * line contributes no bytes, which happens at end of stream and also for a blank line
     * whose delimiter is excluded; such a line is not added to the totals.
     *
     * @param in              source stream positioned at the start of the split
     * @param numLines        maximum number of lines in the split
     * @param keepAllNewLines whether the delimiter of the final line is counted
     * @return a {@code SplitInfo} with {@code lengthLines} and {@code lengthBytes} filled in;
     *         {@code offsetBytes} is left at 0
     * @throws IOException if reading fails
     */
    private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
        final SplitInfo totals = new SplitInfo();
        final long finalLineIndex = numLines - 1;

        while (totals.lengthLines < numLines) {
            final boolean withDelimiter = keepAllNewLines || totals.lengthLines != finalLineIndex;
            final long lineBytes = this.countBytesToSplitPoint(in, null, withDelimiter);
            if (lineBytes <= 0L) {
                break;
            }
            totals.lengthLines++;
            totals.lengthBytes += lineBytes;
        }
        return totals;
    }

    /**
     * Splits the next queued FlowFile on line boundaries.
     *
     * <p>The content is read once. The configured number of header lines is buffered first;
     * if the file has fewer lines than that, the FlowFile is routed to {@code failure}.
     * Then one of two strategies is used:
     * <ul>
     *   <li>With a header, each split is written as a new FlowFile containing the header
     *       followed by up to "Line Split Count" lines.</li>
     *   <li>Without a header, only the byte offset and length of each split are recorded
     *       while reading; the splits are created afterwards with
     *       {@code session.clone(flowFile, offset, length)}.</li>
     * </ul>
     * On success every split receives the fragment attributes, the input goes to
     * {@code original} and the splits go to {@code splits}.
     *
     * @param context provides the configured property values
     * @param session the session used to read, create and route FlowFiles
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = this.getLogger();
        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
        final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
        final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();

        final ObjectHolder<String> errorMessage = new ObjectHolder<String>(null);
        final List<SplitInfo> splitInfos = new ArrayList<SplitInfo>();
        final List<FlowFile> splits = new ArrayList<FlowFile>();
        final long startNanos = System.nanoTime();

        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream rawIn) throws IOException {
                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
                     final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {

                    // Buffer the header lines, delimiters included, so they can be replayed into every split.
                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
                    final int headerLinesCopied = SplitText.this.readLines(in, headerCount, headerStream, true);
                    if (headerLinesCopied < headerCount) {
                        errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
                        return;
                    }

                    if (headerCount > 0) {
                        // Header mode: splits cannot be clones of a byte range because each one
                        // starts with the header, so their content is written out explicitly.
                        while (true) {
                            final IntegerHolder linesCopied = new IntegerHolder(0);
                            FlowFile splitFile = session.create(flowFile);
                            try {
                                splitFile = session.write(splitFile, new OutputStreamCallback() {
                                    @Override
                                    public void process(final OutputStream rawOut) throws IOException {
                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
                                            headerStream.writeTo(out);
                                            linesCopied.set(SplitText.this.readLines(in, splitCount, out, !removeTrailingNewlines));
                                        }
                                    }
                                });
                                splitFile = session.putAttribute(splitFile, SplitText.SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
                                logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()});
                            } finally {
                                // Keep the split only if it received body lines; a header-only split is discarded.
                                if (linesCopied.get() > 0) {
                                    splits.add(splitFile);
                                } else {
                                    session.remove(splitFile);
                                }
                            }

                            // A short split means the input ran out of lines.
                            if (linesCopied.get() < splitCount) {
                                break;
                            }
                        }
                    } else {
                        // No header: just record where each split starts and how long it is.
                        while (true) {
                            final long beforeReadingLines = in.getBytesConsumed();
                            final SplitInfo info = SplitText.this.countBytesToSplitPoint(in, splitCount, !removeTrailingNewlines);
                            // NOTE(review): a zero-byte split ends the scan. That is the end-of-stream
                            // case, but a split consisting solely of one blank line with its delimiter
                            // removed also has zero bytes - confirm whether that case needs handling.
                            if (info.lengthBytes == 0L) {
                                break;
                            }
                            info.offsetBytes = beforeReadingLines;
                            splitInfos.add(info);

                            final long procNanos = System.nanoTime() - startNanos;
                            final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
                            logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
                        }
                    }
                }
            }
        });

        if (errorMessage.get() != null) {
            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
            session.transfer(flowFile, REL_FAILURE);
            if (!splits.isEmpty()) {
                session.remove(splits);
            }
            return;
        }

        // Materialise the byte ranges recorded in no-header mode as clones of the input.
        for (final SplitInfo info : splitInfos) {
            FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
            split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
            splits.add(split);
        }

        this.finishFragmentAttributes(session, flowFile, splits);

        // Listing every split is only useful for small results.
        if (splits.size() > 10) {
            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
        } else {
            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
        }

        session.transfer(flowFile, REL_ORIGINAL);
        session.transfer(splits, REL_SPLITS);
    }

    /**
     * Stamps every split with the fragment attributes and replaces the entries of
     * {@code splits} with the updated FlowFiles, in the same order.
     *
     * <p>All splits share one freshly generated random UUID as {@code fragment.identifier};
     * {@code fragment.index} is one-based; {@code fragment.count} is the number of splits;
     * {@code segment.original.filename} is the filename attribute of {@code source}.
     *
     * @param session the session used to update the attributes
     * @param source  the FlowFile the splits were produced from
     * @param splits  the splits to update; modified in place
     */
    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
        final String sourceFilename = source.getAttribute(CoreAttributes.FILENAME.key());
        final String sharedFragmentId = UUID.randomUUID().toString();

        // Work from a snapshot so the caller's list can be refilled with the updated FlowFiles.
        final ArrayList<FlowFile> pending = new ArrayList<FlowFile>(splits);
        splits.clear();
        final String fragmentTotal = String.valueOf(pending.size());

        int position = 0;
        for (final FlowFile fragment : pending) {
            position++;
            final HashMap<String, String> fragmentAttributes = new HashMap<String, String>();
            fragmentAttributes.put(FRAGMENT_ID, sharedFragmentId);
            fragmentAttributes.put(FRAGMENT_INDEX, String.valueOf(position));
            fragmentAttributes.put(FRAGMENT_COUNT, fragmentTotal);
            fragmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, sourceFilename);
            splits.add(session.putAllAttributes(fragment, fragmentAttributes));
        }
    }

    private class SplitInfo {
        public long offsetBytes;
        public long lengthBytes;
        public long lengthLines;

        public SplitInfo() {
            this.offsetBytes = 0L;
            this.lengthBytes = 0L;
            this.lengthLines = 0L;
        }

        public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) {
            this.offsetBytes = offsetBytes;
            this.lengthBytes = lengthBytes;
            this.lengthLines = lengthLines;
        }
    }
}

