package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.stream.io.StreamUtils;

/**
 * Processor that builds a JMS message from the content of each incoming FlowFile and sends it to the
 * configured JMS destination.
 *
 * <p>FlowFiles are pulled in batches. Every message of a batch is sent through one JMS session, which
 * is committed once the whole batch has been sent. If a send or the commit fails, all FlowFiles of the
 * batch are routed to {@link #REL_FAILURE}; otherwise the FlowFiles that were sent go to
 * {@link #REL_SUCCESS}. FlowFiles larger than the configured maximum buffer size are routed to
 * {@link #REL_FAILURE} without being sent.</p>
 *
 * <p>Producers that are still open after a trigger are parked in an internal queue and reused by later
 * triggers; they are closed when the processor is stopped.</p>
 */
@CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends the message to a JMS Server")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"jms", "send", "put"})
@SeeAlso({GetJMSQueue.class, GetJMSTopic.class})
public class PutJMS extends AbstractProcessor {
    /** Priority applied to a message when none is configured or the configured value is not a number. */
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    /** Character set used to turn FlowFile content into the body of a text message. */
    public static final Charset UTF8 = Charset.forName(EvaluateXQuery.UTF8);
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are sent to the JMS destination are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be routed to the JMS destination are routed to this relationship").build();

    /** Open producers waiting to be reused by the next trigger. */
    private final Queue<WrappedMessageProducer> producerQueue = new LinkedBlockingQueue<>();
    private final List<PropertyDescriptor> properties;
    private final Set<Relationship> relationships;

    /**
     * Builds the immutable lists of supported properties and relationships.
     */
    public PutJMS() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(JmsProperties.JMS_PROVIDER);
        descriptors.add(JmsProperties.URL);
        descriptors.add(JmsProperties.DESTINATION_NAME);
        descriptors.add(JmsProperties.DESTINATION_TYPE);
        descriptors.add(JmsProperties.TIMEOUT);
        descriptors.add(JmsProperties.BATCH_SIZE);
        descriptors.add(JmsProperties.USERNAME);
        descriptors.add(JmsProperties.PASSWORD);
        descriptors.add(JmsProperties.SSL_CONTEXT_SERVICE);
        descriptors.add(JmsProperties.MESSAGE_TYPE);
        descriptors.add(JmsProperties.MESSAGE_PRIORITY);
        descriptors.add(JmsProperties.REPLY_TO_QUEUE);
        descriptors.add(JmsProperties.MAX_BUFFER_SIZE);
        descriptors.add(JmsProperties.MESSAGE_TTL);
        descriptors.add(JmsProperties.ATTRIBUTES_TO_JMS_PROPS);
        descriptors.add(JmsProperties.CLIENT_ID_PREFIX);
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * @return the unmodifiable list of properties supported by this processor
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Closes every producer that is currently parked in the reuse queue.
     */
    @OnStopped
    public void cleanupResources() {
        WrappedMessageProducer wrappedProducer;
        while ((wrappedProducer = this.producerQueue.poll()) != null) {
            wrappedProducer.close(getLogger());
        }
    }

