/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;

public abstract class JmsConsumer
extends AbstractProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to success").build();
    private final Set<Relationship> relationships;
    private final List<PropertyDescriptor> propertyDescriptors;

    public JmsConsumer() {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(rels);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(JmsProperties.JMS_PROVIDER);
        descriptors.add(JmsProperties.URL);
        descriptors.add(JmsProperties.DESTINATION_NAME);
        descriptors.add(JmsProperties.TIMEOUT);
        descriptors.add(JmsProperties.BATCH_SIZE);
        descriptors.add(JmsProperties.USERNAME);
        descriptors.add(JmsProperties.PASSWORD);
        descriptors.add(JmsProperties.ACKNOWLEDGEMENT_MODE);
        descriptors.add(JmsProperties.MESSAGE_SELECTOR);
        descriptors.add(JmsProperties.JMS_PROPS_TO_ATTRIBUTES);
        descriptors.add(JmsProperties.CLIENT_ID_PREFIX);
        this.propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void consume(ProcessContext context, ProcessSession session, WrappedMessageConsumer wrappedConsumer) throws ProcessException {
        final ProcessorLog logger = this.getLogger();
        MessageConsumer consumer = wrappedConsumer.getConsumer();
        boolean clientAcknowledge = context.getProperty(JmsProperties.ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase("Client Acknowledge");
        long timeout = context.getProperty(JmsProperties.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final boolean addAttributes = context.getProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES).asBoolean();
        int batchSize = context.getProperty(JmsProperties.BATCH_SIZE).asInteger();
        final ObjectHolder lastMessageReceived = new ObjectHolder(null);
        final ObjectHolder attributesFromJmsProps = new ObjectHolder(null);
        HashSet<FlowFile> allFlowFilesCreated = new HashSet<FlowFile>();
        final IntegerHolder messagesReceived = new IntegerHolder(0);
        final LongHolder bytesReceived = new LongHolder(0L);
        StopWatch stopWatch = new StopWatch(true);
        for (int i = 0; i < batchSize; ++i) {
            FlowFile flowFile;
            IntegerHolder msgsThisFlowFile;
            block14: {
                Message message;
                final BooleanHolder failure = new BooleanHolder(false);
                try {
                    message = (Integer)messagesReceived.get() == 0 ? consumer.receive(timeout) : consumer.receiveNoWait();
                }
                catch (JMSException e) {
                    logger.error("Failed to receive JMS Message due to {}", (Throwable)e);
                    wrappedConsumer.close(logger);
                    failure.set((Object)true);
                    break;
                }
                if (message == null) break;
                msgsThisFlowFile = new IntegerHolder(0);
                flowFile = session.create();
                try {
                    flowFile = session.write(flowFile, new OutputStreamCallback(){

                        public void process(OutputStream rawOut) throws IOException {
                            try (BufferedOutputStream out = new BufferedOutputStream(rawOut, 65536);){
                                messagesReceived.getAndIncrement();
                                Map<String, String> attributes = addAttributes ? JmsFactory.createAttributeMap(message) : null;
                                attributesFromJmsProps.set(attributes);
                                byte[] messageBody = JmsFactory.createByteArray(message);
                                out.write(messageBody);
                                bytesReceived.addAndGet((long)messageBody.length);
                                msgsThisFlowFile.incrementAndGet();
                                lastMessageReceived.set((Object)message);
                            }
                            catch (JMSException e) {
                                logger.error("Failed to receive JMS Message due to {}", (Throwable)e);
                                failure.set((Object)true);
                            }
                        }
                    });
                    if (!((Boolean)failure.get()).booleanValue()) break block14;
                }
                catch (Throwable throwable) {
                    if (((Boolean)failure.get()).booleanValue()) {
                        session.remove(flowFile);
                        wrappedConsumer.close(logger);
                    } else {
                        allFlowFilesCreated.add(flowFile);
                        Map attributes = (Map)attributesFromJmsProps.get();
                        if (attributes != null) {
                            flowFile = session.putAllAttributes(flowFile, attributes);
                        }
                        session.getProvenanceReporter().receive(flowFile, context.getProperty(JmsProperties.URL).getValue());
                        session.transfer(flowFile, REL_SUCCESS);
                        logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
                    }
                    throw throwable;
                }
                session.remove(flowFile);
                wrappedConsumer.close(logger);
                continue;
            }
            allFlowFilesCreated.add(flowFile);
            Map attributes = (Map)attributesFromJmsProps.get();
            if (attributes != null) {
                flowFile = session.putAllAttributes(flowFile, attributes);
            }
            session.getProvenanceReporter().receive(flowFile, context.getProperty(JmsProperties.URL).getValue());
            session.transfer(flowFile, REL_SUCCESS);
            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
        }
        if (allFlowFilesCreated.isEmpty()) {
            context.yield();
            return;
        }
        session.commit();
        stopWatch.stop();
        if (!allFlowFilesCreated.isEmpty()) {
            float secs = (float)stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000.0f;
            float messagesPerSec = (float)((Integer)messagesReceived.get()).intValue() / secs;
            String dataRate = stopWatch.calculateDataRate(((Long)bytesReceived.get()).longValue());
            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), Float.valueOf(messagesPerSec), dataRate});
        }
        Message lastMessage = (Message)lastMessageReceived.get();
        if (clientAcknowledge && lastMessage != null) {
            try {
                lastMessage.acknowledge();
            }
            catch (JMSException e) {
                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
            }
        }
    }
}

