/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.ObjectHolder;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"split", "text"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines.")
@WritesAttributes(value={@WritesAttribute(attribute="text.line.count", description="The number of lines of text from the original FlowFile that were copied to this FlowFile"), @WritesAttribute(attribute="fragment.identifier", description="All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @WritesAttribute(attribute="fragment.index", description="A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), @WritesAttribute(attribute="fragment.count", description="The number of split FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute="segment.original.filename ", description="The filename of the parent FlowFile")})
@SeeAlso(value={MergeContent.class})
public class SplitText
extends AbstractProcessor {
    public static final String SPLIT_LINE_COUNT = "text.line.count";
    public static final String FRAGMENT_ID = "fragment.identifier";
    public static final String FRAGMENT_INDEX = "fragment.index";
    public static final String FRAGMENT_COUNT = "fragment.count";
    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
    public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder().name("Line Split Count").description("The number of lines that will be added to each split file (excluding the header, if the Header Line Count property is greater than 0).").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder().name("Header Line Count").description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).defaultValue("0").build();
    public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder().name("Remove Trailing Newlines").description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later. If this is set to 'true' and a FlowFile is generated that contains only 'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will not be emitted. Note, however, that if the Header Line Count is greater than 0, the resultant FlowFile will never be empty as it will consist of the header lines, so a FlowFile may be emitted that contians only the header lines.").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input file will be routed to this destination when it has been successfully split into 1 or more files").build();
    public static final Relationship REL_SPLITS = new Relationship.Builder().name("splits").description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere").build();
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(LINE_SPLIT_COUNT);
        properties.add(HEADER_LINE_COUNT);
        properties.add(REMOVE_TRAILING_NEWLINES);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_SPLITS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    private int readLines(InputStream in, int maxNumLines, OutputStream out, boolean keepAllNewLines, byte[] leadingNewLineBytes) throws IOException {
        EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
        int numLines = 0;
        byte[] leadingBytes = leadingNewLineBytes;
        for (int i = 0; i < maxNumLines; ++i) {
            EndOfLineMarker eolMarker = this.locateEndOfLine(in, out, false, eolBuffer, leadingBytes);
            leadingBytes = eolMarker.getLeadingNewLineBytes();
            if (keepAllNewLines && out != null) {
                if (leadingBytes != null) {
                    out.write(leadingBytes);
                    leadingBytes = null;
                }
                eolBuffer.drainTo(out);
            }
            if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 0L) {
                ++numLines;
            }
            if (eolMarker.isStreamEnded()) break;
        }
        return numLines;
    }

    private EndOfLineMarker locateEndOfLine(InputStream in, OutputStream out, boolean includeLineDelimiter, EndOfLineBuffer eolBuffer, byte[] leadingNewLineBytes) throws IOException {
        int lastByte = -1;
        long bytesRead = 0L;
        byte[] bytesToWriteFirst = leadingNewLineBytes;
        while (true) {
            boolean isNewLineChar;
            in.mark(1);
            int nextByte = in.read();
            boolean bl = isNewLineChar = nextByte == 13 || nextByte == 10;
            if (nextByte == -1) {
                if (lastByte == 13) {
                    eolBuffer.addEndOfLine(true, false);
                }
                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);
            }
            if (!isNewLineChar) {
                if (bytesToWriteFirst != null) {
                    if (out != null) {
                        out.write(bytesToWriteFirst);
                    }
                    bytesToWriteFirst = null;
                }
                if (out != null) {
                    eolBuffer.drainTo(out);
                }
                eolBuffer.clear();
            }
            ++bytesRead;
            if (out != null && (includeLineDelimiter || !isNewLineChar)) {
                if (bytesToWriteFirst != null) {
                    out.write(bytesToWriteFirst);
                    bytesToWriteFirst = null;
                }
                out.write(nextByte);
            }
            if (nextByte == 10) {
                eolBuffer.addEndOfLine(lastByte == 13, true);
                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
            }
            if (lastByte == 13) {
                in.reset();
                eolBuffer.addEndOfLine(true, false);
                return new EndOfLineMarker(--bytesRead, eolBuffer, false, bytesToWriteFirst);
            }
            lastByte = nextByte;
        }
    }

    private SplitInfo locateSplitPoint(InputStream in, int numLines, boolean keepAllNewLines) throws IOException {
        SplitInfo info = new SplitInfo();
        EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
        while (info.lengthLines < (long)numLines) {
            boolean isLastLine;
            boolean keepNewLine = keepAllNewLines || info.lengthLines != (long)(numLines - 1);
            EndOfLineMarker eolMarker = this.locateEndOfLine(in, null, keepNewLine, eolBuffer, null);
            long bytesTillNext = eolMarker.getBytesConsumed();
            ++info.lengthLines;
            boolean bl = isLastLine = eolMarker.isStreamEnded() || info.lengthLines >= (long)numLines;
            if (isLastLine && !keepAllNewLines) {
                bytesTillNext -= (long)eolBuffer.length();
            }
            info.lengthBytes += bytesTillNext;
            if (!eolMarker.isStreamEnded()) continue;
            info.endOfStream = true;
            break;
        }
        return info;
    }

    public void onTrigger(ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ProcessorLog logger = this.getLogger();
        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
        final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
        final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
        final ObjectHolder errorMessage = new ObjectHolder(null);
        final ArrayList splitInfos = new ArrayList();
        final long startNanos = System.nanoTime();
        final ArrayList<FlowFile> splits = new ArrayList<FlowFile>();
        session.read(flowFile, new InputStreamCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void process(InputStream rawIn) throws IOException {
                try (BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
                     final ByteCountingInputStream in = new ByteCountingInputStream((InputStream)bufferedIn);){
                    byte[] headerBytesWithoutTrailingNewLines;
                    byte[] headerNewLineBytes;
                    byte headerByte;
                    ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
                    int headerLinesCopied = SplitText.this.readLines((InputStream)in, headerCount, (OutputStream)headerStream, true, null);
                    if (headerLinesCopied < headerCount) {
                        errorMessage.set((Object)("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"));
                        return;
                    }
                    byte[] headerBytes = headerStream.toByteArray();
                    int headerNewLineByteCount = 0;
                    for (int i = headerBytes.length - 1; i >= 0 && ((headerByte = headerBytes[i]) == 13 || headerByte == 10); --i) {
                        ++headerNewLineByteCount;
                    }
                    if (headerNewLineByteCount == 0) {
                        headerNewLineBytes = null;
                        headerBytesWithoutTrailingNewLines = headerBytes;
                    } else {
                        headerNewLineBytes = new byte[headerNewLineByteCount];
                        System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
                        headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
                        System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
                    }
                    while (true) {
                        if (headerCount > 0) {
                            final IntegerHolder linesCopied = new IntegerHolder(0);
                            FlowFile splitFile = session.create(flowFile);
                            try {
                                splitFile = session.write(splitFile, new OutputStreamCallback(){

                                    public void process(OutputStream rawOut) throws IOException {
                                        try (BufferedOutputStream out = new BufferedOutputStream(rawOut);){
                                            out.write(headerBytesWithoutTrailingNewLines);
                                            linesCopied.set((Object)SplitText.this.readLines((InputStream)in, splitCount, (OutputStream)out, !removeTrailingNewlines, headerNewLineBytes));
                                        }
                                    }
                                });
                                splitFile = session.putAttribute(splitFile, SplitText.SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
                                logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()});
                            }
                            finally {
                                if ((Integer)linesCopied.get() > 0) {
                                    splits.add(splitFile);
                                } else {
                                    session.remove(splitFile);
                                }
                            }
                            if ((Integer)linesCopied.get() >= splitCount) continue;
                            break;
                        }
                        long beforeReadingLines = in.getBytesConsumed();
                        SplitInfo info = SplitText.this.locateSplitPoint((InputStream)in, splitCount, !removeTrailingNewlines);
                        if (info.lengthBytes > 0L) {
                            info.offsetBytes = beforeReadingLines;
                            splitInfos.add(info);
                            long procNanos = System.nanoTime() - startNanos;
                            long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
                            logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
                        }
                        if (info.endOfStream) break;
                    }
                }
            }
        });
        if (errorMessage.get() != null) {
            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
            session.transfer(flowFile, REL_FAILURE);
            if (splits != null && !splits.isEmpty()) {
                session.remove(splits);
            }
            return;
        }
        if (!splitInfos.isEmpty()) {
            for (SplitInfo info : splitInfos) {
                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
                splits.add(split);
            }
        }
        this.finishFragmentAttributes(session, flowFile, splits);
        if (splits.size() > 10) {
            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
        } else {
            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
        }
        session.transfer(flowFile, REL_ORIGINAL);
        session.transfer(splits, REL_SPLITS);
    }

    private void finishFragmentAttributes(ProcessSession session, FlowFile source, List<FlowFile> splits) {
        String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
        String fragmentId = UUID.randomUUID().toString();
        ArrayList<FlowFile> newList = new ArrayList<FlowFile>(splits);
        splits.clear();
        for (int i = 1; i <= newList.size(); ++i) {
            FlowFile ff = newList.get(i - 1);
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.put(FRAGMENT_ID, fragmentId);
            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
            FlowFile newFF = session.putAllAttributes(ff, attributes);
            splits.add(newFF);
        }
    }

    public static class EndOfLineMarker {
        private final long bytesConsumed;
        private final EndOfLineBuffer eolBuffer;
        private final boolean streamEnded;
        private final byte[] leadingNewLineBytes;

        public EndOfLineMarker(long bytesCounted, EndOfLineBuffer eolBuffer, boolean streamEnded, byte[] leadingNewLineBytes) {
            this.bytesConsumed = bytesCounted;
            this.eolBuffer = eolBuffer;
            this.streamEnded = streamEnded;
            this.leadingNewLineBytes = leadingNewLineBytes;
        }

        public long getBytesConsumed() {
            return this.bytesConsumed;
        }

        public EndOfLineBuffer getEndOfLineBuffer() {
            return this.eolBuffer;
        }

        public boolean isStreamEnded() {
            return this.streamEnded;
        }

        public byte[] getLeadingNewLineBytes() {
            return this.leadingNewLineBytes;
        }
    }

    public static class EndOfLineBuffer {
        private static final byte CARRIAGE_RETURN = 13;
        private static final byte NEWLINE = 10;
        private final BitSet buffer = new BitSet();
        private int index = 0;

        public void clear() {
            this.index = 0;
        }

        public void addEndOfLine(boolean carriageReturn, boolean newLine) {
            this.buffer.set(this.index++, carriageReturn);
            this.buffer.set(this.index++, newLine);
        }

        private void drainTo(OutputStream out) throws IOException {
            for (int i = 0; i < this.index; i += 2) {
                boolean cr = this.buffer.get(i);
                boolean nl = this.buffer.get(i + 1);
                if (!cr && !nl) {
                    return;
                }
                if (cr) {
                    out.write(13);
                }
                if (!nl) continue;
                out.write(10);
            }
            this.clear();
        }

        public int length() {
            return this.index / 2;
        }
    }

    private static class SplitInfo {
        public long offsetBytes;
        public long lengthBytes;
        public long lengthLines;
        public boolean endOfStream;

        public SplitInfo() {
            this.offsetBytes = 0L;
            this.lengthBytes = 0L;
            this.lengthLines = 0L;
            this.endOfStream = false;
        }

        public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) {
            this.offsetBytes = offsetBytes;
            this.lengthBytes = lengthBytes;
            this.lengthLines = lengthLines;
        }
    }
}