    /**
     * Sends one batch of FlowFiles to the JMS server.
     *
     * @param processContext provides the processor configuration
     * @param processSession provides the FlowFiles and receives their routing decisions
     * @throws ProcessException declared by the processor contract; not thrown directly by this method
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ProcessorLog logger = getLogger();
        final List<FlowFile> flowFiles = processSession.get(processContext.getProperty(JmsProperties.BATCH_SIZE).asInteger().intValue());
        if (flowFiles.isEmpty()) {
            return;
        }

        // Reuse a parked producer when one is available; otherwise open a new connection.
        WrappedMessageProducer wrappedProducer = this.producerQueue.poll();
        if (wrappedProducer == null) {
            try {
                // NOTE(review): the 'true' flag presumably requests a transacted session (commit/rollback
                // are used below) - confirm against JmsFactory.
                wrappedProducer = JmsFactory.createMessageProducer(processContext, true);
                logger.info("Connected to JMS server {}", new Object[]{processContext.getProperty(JmsProperties.URL).getValue()});
            } catch (JMSException e) {
                logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
                processSession.transfer(flowFiles, REL_FAILURE);
                processContext.yield();
                return;
            }
        }

        final Session jmsSession = wrappedProducer.getSession();
        final MessageProducer producer = wrappedProducer.getProducer();
        final int maxBufferSize = processContext.getProperty(JmsProperties.MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();

        try {
            // Neither value depends on the FlowFile, so resolve them once per batch.
            final Long ttl = processContext.getProperty(JmsProperties.MESSAGE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
            final long ttlMillis = ttl == null ? 0L : ttl.longValue();
            final String url = processContext.getProperty(JmsProperties.URL).getValue();

            final Set<FlowFile> sentFlowFiles = new HashSet<>();
            for (final FlowFile flowFile : flowFiles) {
                if (flowFile.getSize() > maxBufferSize) {
                    processSession.transfer(flowFile, REL_FAILURE);
                    logger.warn("Routing {} to failure because its size exceeds the configured max", new Object[]{flowFile});
                    continue;
                }

                // Buffer the whole content; the size check above bounds the allocation.
                final byte[] content = new byte[(int) flowFile.getSize()];
                processSession.read(flowFile, new InputStreamCallback() {
                    @Override
                    public void process(InputStream inputStream) throws IOException {
                        StreamUtils.fillBuffer(inputStream, content, true);
                    }
                });

                final String replyToName = processContext.getProperty(JmsProperties.REPLY_TO_QUEUE).evaluateAttributeExpressions(flowFile).getValue();
                final javax.jms.Queue replyTo = replyToName == null ? null : JmsFactory.createQueue(processContext, replyToName);
                final int priority = resolvePriority(processContext, flowFile, logger);

                try {
                    final Message message = createMessage(jmsSession, processContext, content, flowFile, replyTo, Integer.valueOf(priority));
                    producer.setTimeToLive(ttlMillis);
                    producer.send(message);
                    sentFlowFiles.add(flowFile);
                    processSession.getProvenanceReporter().send(flowFile, url);
                } catch (JMSException e) {
                    // One failed send fails the whole batch: roll back and drop the producer.
                    logger.error("Failed to send {} to JMS Server due to {}", new Object[]{flowFile, e});
                    processSession.transfer(flowFiles, REL_FAILURE);
                    processContext.yield();
                    try {
                        jmsSession.rollback();
                    } catch (JMSException rollbackFailure) {
                        logger.warn("Unable to roll back JMS Session due to {}", new Object[]{rollbackFailure});
                    }
                    wrappedProducer.close(logger);
                    return;
                }
            }

            try {
                jmsSession.commit();
                processSession.transfer(sentFlowFiles, REL_SUCCESS);
                logger.info("Sent {} to JMS Server and transferred to 'success'", new Object[]{sentFlowFiles.size() > 10 ? sentFlowFiles.size() + " FlowFiles" : sentFlowFiles.toString()});
            } catch (JMSException e) {
                logger.error("Failed to commit JMS Session due to {} and transferred to 'failure'", new Object[]{e});
                processSession.transfer(flowFiles, REL_FAILURE);
                processContext.yield();
                wrappedProducer.close(logger);
            }
        } finally {
            // Park the producer for reuse on every exit path unless it has been closed.
            if (!wrappedProducer.isClosed()) {
                this.producerQueue.offer(wrappedProducer);
            }
        }
    }

    /**
     * Evaluates the configured message priority for a FlowFile.
     *
     * @return the configured priority, or {@link #DEFAULT_MESSAGE_PRIORITY} when the property is unset
     *         or does not evaluate to an integer
     */
    private int resolvePriority(ProcessContext processContext, FlowFile flowFile, ProcessorLog logger) {
        try {
            final Integer configured = processContext.getProperty(JmsProperties.MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).asInteger();
            return configured == null ? DEFAULT_MESSAGE_PRIORITY : configured.intValue();
        } catch (NumberFormatException e) {
            logger.warn("Invalid value for JMS Message Priority: {}; defaulting to priority of {}", new Object[]{processContext.getProperty(JmsProperties.MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(), DEFAULT_MESSAGE_PRIORITY});
            return DEFAULT_MESSAGE_PRIORITY;
        }
    }

    /**
     * Creates the JMS message for one FlowFile according to the configured message type.
     *
     * @param session        JMS session used to create the message
     * @param processContext provides the message type and the attribute-copy setting
     * @param content        FlowFile content used as the message body (ignored for empty and map messages)
     * @param flowFile       source of the attributes that may be copied to JMS properties
     * @param replyTo        reply-to destination, or {@code null} to leave it unset
     * @param priority       JMS priority, or {@code null} to leave it unset
     * @return the populated message; a bytes message is created for the "byte" type and for any
     *         unrecognized type
     * @throws JMSException if the provider fails to create or populate the message
     */
    private Message createMessage(Session session, ProcessContext processContext, byte[] content, FlowFile flowFile, Destination replyTo, Integer priority) throws JMSException {
        final String messageType = processContext.getProperty(JmsProperties.MESSAGE_TYPE).getValue();

        final Message message;
        if (JmsProperties.MSG_TYPE_EMPTY.equals(messageType)) {
            message = session.createTextMessage("");
        } else if (JmsProperties.MSG_TYPE_STREAM.equals(messageType)) {
            final javax.jms.StreamMessage streamMessage = session.createStreamMessage();
            streamMessage.writeBytes(content);
            message = streamMessage;
        } else if ("text".equals(messageType)) {
            message = session.createTextMessage(new String(content, UTF8));
        } else if (JmsProperties.MSG_TYPE_MAP.equals(messageType)) {
            message = session.createMapMessage();
        } else {
            // "byte" and anything unrecognized fall back to a bytes message.
            final javax.jms.BytesMessage bytesMessage = session.createBytesMessage();
            bytesMessage.writeBytes(content);
            message = bytesMessage;
        }

        message.setJMSTimestamp(System.currentTimeMillis());
        if (replyTo != null) {
            message.setJMSReplyTo(replyTo);
        }
        if (priority != null) {
            message.setJMSPriority(priority.intValue());
        }
        if (processContext.getProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS).asBoolean().booleanValue()) {
            copyAttributesToJmsProps(flowFile, message);
        }
        return message;
    }

