/*
 * Decompiled with CFR 0.152.
 */
package org.skyscreamer.nevado.jms.performance;

import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.skyscreamer.nevado.jms.performance.PerformancePayload;
import org.skyscreamer.nevado.jms.performance.PerformanceResult;
import org.skyscreamer.nevado.jms.performance.PerformanceTally;

public class PerformanceService {
    private final Log _log = LogFactory.getLog(this.getClass());

    public PerformanceResult runSample(Connection connection, int numMessages, int messageSize, int numThreads, long messageSendDelayMs) throws JMSException, InterruptedException {
        Session[] sessions = new Session[numThreads];
        Queue[] queues = new Queue[numThreads];
        this.initializeSessionsAndQueues(connection, sessions, queues, numThreads);
        long sendTime = this.sendMessages(numMessages, messageSize, numThreads, messageSendDelayMs, sessions, queues);
        PerformanceTally tally = new PerformanceTally();
        long receiveTime = this.receiveMessages(numMessages, numThreads, sessions, queues, tally);
        PerformanceResult result = new PerformanceResult(numMessages * numThreads, tally, messageSize, numThreads, sendTime, receiveTime);
        return result;
    }

    private long receiveMessages(int numMessages, int numThreads, Session[] sessions, Queue[] queues, PerformanceTally totalTally) throws JMSException, InterruptedException {
        int i;
        StopWatch swReceive = new StopWatch();
        swReceive.start();
        PerformanceTally[] tallies = new PerformanceTally[numThreads];
        Thread[] threads = new Thread[numThreads];
        for (i = 0; i < numThreads; ++i) {
            tallies[i] = new PerformanceTally();
            threads[i] = this.createReceiverThread(tallies[i], numMessages, sessions[i], queues[i]);
            threads[i].start();
        }
        for (i = 0; i < numThreads; ++i) {
            threads[i].join();
        }
        swReceive.stop();
        for (i = 0; i < numThreads; ++i) {
            totalTally.add(tallies[i]);
        }
        return swReceive.getTime();
    }

    private Thread createReceiverThread(final PerformanceTally tally, final int numMessages, final Session session, final Queue queue) throws JMSException {
        return new Thread(new Runnable(){

            @Override
            public void run() {
                Integer[] msgCounter = PerformanceService.this.createMessageCounter(numMessages);
                int numOutOfOrder = 0;
                try {
                    numOutOfOrder = PerformanceService.this.receiveMessages(numMessages, session, queue, msgCounter);
                    tally.setNumOutOfOrder(numOutOfOrder);
                    tally.setNumMessagesReceived(PerformanceService.this.countAllMessages(msgCounter));
                    tally.setNumDupMessages(PerformanceService.this.countDupMessages(msgCounter));
                    tally.setNumMissedMessages(PerformanceService.this.countMissedMessages(msgCounter));
                }
                catch (JMSException e) {
                    PerformanceService.this._log.error((Object)"Error receiving message", (Throwable)e);
                }
            }
        });
    }

    private void initializeSessionsAndQueues(Connection connection, Session[] sessions, Queue[] queues, int numThreads) throws JMSException {
        for (int i = 0; i < numThreads; ++i) {
            sessions[i] = connection.createSession(false, 1);
            queues[i] = sessions[i].createTemporaryQueue();
        }
    }

    private long sendMessages(int numMessages, int messageSize, int numThreads, long messageSendDelayMs, Session[] sessions, Queue[] queues) throws InterruptedException {
        int i;
        StopWatch swSend = new StopWatch();
        swSend.start();
        Thread[] threads = new Thread[numThreads];
        for (i = 0; i < numThreads; ++i) {
            threads[i] = this.createSenderThread(numMessages, messageSize, messageSendDelayMs, sessions[i], queues[i]);
            threads[i].start();
        }
        for (i = 0; i < numThreads; ++i) {
            threads[i].join();
        }
        swSend.stop();
        return swSend.getTime();
    }

    private Thread createSenderThread(final int numMessages, final int messageSize, final long messageSendDelayMs, final Session session, final Queue queue) {
        return new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    PerformanceService.this.sendMessages(numMessages, messageSize, session, queue, messageSendDelayMs);
                }
                catch (Exception e) {
                    PerformanceService.this._log.error((Object)"Error sending messages", (Throwable)e);
                }
            }
        });
    }

    private int countAllMessages(Integer[] msgCounter) {
        int count = 0;
        for (int i = 0; i < msgCounter.length; ++i) {
            count += msgCounter[i].intValue();
        }
        return count;
    }

    private int countMissedMessages(Integer[] msgCounter) {
        int count = 0;
        for (int i = 0; i < msgCounter.length; ++i) {
            if (msgCounter[i] != 0) continue;
            ++count;
        }
        return count;
    }

    private int countDupMessages(Integer[] msgCounter) {
        int count = 0;
        for (int i = 0; i < msgCounter.length; ++i) {
            if (msgCounter[i] <= 1) continue;
            count += msgCounter[i] - 1;
        }
        return count;
    }

    private Integer[] createMessageCounter(int numMessages) {
        Integer[] msgCounter = new Integer[numMessages];
        for (int i = 0; i < numMessages; ++i) {
            msgCounter[i] = 0;
        }
        return msgCounter;
    }

    private int receiveMessages(int numMessages, Session session, Queue queue, Integer[] msgCounter) throws JMSException {
        Message msg;
        int lastMsgId = -1;
        int outOfOrderCount = 0;
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        while ((msg = consumer.receive(100L)) != null) {
            PerformancePayload payload = (PerformancePayload)((ObjectMessage)msg).getObject();
            if (payload.getId() <= lastMsgId) {
                ++outOfOrderCount;
            }
            Integer[] integerArray = msgCounter;
            int n = payload.getId();
            Integer.valueOf(integerArray[n] + 1);
            lastMsgId = payload.getId();
        }
        return outOfOrderCount;
    }

    private void sendMessages(int numMessages, int messageSize, Session session, Queue queue, long messageSendDelayMs) throws JMSException, InterruptedException {
        MessageProducer producer = session.createProducer((Destination)queue);
        for (int i = 0; i < numMessages; ++i) {
            ObjectMessage msg = session.createObjectMessage((Serializable)new PerformancePayload(i, messageSize));
            producer.send((Message)msg);
            Thread.sleep(messageSendDelayMs);
        }
    }
}

