/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.StopWatch;

/**
 * Base class for processors that pull messages through a JMS
 * {@link MessageConsumer} and turn each received message into a FlowFile
 * that is routed to {@link #REL_SUCCESS}.
 *
 * <p>Subclasses supply a {@link WrappedMessageConsumer} and call
 * {@link #consume(ProcessContext, ProcessSession, WrappedMessageConsumer)}.
 */
public abstract class JmsConsumer
extends AbstractProcessor {
    /** Prefix put in front of every {@link MapMessage} entry name when it is stored as a FlowFile attribute. */
    public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
    /** The only relationship exposed by this processor. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to success").build();

    /** Value of the acknowledgement-mode property that enables explicit acknowledgement in {@code consume}. */
    private static final String CLIENT_ACKNOWLEDGE_MODE = "Client Acknowledge";
    /** Size, in bytes, of the buffer placed around the FlowFile content stream. */
    private static final int CONTENT_BUFFER_SIZE = 65536;

    private final Set<Relationship> relationships;
    private final List<PropertyDescriptor> propertyDescriptors;

    /**
     * Builds the immutable relationship set and the immutable, ordered list of
     * supported property descriptors.
     */
    public JmsConsumer() {
        this.relationships = Collections.singleton(REL_SUCCESS);

        List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(JmsProperties.JMS_PROVIDER);
        descriptors.add(JmsProperties.URL);
        descriptors.add(JmsProperties.DESTINATION_NAME);
        descriptors.add(JmsProperties.TIMEOUT);
        descriptors.add(JmsProperties.BATCH_SIZE);
        descriptors.add(JmsProperties.USERNAME);
        descriptors.add(JmsProperties.PASSWORD);
        descriptors.add(JmsProperties.SSL_CONTEXT_SERVICE);
        descriptors.add(JmsProperties.ACKNOWLEDGEMENT_MODE);
        descriptors.add(JmsProperties.MESSAGE_SELECTOR);
        descriptors.add(JmsProperties.JMS_PROPS_TO_ATTRIBUTES);
        descriptors.add(JmsProperties.CLIENT_ID_PREFIX);
        this.propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    /**
     * @return the unmodifiable set of relationships (only {@link #REL_SUCCESS})
     */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of property descriptors registered in the constructor
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    /**
     * Receives up to the configured batch size of messages from the wrapped
     * consumer, converts each one to a FlowFile via
     * {@link #map2FlowFile(ProcessContext, ProcessSession, Message, boolean, ProcessorLog)},
     * commits the session and, in client-acknowledge mode, acknowledges the
     * last message received.
     *
     * <p>The first receive uses the configured timeout; once at least one
     * message has been counted, further receives use {@code receiveNoWait()}.
     * The loop stops early when no message is returned or when receiving or
     * converting a message fails; in the failure cases the wrapped consumer is
     * closed. If no FlowFile was created the context is yielded and nothing is
     * committed.
     *
     * @param context         supplies the processor's property values
     * @param session         session in which FlowFiles are created and committed
     * @param wrappedConsumer wrapper around the JMS consumer to read from
     * @throws ProcessException declared for callers; this method itself logs and
     *                          handles the receive/convert failures it catches
     */
    public void consume(ProcessContext context, ProcessSession session, WrappedMessageConsumer wrappedConsumer) throws ProcessException {
        ProcessorLog logger = this.getLogger();
        MessageConsumer consumer = wrappedConsumer.getConsumer();

        // Constant-first comparison so a null property value cannot cause an NPE.
        boolean clientAcknowledge = CLIENT_ACKNOWLEDGE_MODE.equalsIgnoreCase(context.getProperty(JmsProperties.ACKNOWLEDGEMENT_MODE).getValue());
        long timeout = context.getProperty(JmsProperties.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        boolean addAttributes = context.getProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES).asBoolean();
        int batchSize = context.getProperty(JmsProperties.BATCH_SIZE).asInteger();

        JmsProcessingSummary processingSummary = new JmsProcessingSummary();
        StopWatch stopWatch = new StopWatch(true);

        for (int i = 0; i < batchSize; ++i) {
            Message message;
            try {
                // Only the first receive waits for the timeout; the rest of the batch is drained without waiting.
                message = processingSummary.getMessagesReceived() == 0 ? consumer.receive(timeout) : consumer.receiveNoWait();
            }
            catch (JMSException e) {
                logger.error("Failed to receive JMS Message due to {}", e);
                wrappedConsumer.close(logger);
                break;
            }

            if (message == null) {
                // Nothing (more) available right now.
                break;
            }

            try {
                processingSummary.add(JmsConsumer.map2FlowFile(context, session, message, addAttributes, logger));
            }
            catch (Exception e) {
                // NOTE(review): the failure here is in converting the message to a FlowFile, not in
                // receiving it; the log text is kept unchanged for compatibility with existing log consumers.
                logger.error("Failed to receive JMS Message due to {}", e);
                wrappedConsumer.close(logger);
                break;
            }
        }

        if (processingSummary.getFlowFilesCreated() == 0) {
            context.yield();
            return;
        }

        session.commit();
        stopWatch.stop();

        // At least one FlowFile was created (guaranteed by the early return above), so always report the rate.
        // If the elapsed time is 0 ms the float division yields Infinity rather than throwing; the value is
        // only used in this log line.
        float secs = (float)stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000.0f;
        float messagesPerSec = (float)processingSummary.getMessagesReceived() / secs;
        String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
        logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), Float.valueOf(messagesPerSec), dataRate});

        // Acknowledge only after the session commit, and only the last message of the batch.
        // NOTE(review): if the loop ended through a failure the wrapped consumer has already been
        // closed at this point — confirm whether acknowledge() can still succeed in that case.
        Message lastMessage = processingSummary.getLastMessageReceived();
        if (clientAcknowledge && lastMessage != null) {
            try {
                lastMessage.acknowledge();
            }
            catch (JMSException e) {
                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
            }
        }
    }

    /**
     * Creates a FlowFile from a single JMS message and transfers it to
     * {@link #REL_SUCCESS}.
     *
     * <p>A {@link MapMessage} is stored as FlowFile attributes (see
     * {@link #createMapMessageValues(MapMessage)}); any other message has the
     * bytes returned by {@code JmsFactory.createByteArray} written as the
     * FlowFile content. When {@code addAttributes} is set, the attributes from
     * {@code JmsFactory.createAttributeMap} are added as well. A provenance
     * receive event is reported using the configured URL property.
     *
     * @param context       supplies the URL property used for the provenance event
     * @param session       session used to create, modify and transfer the FlowFile
     * @param message       the JMS message to convert
     * @param addAttributes whether to copy the message's JMS properties to attributes
     * @param logger        logger used to report the created FlowFile
     * @return a summary holding the FlowFile size, the message and the FlowFile
     * @throws Exception any failure raised while building the FlowFile; the
     *                   FlowFile is removed from the session before rethrowing
     */
    public static JmsProcessingSummary map2FlowFile(ProcessContext context, ProcessSession session, final Message message, boolean addAttributes, ProcessorLog logger) throws Exception {
        FlowFile flowFile = session.create();
        try {
            if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage)message;
                flowFile = session.putAllAttributes(flowFile, JmsConsumer.createMapMessageValues(mapMessage));
            } else {
                flowFile = session.write(flowFile, new OutputStreamCallback(){

                    public void process(OutputStream rawOut) throws IOException {
                        try (BufferedOutputStream out = new BufferedOutputStream(rawOut, CONTENT_BUFFER_SIZE)) {
                            byte[] messageBody = JmsFactory.createByteArray(message);
                            out.write(messageBody);
                        }
                        catch (JMSException e) {
                            // Exception messages are not run through the logger's "{}" formatting,
                            // so the cause is appended explicitly.
                            throw new ProcessException("Failed to receive JMS Message due to " + e, e);
                        }
                    }
                });
            }
            if (addAttributes) {
                flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
            }
            session.getProvenanceReporter().receive(flowFile, context.getProperty(JmsProperties.URL).getValue());
            session.transfer(flowFile, REL_SUCCESS);
            // Exactly one message is mapped to each FlowFile.
            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, 1});
            return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
        }
        catch (Exception e) {
            // Do not leave a half-built FlowFile in the session.
            session.remove(flowFile);
            throw e;
        }
    }

    /**
     * Flattens a {@link MapMessage} into a string map suitable for FlowFile
     * attributes. Each key is {@link #MAP_MESSAGE_PREFIX} followed by the entry
     * name; each value is the entry's {@code toString()}, or the empty string
     * when the entry value is {@code null}.
     *
     * @param mapMessage the message whose entries are read
     * @return a new mutable map of prefixed names to string values
     * @throws JMSException if reading the names or values from the message fails
     */
    public static Map<String, String> createMapMessageValues(MapMessage mapMessage) throws JMSException {
        Map<String, String> valueMap = new HashMap<>();
        Enumeration<?> names = mapMessage.getMapNames();
        while (names.hasMoreElements()) {
            String name = (String)names.nextElement();
            Object value = mapMessage.getObject(name);
            valueMap.put(MAP_MESSAGE_PREFIX + name, value == null ? "" : value.toString());
        }
        return valueMap;
    }
}