    /**
     * Copies FlowFile attributes that carry the JMS attribute prefix onto the message as JMS properties.
     *
     * <p>For an attribute {@code K}, the attribute {@code K + ATTRIBUTE_TYPE_SUFFIX} (if present) names
     * the property type. With no type, or type "string", the value is set as a string property. A value
     * that cannot be parsed as its declared numeric type, or an unknown type, is logged and the property
     * is skipped. Attributes that themselves end with the type suffix are never copied.</p>
     *
     * @param flowFile source of the attributes
     * @param message  message receiving the properties
     * @throws JMSException if the provider rejects a property
     */
    private void copyAttributesToJmsProps(FlowFile flowFile, Message message) throws JMSException {
        final ProcessorLog logger = getLogger();
        final Map<String, String> attributes = flowFile.getAttributes();

        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
            final String key = entry.getKey();
            final String value = entry.getValue();

            // Only prefixed data attributes are copied; the companion "type" attributes are metadata.
            if (!startsWithIgnoreCase(key, JmsFactory.ATTRIBUTE_PREFIX) || endsWithIgnoreCase(key, JmsFactory.ATTRIBUTE_TYPE_SUFFIX)) {
                continue;
            }

            final String jmsPropName = key.substring(JmsFactory.ATTRIBUTE_PREFIX.length());
            final String typeKey = key + JmsFactory.ATTRIBUTE_TYPE_SUFFIX;
            final String type = attributes.get(typeKey);

            try {
                if (type == null || type.equalsIgnoreCase("string")) {
                    message.setStringProperty(jmsPropName, value);
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_INTEGER)) {
                    message.setIntProperty(jmsPropName, Integer.parseInt(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_BOOLEAN)) {
                    message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_SHORT)) {
                    message.setShortProperty(jmsPropName, Short.parseShort(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_LONG)) {
                    message.setLongProperty(jmsPropName, Long.parseLong(value));
                } else if (type.equalsIgnoreCase("byte")) {
                    message.setByteProperty(jmsPropName, Byte.parseByte(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_DOUBLE)) {
                    message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_FLOAT)) {
                    message.setFloatProperty(jmsPropName, Float.parseFloat(value));
                } else if (type.equalsIgnoreCase(JmsFactory.PROP_TYPE_OBJECT)) {
                    message.setObjectProperty(jmsPropName, value);
                } else {
                    // Report the type attribute, since that is the one holding the unexpected value.
                    logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property", new Object[]{typeKey, flowFile, type});
                }
            } catch (NumberFormatException e) {
                logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property", new Object[]{key, flowFile, value, typeKey, type});
            }
        }
    }

    /** Locale-independent, case-insensitive {@code startsWith}. */
    private static boolean startsWithIgnoreCase(String text, String prefix) {
        return text.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /** Locale-independent, case-insensitive {@code endsWith}; false when the suffix is longer than the text. */
    private static boolean endsWithIgnoreCase(String text, String suffix) {
        return text.regionMatches(true, text.length() - suffix.length(), suffix, 0, suffix.length());
    }
}
