/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.sample.kafka.performance;

import java.text.DecimalFormat;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ConsumerTest
implements Runnable {
    private static Log log = LogFactory.getLog(ConsumerTest.class);
    DecimalFormat decimalFormat = new DecimalFormat("#");
    private KafkaStream kafkaStream;
    private long sentTime = 0L;
    private long receivedTime = 0L;
    private static int elapsedCount = 100000;
    private static AtomicLong eventCount = new AtomicLong(0L);
    private static AtomicLong latency = new AtomicLong(0L);
    private static AtomicLong lastTime = new AtomicLong(System.currentTimeMillis());

    public ConsumerTest(KafkaStream stream) {
        this.kafkaStream = stream;
    }

    @Override
    public void run() {
        try {
            log.info((Object)"start consuming");
            ConsumerIterator iterator = this.kafkaStream.iterator();
            Pattern eventPattern = Pattern.compile("(timestamp\":(\\d+))");
            while (iterator.hasNext()) {
                String message = new String((byte[])iterator.next().message());
                this.receivedTime = System.currentTimeMillis();
                Matcher eventPatternMatcher = eventPattern.matcher(message);
                if (eventPatternMatcher.find()) {
                    this.sentTime = Long.parseLong(eventPatternMatcher.group(2));
                } else {
                    log.error((Object)"unable to extract timestamp from received event");
                }
                latency.addAndGet(this.receivedTime - this.sentTime);
                eventCount.incrementAndGet();
                if (eventCount.get() % (long)elapsedCount != 0L) continue;
                long currentTime = System.currentTimeMillis();
                long elapsedTime = currentTime - lastTime.getAndSet(currentTime);
                double throughputPerSecond = (double)elapsedCount / (double)elapsedTime * 1000.0;
                log.info((Object)("Received " + elapsedCount + " sensor events in " + elapsedTime + " milliseconds with total throughput of " + this.decimalFormat.format(throughputPerSecond) + " events per second. Average latency is " + (double)latency.get() / (double)elapsedCount + " milliseconds per event."));
                latency.set(0L);
            }
            log.info((Object)("Received Total of " + eventCount.get() + " sensor events"));
        }
        catch (Throwable t) {
            log.error((Object)"Error when receiving messages", t);
        }
    }
}

