/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors;

import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.AbstractJMSProcessor;
import org.apache.nifi.jms.processors.ConsumeJMS;
import org.apache.nifi.jms.processors.JMSPublisher;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Tags(value={"jms", "put", "message", "send", "publish"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Creates a JMS Message from the contents of a FlowFile and sends it to a JMS Destination (queue or topic) as JMS BytesMessage or TextMessage. FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.")
@ReadsAttributes(value={@ReadsAttribute(attribute="jms_deliveryMode", description="This attribute becomes the JMSDeliveryMode message header. Must be an integer."), @ReadsAttribute(attribute="jms_expiration", description="This attribute becomes the JMSExpiration message header. Must be an integer."), @ReadsAttribute(attribute="jms_priority", description="This attribute becomes the JMSPriority message header. Must be an integer."), @ReadsAttribute(attribute="jms_redelivered", description="This attribute becomes the JMSRedelivered message header."), @ReadsAttribute(attribute="jms_timestamp", description="This attribute becomes the JMSTimestamp message header. Must be a long."), @ReadsAttribute(attribute="jms_correlationId", description="This attribute becomes the JMSCorrelationID message header."), @ReadsAttribute(attribute="jms_type", description="This attribute becomes the JMSType message header. Must be an integer."), @ReadsAttribute(attribute="jms_replyTo", description="This attribute becomes the JMSReplyTo message header. Must be an integer."), @ReadsAttribute(attribute="jms_destination", description="This attribute becomes the JMSDestination message header. Must be an integer."), @ReadsAttribute(attribute="other attributes", description="All other attributes that do not start with jms_ are added as message properties."), @ReadsAttribute(attribute="other attributes .type", description="When an attribute will be added as a message property, a second attribute of the same name but with an extra `.type` at the end will cause the message property to be sent using that strong type. For example, attribute `delay` with value `12000` and another attribute `delay.type` with value `integer` will cause a JMS message property `delay` to be sent as an Integer rather than a String. Supported types are boolean, byte, short, integer, long, float, double, and string (which is the default).")})
@SeeAlso(value={ConsumeJMS.class, JMSConnectionFactoryProvider.class})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PublishJMS
extends AbstractJMSProcessor<JMSPublisher> {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are sent to the JMS destination are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be sent to JMS destination are routed to this relationship").build();
    private static final Set<Relationship> relationships;

    @Override
    protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile != null) {
            try {
                String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
                String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
                switch (context.getProperty(MESSAGE_BODY).getValue()) {
                    case "text": {
                        try {
                            publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), (Map<String, String>)flowFile.getAttributes());
                            break;
                        }
                        catch (Exception e) {
                            publisher.setValid(false);
                            throw e;
                        }
                    }
                    default: {
                        try {
                            publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), (Map<String, String>)flowFile.getAttributes());
                            break;
                        }
                        catch (Exception e) {
                            publisher.setValid(false);
                            throw e;
                        }
                    }
                }
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.getProvenanceReporter().send(flowFile, destinationName);
            }
            catch (Exception e) {
                processSession.transfer(flowFile, REL_FAILURE);
                this.getLogger().error("Failed while sending message to JMS via " + publisher, (Throwable)e);
                context.yield();
                publisher.setValid(false);
            }
        }
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected JMSPublisher finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
        return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger());
    }

    private byte[] extractMessageBody(FlowFile flowFile, ProcessSession session) {
        byte[] messageContent = new byte[(int)flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer((InputStream)in, (byte[])messageContent, (boolean)true));
        return messageContent;
    }

    private String extractTextMessageBody(FlowFile flowFile, ProcessSession session, String charset) {
        StringWriter writer = new StringWriter();
        session.read(flowFile, in -> IOUtils.copy((InputStream)in, (Writer)writer, (Charset)Charset.forName(charset)));
        return writer.toString();
    }

    static {
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

