/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@Tags(value={"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription(value="Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@WritesAttributes(value={@WritesAttribute(attribute="eventhub.enqueued.timestamp", description="The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), @WritesAttribute(attribute="eventhub.offset", description="The offset into the partition at which the message was stored"), @WritesAttribute(attribute="eventhub.sequence", description="The Azure Sequence number associated with the message"), @WritesAttribute(attribute="eventhub.name", description="The name of the Event Hub from which the message was pulled"), @WritesAttribute(attribute="eventhub.partition", description="The name of the Azure Partition from which the message was pulled")})
public class ConsumeAzureEventHub
extends AbstractSessionFactoryProcessor {
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("event-hub-namespace").displayName("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("event-hub-name").displayName("Event Hub Name").description("The name of the Azure Event Hub to pull messages from.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder().name("event-hub-shared-access-policy-name").displayName("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("event-hub-shared-access-policy-primary-key").displayName("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).sensitive(true).required(true).build();
    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder().name("event-hub-consumer-group").displayName("Event Hub Consumer Group").description("The name of the Event Hub Consumer Group to use.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("$Default").required(true).build();
    static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder().name("event-hub-consumer-hostname").displayName("Event Hub Consumer Hostname").description("The hostname of this Event Hub Consumer instance. If not specified, an unique identifier is generated in 'nifi-<UUID>' format.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for reading received messages. The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema.").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use for serializing Records to an output FlowFile. The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema. If not specified, each message will create a FlowFile.").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue("start-of-stream", "Start of stream", "Read from the oldest message retained in the stream.");
    static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue("end-of-stream", "End of stream", "Ignore old retained messages even if exist, start reading new ones from now.");
    static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder().name("event-hub-initial-offset").displayName("Initial Offset").description("Specify where to start receiving messages if offset is not stored in Azure Storage.").required(true).allowableValues(new AllowableValue[]{INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM}).defaultValue(INITIAL_OFFSET_END_OF_STREAM.getValue()).build();
    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder().name("event-hub-prefetch-count").displayName("Prefetch Count").defaultValue("The number of messages to fetch from Event Hub before processing. This parameter affects throughput. The more prefetch count, the better throughput in general, but consumes more resources (RAM). NOTE: Even though Event Hub client API provides this option, actual number of messages can be pre-fetched is depend on the Event Hub server implementation. It is reported that only one event is received at a time in certain situation. https://github.com/Azure/azure-event-hubs-java/issues/125").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("300").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("event-hub-batch-size").displayName("Batch Size").description("The number of messages to process within a NiFi session. This parameter affects throughput and consistency. NiFi commits its session and Event Hub checkpoint after processing this number of messages. If NiFi session is committed, but failed to create an Event Hub checkpoint, then it is possible that the same messages to be retrieved again. The higher number, the higher throughput, but possibly less consistent.").defaultValue("10").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder().name("event-hub-message-receive-timeout").displayName("Message Receive Timeout").description("The amount of time this consumer should wait to receive the Prefetch Count before returning.").defaultValue("1 min").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name").description("Name of the Azure Storage account to store Event Hub Consumer Group state.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key").description("The Azure Storage account key to store Event Hub Consumer Group state.").sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder().name("storage-container-name").displayName("Storage Container Name").description("Name of the Azure Storage Container to store Event Hub Consumer Group state. If not specified, Event Hub name is used.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles received from Event Hub.").build();
    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a message from Event Hub cannot be parsed using the configured Record Reader or failed to be written by the configured Record Writer, the contents of the message will be routed to this Relationship as its own individual FlowFile.").build();
    private static final Set<Relationship> RELATIONSHIPS;
    private static final Set<Relationship> RECORD_RELATIONSHIPS;
    private static final List<PropertyDescriptor> PROPERTIES;
    private volatile EventProcessorHost eventProcessorHost;
    private volatile ProcessSessionFactory processSessionFactory;
    private volatile RecordReaderFactory readerFactory;
    private volatile RecordSetWriterFactory writerFactory;
    private volatile String namespaceName;
    private volatile boolean isRecordReaderSet = false;
    private volatile boolean isRecordWriterSet = false;

    void setProcessSessionFactory(ProcessSessionFactory processSessionFactory) {
        this.processSessionFactory = processSessionFactory;
    }

    void setNamespaceName(String namespaceName) {
        this.namespaceName = namespaceName;
    }

    public void setReaderFactory(RecordReaderFactory readerFactory) {
        this.readerFactory = readerFactory;
    }

    public void setWriterFactory(RecordSetWriterFactory writerFactory) {
        this.writerFactory = writerFactory;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return this.isRecordReaderSet && this.isRecordWriterSet ? RECORD_RELATIONSHIPS : RELATIONSHIPS;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
        ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
        if (recordReader != null && recordWriter == null || recordReader == null && recordWriter != null) {
            results.add(new ValidationResult.Builder().subject("Record Reader and Writer").explanation(String.format("Both %s and %s should be set in order to write FlowFiles as Records.", RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName())).valid(false).build());
        }
        return results;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (RECORD_READER.equals((Object)descriptor)) {
            this.isRecordReaderSet = !StringUtils.isEmpty((String)newValue);
        } else if (RECORD_WRITER.equals((Object)descriptor)) {
            this.isRecordWriterSet = !StringUtils.isEmpty((String)newValue);
        }
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        if (this.eventProcessorHost == null) {
            try {
                this.registerEventProcessor(context);
            }
            catch (IllegalArgumentException e) {
                throw e;
            }
            catch (Exception e) {
                throw new ProcessException("Failed to register the event processor due to " + e, (Throwable)e);
            }
            this.processSessionFactory = sessionFactory;
            this.readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            this.writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        }
        context.yield();
    }

    @OnStopped
    public void unregisterEventProcessor(ProcessContext context) {
        if (this.eventProcessorHost != null) {
            try {
                this.eventProcessorHost.unregisterEventProcessor();
                this.eventProcessorHost = null;
                this.processSessionFactory = null;
                this.readerFactory = null;
                this.writerFactory = null;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to unregister the event processor due to " + e, e);
            }
        }
    }

    private void registerEventProcessor(ProcessContext context) throws Exception {
        Integer batchSize;
        String consumerGroupName = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(CONSUMER_GROUP, consumerGroupName);
        this.namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(NAMESPACE, this.namespaceName);
        String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
        String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
        String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
        String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
        String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        this.validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey);
        String consumerHostname = this.orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(), EventProcessorHost.createHostName((String)"nifi"));
        String containerName = this.orDefault(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
        EventProcessorOptions options = new EventProcessorOptions();
        String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
        if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
            options.setInitialOffsetProvider((Function)new EventProcessorOptions.StartOfStreamInitialOffsetProvider(options));
        } else if (INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)) {
            options.setInitialOffsetProvider((Function)new EventProcessorOptions.EndOfStreamInitialOffsetProvider(options));
        } else {
            throw new IllegalArgumentException("Initial offset " + initialOffset + " is not allowed.");
        }
        Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
        if (prefetchCount != null && prefetchCount > 0) {
            options.setPrefetchCount(prefetchCount.intValue());
        }
        if ((batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()) != null && batchSize > 0) {
            options.setMaxBatchSize(batchSize.intValue());
        }
        Long receiveTimeoutMillis = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
        String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", storageAccountName, storageAccountKey);
        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(this.namespaceName, eventHubName, sasName, sasKey);
        this.eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
        options.setExceptionNotification(e -> this.getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at consumer group {} and partition {}, action={}, hostname={}, exception={}", new Object[]{eventHubName, consumerGroupName, e.getPartitionId(), e.getAction(), e.getHostname()}, (Throwable)e.getException()));
        this.eventProcessorHost.registerEventProcessorFactory((IEventProcessorFactory)new EventProcessorFactory(), options).get();
    }

    private String orDefault(String value, String defaultValue) {
        return StringUtils.isEmpty((String)value) ? defaultValue : value;
    }

    private void validateRequiredProperty(PropertyDescriptor property, String value) {
        if (StringUtils.isEmpty((String)value)) {
            throw new IllegalArgumentException(String.format("'%s' is required, but not specified.", property.getDisplayName()));
        }
    }

    static {
        PROPERTIES = Collections.unmodifiableList(Arrays.asList(NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME));
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
        relationships.add(REL_PARSE_FAILURE);
        RECORD_RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    public class EventProcessor
    implements IEventProcessor {
        public void onOpen(PartitionContext context) throws Exception {
            ConsumeAzureEventHub.this.getLogger().info("Consumer group {} opened partition {} of {}", new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath()});
        }

        public void onClose(PartitionContext context, CloseReason reason) throws Exception {
            ConsumeAzureEventHub.this.getLogger().info("Consumer group {} closed partition {} of {}. reason={}", new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath(), reason});
        }

        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception {
            ProcessSession session = ConsumeAzureEventHub.this.processSessionFactory.createSession();
            try {
                StopWatch stopWatch = new StopWatch(true);
                if (ConsumeAzureEventHub.this.readerFactory != null && ConsumeAzureEventHub.this.writerFactory != null) {
                    this.writeRecords(context, messages, session, stopWatch);
                } else {
                    this.writeFlowFiles(context, messages, session, stopWatch);
                }
                session.commit();
                context.checkpoint();
            }
            catch (Exception e) {
                ConsumeAzureEventHub.this.getLogger().error("Unable to fully process received message due to " + e, (Throwable)e);
                session.rollback();
            }
        }

        private void putEventHubAttributes(Map<String, String> attributes, String eventHubName, String partitionId, EventData eventData) {
            EventData.SystemProperties systemProperties = eventData.getSystemProperties();
            if (null != systemProperties) {
                attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
                attributes.put("eventhub.offset", systemProperties.getOffset());
                attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
            }
            attributes.put("eventhub.name", eventHubName);
            attributes.put("eventhub.partition", partitionId);
        }

        private void writeFlowFiles(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch) {
            String eventHubName = context.getEventHubPath();
            String partitionId = context.getPartitionId();
            String consumerGroup = context.getConsumerGroupName();
            messages.forEach(eventData -> {
                FlowFile flowFile = session.create();
                HashMap<String, String> attributes = new HashMap<String, String>();
                this.putEventHubAttributes((Map<String, String>)attributes, eventHubName, partitionId, (EventData)eventData);
                flowFile = session.putAllAttributes(flowFile, attributes);
                flowFile = session.write(flowFile, out -> out.write(eventData.getBytes()));
                this.transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
            });
        }

        private void transferTo(Relationship relationship, ProcessSession session, StopWatch stopWatch, String eventHubName, String partitionId, String consumerGroup, FlowFile flowFile) {
            session.transfer(flowFile, relationship);
            String transitUri = "amqps://" + ConsumeAzureEventHub.this.namespaceName + ".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
            session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        }

        private void writeRecords(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch) throws SchemaNotFoundException, IOException {
            String eventHubName = context.getEventHubPath();
            String partitionId = context.getPartitionId();
            String consumerGroup = context.getConsumerGroupName();
            HashMap<String, String> schemaRetrievalVariables = new HashMap<String, String>();
            schemaRetrievalVariables.put("eventhub.name", eventHubName);
            ComponentLog logger = ConsumeAzureEventHub.this.getLogger();
            FlowFile flowFile = session.create();
            HashMap<String, String> attributes = new HashMap<String, String>();
            RecordSetWriter writer = null;
            EventData lastEventData = null;
            WriteResult lastWriteResult = null;
            int recordCount = 0;
            try (OutputStream out = session.write(flowFile);){
                for (EventData eventData : messages) {
                    try {
                        ByteArrayInputStream in = new ByteArrayInputStream(eventData.getBytes());
                        Throwable throwable = null;
                        try {
                            Record record;
                            RecordReader reader = ConsumeAzureEventHub.this.readerFactory.createRecordReader(schemaRetrievalVariables, (InputStream)in, logger);
                            while ((record = reader.nextRecord()) != null) {
                                if (writer == null) {
                                    RecordSchema readerSchema = record.getSchema();
                                    RecordSchema writeSchema = ConsumeAzureEventHub.this.writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
                                    writer = ConsumeAzureEventHub.this.writerFactory.createWriter(logger, writeSchema, out);
                                    writer.beginRecordSet();
                                }
                                lastWriteResult = writer.write(record);
                                recordCount += lastWriteResult.getRecordCount();
                            }
                            lastEventData = eventData;
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (in == null) continue;
                            if (throwable != null) {
                                try {
                                    ((InputStream)in).close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            ((InputStream)in).close();
                        }
                    }
                    catch (Exception e) {
                        logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer due to " + e, (Throwable)e);
                        FlowFile failed = session.create();
                        session.write(failed, o -> o.write(eventData.getBytes()));
                        this.putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
                        failed = session.putAllAttributes(failed, attributes);
                        this.transferTo(REL_PARSE_FAILURE, session, stopWatch, eventHubName, partitionId, consumerGroup, failed);
                    }
                }
                if (lastEventData != null) {
                    this.putEventHubAttributes(attributes, eventHubName, partitionId, lastEventData);
                    attributes.put("record.count", String.valueOf(recordCount));
                    if (writer != null) {
                        writer.finishRecordSet();
                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                        if (lastWriteResult != null) {
                            attributes.putAll(lastWriteResult.getAttributes());
                        }
                        try {
                            writer.close();
                        }
                        catch (IOException e) {
                            logger.warn("Failed to close Record Writer due to {}" + e, (Throwable)e);
                        }
                    }
                }
            }
            if (lastEventData != null) {
                flowFile = session.putAllAttributes(flowFile, attributes);
                this.transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
            } else {
                session.remove(flowFile);
            }
        }

        public void onError(PartitionContext context, Throwable e) {
            if (e instanceof ReceiverDisconnectedException && e.getMessage().startsWith("New receiver with higher epoch of ")) {
                ConsumeAzureEventHub.this.getLogger().info("New receiver took over partition {} of Azure Event Hub {}, consumerGroupName={}, message={}", new Object[]{context.getPartitionId(), context.getEventHubPath(), context.getConsumerGroupName(), e.getMessage()});
                return;
            }
            ConsumeAzureEventHub.this.getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at partition {}, consumerGroupName={}, exception={}", new Object[]{context.getEventHubPath(), context.getPartitionId(), context.getConsumerGroupName(), e}, e);
        }
    }

    public class EventProcessorFactory
    implements IEventProcessorFactory {
        public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
            EventProcessor eventProcessor = new EventProcessor();
            return eventProcessor;
        }
    }
}

