package org.apache.nifi.jms.processors.ioconcept.reader;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.StopWatch;

/* loaded from: input_file:org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.class */
public class StateTrackingFlowFileReader implements FlowFileReader {
    public static final String ATTR_READ_FAILED_INDEX_SUFFIX = ".read.failed.index";
    private final String identifier;
    private final RecordSupplier recordSupplier;
    private final ComponentLog logger;

    public StateTrackingFlowFileReader(String str, RecordSupplier recordSupplier, ComponentLog componentLog) {
        this.identifier = str;
        this.recordSupplier = recordSupplier;
        this.logger = componentLog;
    }

    @Override // org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReader
    public void read(ProcessSession processSession, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
        StopWatch stopWatch = new StopWatch(true);
        AtomicInteger atomicInteger = new AtomicInteger();
        String str = this.identifier + ".read.failed.index";
        try {
            Long l = (Long) Optional.ofNullable(flowFile.getAttribute(str)).map(Long::valueOf).orElse(null);
            processSession.read(flowFile, inputStream -> {
                this.recordSupplier.process(flowFile, inputStream, atomicInteger, l, this.logger, messageHandler);
            });
            FlowFile flowFile2 = flowFile;
            boolean z = l != null;
            if (z) {
                flowFile2 = processSession.removeAttribute(flowFile, str);
            }
            flowFileReaderCallback.onSuccess(flowFile2, atomicInteger.get(), z, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        } catch (Exception e) {
            this.logger.error("An error happened while processing records. Routing to failure.", e);
            flowFileReaderCallback.onFailure(processSession.putAttribute(flowFile, str, String.valueOf(atomicInteger.get())), atomicInteger.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
        }
    }
}
