/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.OnStopped;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.stream.io.StreamUtils;

/**
 * Processor that turns the content of each incoming FlowFile into a JMS message and
 * sends it through a producer obtained from {@link JmsFactory}.
 *
 * <p>Producers are kept in an internal queue between invocations of
 * {@link #onTrigger(ProcessContext, ProcessSession)} and are closed by
 * {@link #cleanupResources()}.
 */
@Tags(value={"jms", "send", "put"})
@CapabilityDescription(value="Creates a JMS Message from the contents of a FlowFile and sends the message to a JMS Server")
public class PutJMS
extends AbstractProcessor {
    /** Charset used to decode FlowFile content when a text message is built. */
    public static final Charset UTF8 = Charset.forName("UTF-8");
    /** Priority used when the configured priority is absent or is not a valid integer. */
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are sent to the JMS destination are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be routed to the JMS destination are routed to this relationship").build();

    /** Prefix (matched case-insensitively) of FlowFile attributes that are copied to JMS properties. */
    private static final String ATTRIBUTE_PREFIX = "jms.";
    /** Suffix of the companion attribute that names the JMS property type for an attribute. */
    private static final String ATTRIBUTE_TYPE_SUFFIX = ".type";

    /** Producers that are currently idle; polled by onTrigger and drained by cleanupResources. */
    private final java.util.Queue<WrappedMessageProducer> producerQueue = new LinkedBlockingQueue<WrappedMessageProducer>();
    private final List<PropertyDescriptor> properties;
    private final Set<Relationship> relationships;

    /**
     * Builds the immutable property-descriptor list and relationship set exposed by this processor.
     */
    public PutJMS() {
        List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(JmsProperties.JMS_PROVIDER);
        descriptors.add(JmsProperties.URL);
        descriptors.add(JmsProperties.DESTINATION_NAME);
        descriptors.add(JmsProperties.DESTINATION_TYPE);
        descriptors.add(JmsProperties.TIMEOUT);
        descriptors.add(JmsProperties.BATCH_SIZE);
        descriptors.add(JmsProperties.USERNAME);
        descriptors.add(JmsProperties.PASSWORD);
        descriptors.add(JmsProperties.MESSAGE_TYPE);
        descriptors.add(JmsProperties.MESSAGE_PRIORITY);
        descriptors.add(JmsProperties.REPLY_TO_QUEUE);
        descriptors.add(JmsProperties.MAX_BUFFER_SIZE);
        descriptors.add(JmsProperties.MESSAGE_TTL);
        descriptors.add(JmsProperties.ATTRIBUTES_TO_JMS_PROPS);
        descriptors.add(JmsProperties.CLIENT_ID_PREFIX);
        this.properties = Collections.unmodifiableList(descriptors);

        Set<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * @return the unmodifiable list of property descriptors assembled in the constructor
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Drains the producer queue, closing every producer that is still held in it.
     */
    @OnStopped
    public void cleanupResources() {
        WrappedMessageProducer wrappedProducer;
        while ((wrappedProducer = this.producerQueue.poll()) != null) {
            wrappedProducer.close(this.getLogger());
        }
    }

    /**
     * Pulls up to the configured batch size of FlowFiles, sends each one as a JMS message and
     * commits the JMS session once the whole batch has been sent.
     *
     * <ul>
     *   <li>If no producer can be created, every FlowFile goes to {@link #REL_FAILURE} and the
     *       processor yields.</li>
     *   <li>A FlowFile larger than the configured maximum buffer size goes to
     *       {@link #REL_FAILURE} without being sent.</li>
     *   <li>If building or sending a message fails, every FlowFile of the batch goes to
     *       {@link #REL_FAILURE}, the JMS session is rolled back and the producer is closed.</li>
     *   <li>If the commit fails, the process session is rolled back and the producer is closed.</li>
     * </ul>
     *
     * @param context provides the configured property values
     * @param session the process session the FlowFiles are read from and transferred through
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        ProcessorLog logger = this.getLogger();
        List<FlowFile> flowFiles = session.get(context.getProperty(JmsProperties.BATCH_SIZE).asInteger().intValue());
        if (flowFiles.isEmpty()) {
            return;
        }

        WrappedMessageProducer wrappedProducer = this.producerQueue.poll();
        if (wrappedProducer == null) {
            try {
                wrappedProducer = JmsFactory.createMessageProducer(context, true);
                logger.info("Connected to JMS server {}", new Object[]{context.getProperty(JmsProperties.URL).getValue()});
            }
            catch (JMSException e) {
                logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
                session.transfer(flowFiles, REL_FAILURE);
                context.yield();
                return;
            }
        }

        Session jmsSession = wrappedProducer.getSession();
        MessageProducer producer = wrappedProducer.getProducer();
        int maxBufferSize = context.getProperty(JmsProperties.MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        try {
            // Neither value depends on the individual FlowFile, so look them up once per batch.
            Long ttl = context.getProperty(JmsProperties.MESSAGE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
            long timeToLive = ttl == null ? 0L : ttl.longValue();
            String transitUri = "jms://" + context.getProperty(JmsProperties.URL).getValue();

            Set<FlowFile> successfulFlowFiles = new HashSet<FlowFile>();
            for (FlowFile flowFile : flowFiles) {
                if (flowFile.getSize() > (long)maxBufferSize) {
                    session.transfer(flowFile, REL_FAILURE);
                    logger.warn("Routing {} to failure because its size exceeds the configured max", new Object[]{flowFile});
                    continue;
                }

                // The size check above guarantees the content length fits in an int.
                final byte[] messageContent = new byte[(int)flowFile.getSize()];
                session.read(flowFile, new InputStreamCallback(){

                    public void process(InputStream in) throws IOException {
                        StreamUtils.fillBuffer(in, messageContent, true);
                    }
                });

                String replyToQueueName = context.getProperty(JmsProperties.REPLY_TO_QUEUE).evaluateAttributeExpressions(flowFile).getValue();
                Queue replyToQueue = replyToQueueName == null ? null : JmsFactory.createQueue(context, replyToQueueName);
                int priority = this.resolvePriority(context, flowFile);

                try {
                    Message message = this.createMessage(jmsSession, context, messageContent, flowFile, replyToQueue, priority);
                    producer.setTimeToLive(timeToLive);
                    producer.send(message);
                }
                catch (JMSException e) {
                    logger.error("Failed to send {} to JMS Server due to {}", new Object[]{flowFile, e});
                    session.transfer(flowFiles, REL_FAILURE);
                    context.yield();
                    try {
                        jmsSession.rollback();
                    }
                    catch (JMSException jmse) {
                        logger.warn("Unable to roll back JMS Session due to {}", new Object[]{jmse});
                    }
                    wrappedProducer.close(logger);
                    // The finally block below is the single place that re-queues the producer,
                    // so it can never end up in the queue twice.
                    return;
                }

                successfulFlowFiles.add(flowFile);
                session.getProvenanceReporter().send(flowFile, transitUri);
            }

            try {
                jmsSession.commit();
                session.transfer(successfulFlowFiles, REL_SUCCESS);
                // Keep the log line short for large batches.
                String flowFileDescription = successfulFlowFiles.size() > 10 ? successfulFlowFiles.size() + " FlowFiles" : successfulFlowFiles.toString();
                logger.info("Sent {} to JMS Server and transferred to 'success'", new Object[]{flowFileDescription});
            }
            catch (JMSException e) {
                logger.error("Failed to commit JMS Session due to {}; rolling back session", new Object[]{e});
                session.rollback();
                wrappedProducer.close(logger);
            }
        }
        finally {
            // Only a producer that is still open is worth reusing.
            if (!wrappedProducer.isClosed()) {
                this.producerQueue.offer(wrappedProducer);
            }
        }
    }

    /**
     * Evaluates the configured message priority against the given FlowFile.
     *
     * @param context provides the priority property
     * @param flowFile the FlowFile whose attributes are used to evaluate the property
     * @return the configured priority, or {@link #DEFAULT_MESSAGE_PRIORITY} when the property
     *         yields no value or a value that is not a valid integer
     */
    private int resolvePriority(ProcessContext context, FlowFile flowFile) {
        try {
            Integer configured = context.getProperty(JmsProperties.MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).asInteger();
            return configured == null ? DEFAULT_MESSAGE_PRIORITY : configured.intValue();
        }
        catch (NumberFormatException e) {
            this.getLogger().warn("Invalid value for JMS Message Priority: {}; defaulting to priority of {}", new Object[]{context.getProperty(JmsProperties.MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(), DEFAULT_MESSAGE_PRIORITY});
            return DEFAULT_MESSAGE_PRIORITY;
        }
    }

    /**
     * Creates the JMS message for one FlowFile according to the configured message type:
     * {@code "empty"} gives a text message with empty text, {@code "stream"} a stream message,
     * {@code "text"} a text message decoded with {@link #UTF8}, and any other value a bytes message.
     *
     * @param jmsSession the JMS session used to create the message
     * @param context provides the message-type and attribute-copy properties
     * @param messageContent the FlowFile content
     * @param flowFile the FlowFile whose attributes may be copied to JMS properties
     * @param replyToQueue reply-to destination, or {@code null} to leave it unset
     * @param priority JMS priority, or {@code null} to leave it unset
     * @return the populated message
     * @throws JMSException if the JMS API rejects any of the operations
     */
    private Message createMessage(Session jmsSession, ProcessContext context, byte[] messageContent, FlowFile flowFile, Destination replyToQueue, Integer priority) throws JMSException {
        Message message;
        switch (context.getProperty(JmsProperties.MESSAGE_TYPE).getValue()) {
            case "empty": {
                message = jmsSession.createTextMessage("");
                break;
            }
            case "stream": {
                StreamMessage streamMessage = jmsSession.createStreamMessage();
                streamMessage.writeBytes(messageContent);
                message = streamMessage;
                break;
            }
            case "text": {
                message = jmsSession.createTextMessage(new String(messageContent, UTF8));
                break;
            }
            default: {
                BytesMessage bytesMessage = jmsSession.createBytesMessage();
                bytesMessage.writeBytes(messageContent);
                message = bytesMessage;
            }
        }

        message.setJMSTimestamp(System.currentTimeMillis());
        if (replyToQueue != null) {
            message.setJMSReplyTo(replyToQueue);
        }
        if (priority != null) {
            message.setJMSPriority(priority.intValue());
        }
        if (context.getProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS).asBoolean().booleanValue()) {
            this.copyAttributesToJmsProps(flowFile, message);
        }
        return message;
    }

    /**
     * Copies every FlowFile attribute whose name starts with {@code jms.} (case-insensitive) and
     * does not end with {@code .type} onto the message as a JMS property named after the part of
     * the attribute name that follows the prefix.
     *
     * <p>The property type is read from the attribute {@code <name>.type}; when that attribute is
     * absent the value is set as a string property. Values that cannot be parsed as the requested
     * numeric type, and unrecognized type names, are logged and skipped.
     *
     * @param flowFile the FlowFile supplying the attributes
     * @param message the message receiving the properties
     * @throws JMSException if the JMS API rejects a property
     */
    private void copyAttributesToJmsProps(FlowFile flowFile, Message message) throws JMSException {
        ProcessorLog logger = this.getLogger();
        Map<String, String> attributes = flowFile.getAttributes();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();

            String lowerKey = key.toLowerCase();
            if (!lowerKey.startsWith(ATTRIBUTE_PREFIX) || lowerKey.endsWith(ATTRIBUTE_TYPE_SUFFIX)) {
                continue;
            }

            String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
            String typeKey = key + ATTRIBUTE_TYPE_SUFFIX;
            String type = attributes.get(typeKey);
            try {
                if (type == null || type.equalsIgnoreCase("string")) {
                    message.setStringProperty(jmsPropName, value);
                } else if (type.equalsIgnoreCase("integer")) {
                    message.setIntProperty(jmsPropName, Integer.parseInt(value));
                } else if (type.equalsIgnoreCase("boolean")) {
                    message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
                } else if (type.equalsIgnoreCase("short")) {
                    message.setShortProperty(jmsPropName, Short.parseShort(value));
                } else if (type.equalsIgnoreCase("long")) {
                    message.setLongProperty(jmsPropName, Long.parseLong(value));
                } else if (type.equalsIgnoreCase("byte")) {
                    message.setByteProperty(jmsPropName, Byte.parseByte(value));
                } else if (type.equalsIgnoreCase("double")) {
                    message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
                } else if (type.equalsIgnoreCase("float")) {
                    message.setFloatProperty(jmsPropName, Float.parseFloat(value));
                } else if (type.equalsIgnoreCase("object")) {
                    message.setObjectProperty(jmsPropName, value);
                } else {
                    // The offending value is the one held by the ".type" attribute, so report that pair.
                    logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property", new Object[]{typeKey, flowFile, type});
                }
            }
            catch (NumberFormatException e) {
                // Report the type that was actually requested rather than a fixed "integer".
                logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property", new Object[]{key, flowFile, value, typeKey, type});
            }
        }
    }
}

