/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that publishes the content of each incoming FlowFile as a single
 * event to an Azure Event Hub.
 *
 * <p>One {@link EventHubClient} is created per configured concurrent task when
 * the processor is scheduled; the clients are kept in a queue, borrowed for the
 * duration of a send and handed back afterwards. The whole FlowFile content is
 * read into a byte array before it is sent.
 */
@SupportsBatching
@Tags(value={"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutAzureEventHub
extends AbstractProcessor {
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the Azure Event Hub to send to").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).sensitive(true).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.").build();

    /** Clients available for sending; replaced with a fresh queue each time the processor is scheduled. */
    private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue<>();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    /**
     * @return the unmodifiable set holding the success and failure relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of the four connection properties
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Builds the client queue: one client per configured concurrent task.
     *
     * @param context supplies the connection properties and the maximum number of concurrent tasks
     * @throws ProcessException if {@link #createEventHubClient} fails for any client
     */
    @OnScheduled
    public final void setupClient(ProcessContext context) throws ProcessException {
        String policyName = context.getProperty(ACCESS_POLICY).getValue();
        String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
        String namespace = context.getProperty(NAMESPACE).getValue();
        String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
        int numThreads = context.getMaxConcurrentTasks();
        this.senderQueue = new LinkedBlockingQueue<>(numThreads);
        for (int i = 0; i < numThreads; ++i) {
            EventHubClient client = this.createEventHubClient(namespace, eventHubName, policyName, policyKey);
            // An overriding factory may hand back null; such entries are simply not queued.
            if (client != null) {
                this.senderQueue.offer(client);
            }
        }
    }

    /**
     * Drains the client queue, calling {@code close()} on every client found in it.
     */
    @OnStopped
    public void tearDown() {
        EventHubClient sender;
        while ((sender = this.senderQueue.poll()) != null) {
            sender.close();
        }
    }

    /**
     * Takes one FlowFile from the session, reads its content into memory and
     * sends it as a single event. On success a provenance SEND event is
     * reported and the FlowFile goes to {@link #REL_SUCCESS}; on failure the
     * FlowFile is penalized and goes to {@link #REL_FAILURE}.
     *
     * @param context used to look up the namespace and hub name for the provenance URI
     * @param session the session the FlowFile is taken from and transferred with
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);

        // The content is buffered in one byte[]; a size beyond the int range would
        // otherwise be narrowed to a negative or truncated array length.
        long size = flowFile.getSize();
        if (size > Integer.MAX_VALUE) {
            this.getLogger().error("Cannot send {} to EventHub because its size of {} bytes exceeds the maximum that can be buffered in memory; routing to failure", new Object[]{flowFile, size});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        byte[] buffer = new byte[(int) size];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
        try {
            this.sendMessage(buffer);
        }
        catch (ProcessException processException) {
            this.getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }
        String namespace = context.getProperty(NAMESPACE).getValue();
        String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
        session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Creates a client from the connection string built by
     * {@link #getConnectionString} and waits for its creation to complete.
     *
     * @param namespace the Event Hub namespace
     * @param eventHubName the Event Hub name
     * @param policyName the shared access policy name
     * @param policyKey the shared access policy key
     * @return the created client
     * @throws ProcessException wrapping any failure raised while creating the client
     */
    protected EventHubClient createEventHubClient(String namespace, String eventHubName, String policyName, String policyKey) throws ProcessException {
        try {
            return EventHubClient.createFromConnectionString(this.getConnectionString(namespace, eventHubName, policyName, policyKey)).get();
        }
        catch (IllegalConnectionStringFormatException | ServiceBusException | IOException | InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            // Pass the exception both as the placeholder argument and as the Throwable.
            this.getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e}, e);
            throw new ProcessException(e);
        }
    }

    /**
     * Builds the connection string for the given hub and credentials.
     *
     * @param namespace the Event Hub namespace
     * @param eventHubName the Event Hub name
     * @param policyName the shared access policy name
     * @param policyKey the shared access policy key
     * @return the string form of a {@link ConnectionStringBuilder} built from the arguments
     */
    protected String getConnectionString(String namespace, String eventHubName, String policyName, String policyKey) {
        return new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
    }

    /**
     * Sends the given bytes as one event using a client borrowed from the queue.
     * The client is put back in the queue whether or not the send succeeds.
     *
     * @param buffer the event payload
     * @throws ProcessException if the queue holds no client at the time of the call,
     *         or if the send raises a {@link ServiceBusException}
     */
    protected void sendMessage(byte[] buffer) throws ProcessException {
        EventHubClient sender = this.senderQueue.poll();
        if (sender == null) {
            throw new ProcessException("No EventHubClients are configured for sending");
        }
        try {
            sender.sendSync(new EventData(buffer));
        }
        catch (ServiceBusException sbe) {
            throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
        }
        finally {
            this.senderQueue.offer(sender);
        }
    }

    static {
        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
        _propertyDescriptors.add(EVENT_HUB_NAME);
        _propertyDescriptors.add(NAMESPACE);
        _propertyDescriptors.add(ACCESS_POLICY);
        _propertyDescriptors.add(POLICY_PRIMARY_KEY);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        Set<Relationship> _relationships = new HashSet<>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

