/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.AbstractJMSProcessor;
import org.apache.nifi.jms.processors.JMSConsumer;
import org.apache.nifi.jms.processors.PublishJMS;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.core.JmsTemplate;

@Tags(value={"jms", "get", "message", "receive", "consume"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription(value="Consumes JMS Message of type BytesMessage or TextMessage transforming its content to a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes.")
@SeeAlso(value={PublishJMS.class, JMSConnectionFactoryProvider.class})
public class ConsumeJMS
extends AbstractJMSProcessor<JMSConsumer> {
    static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(1), "AUTO_ACKNOWLEDGE (" + String.valueOf(1) + ")", "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. Can result in data loss in the event where NiFi abruptly stopped before session was commited.");
    static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(2), "CLIENT_ACKNOWLEDGE (" + String.valueOf(2) + ")", "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");
    static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(3), "DUPS_OK_ACKNOWLEDGE (" + String.valueOf(3) + ")", "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data duplication and data loss while achieving the best throughput.");
    public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
    static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder().name("Acknowledgement Mode").description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.").required(true).allowableValues(new AllowableValue[]{AUTO_ACK, CLIENT_ACK, DUPS_OK}).defaultValue(CLIENT_ACK.getValue()).build();
    static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder().name("Durable subscription").description("If destination is Topic if present then make it the consumer durable. @see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-").required(false).expressionLanguageSupported(true).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder().name("Shared subscription").description("If destination is Topic if present then make it the consumer shared. @see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-").required(false).expressionLanguageSupported(true).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder().name("Subscription Name").description("The name of the subscription to use if destination is Topic and is shared or durable.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received from the JMS Destination are routed to this relationship").build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> thisPropertyDescriptors;

    @Override
    protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
        final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
        Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
        boolean durable = durableBoolean == null ? false : durableBoolean;
        Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
        boolean shared = sharedBoolean == null ? false : sharedBoolean;
        String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
        ((JMSConsumer)this.targetResource).consume(destinationName, durable, shared, subscriptionName, new JMSConsumer.ConsumerCallback(){

            @Override
            public void accept(final JMSConsumer.JMSResponse response) {
                if (response != null) {
                    FlowFile flowFile = processSession.create();
                    flowFile = processSession.write(flowFile, new OutputStreamCallback(){

                        public void process(OutputStream out) throws IOException {
                            out.write(response.getMessageBody());
                        }
                    });
                    Map<String, Object> jmsHeaders = response.getMessageHeaders();
                    Map<String, String> jmsProperties = Collections.unmodifiableMap(response.getMessageProperties());
                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                    flowFile = processSession.putAttribute(flowFile, ConsumeJMS.JMS_SOURCE_DESTINATION_NAME, destinationName);
                    processSession.getProvenanceReporter().receive(flowFile, destinationName);
                    processSession.transfer(flowFile, REL_SUCCESS);
                    processSession.commit();
                } else {
                    context.yield();
                }
            }
        });
    }

    @Override
    protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) {
        int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
        jmsTemplate.setSessionAcknowledgeMode(ackMode);
        return new JMSConsumer(jmsTemplate, this.getLogger());
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return thisPropertyDescriptors;
    }

    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        for (Map.Entry<String, Object> entry : jmsAttributes.entrySet()) {
            attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        flowFile = processSession.putAllAttributes(flowFile, attributes);
        return flowFile;
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.addAll(propertyDescriptors);
        _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
        _propertyDescriptors.add(DURABLE_SUBSCRIBER);
        _propertyDescriptors.add(SHARED_SUBSCRIBER);
        _propertyDescriptors.add(SUBSCRIPTION_NAME);
        thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

