package org.wso2.carbon.sample.kafka.performance;

import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/wso2/carbon/sample/kafka/performance/ConsumerTest.class */
/**
 * Consumer worker for the Kafka performance sample.
 *
 * <p>Each instance drains one {@link KafkaStream}, extracts the numeric value following
 * {@code timestamp":} from every message, and accumulates receive latency. The event counter,
 * latency sum and window start time are {@code static}, so they are shared by every instance in
 * the JVM; a throughput/latency summary is logged each time the shared counter reaches a
 * multiple of {@link #ELAPSED_COUNT}.
 */
public class ConsumerTest implements Runnable {
    private static final Log log = LogFactory.getLog(ConsumerTest.class);

    /** Matches {@code timestamp":<digits>}; group 2 holds the digits. Compiled once and reused. */
    private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("(timestamp\":(\\d+))");

    /** Number of events that make up one reporting window. */
    private static final int ELAPSED_COUNT = 100000;

    /** Sentinel returned when a message carries no usable timestamp (real values are non-negative). */
    private static final long NO_TIMESTAMP = -1L;

    private static final AtomicLong eventCount = new AtomicLong(0);
    private static final AtomicLong latency = new AtomicLong(0);
    private static final AtomicLong lastTime = new AtomicLong(System.currentTimeMillis());

    // Kept per instance: DecimalFormat is not safe to share between threads.
    DecimalFormat decimalFormat = new DecimalFormat("#");

    private final KafkaStream kafkaStream;

    /**
     * Creates a worker bound to a single stream.
     *
     * @param kafkaStream the stream whose messages this worker will consume
     */
    public ConsumerTest(KafkaStream kafkaStream) {
        this.kafkaStream = kafkaStream;
    }

    /**
     * Consumes messages until the stream's iterator reports no more elements, updating the shared
     * statistics for every message. Any failure is logged and ends this worker's loop.
     */
    @Override
    public void run() {
        try {
            log.info("start consuming");
            ConsumerIterator it = this.kafkaStream.iterator();
            while (it.hasNext()) {
                // Decode with an explicit charset instead of the platform default.
                String event = new String((byte[]) it.next().message(), StandardCharsets.UTF_8);
                long receivedTime = System.currentTimeMillis();
                long sentTime = extractSentTime(event);
                if (sentTime != NO_TIMESTAMP) {
                    // Only events with a real timestamp contribute; a missing one must not add
                    // a stale or epoch-sized value to the latency sum.
                    latency.addAndGet(receivedTime - sentTime);
                }
                // Use the value returned by the increment itself: re-reading the counter could
                // observe another instance's increment and skip or repeat a report.
                long received = eventCount.incrementAndGet();
                if (received % ELAPSED_COUNT == 0) {
                    reportWindow();
                }
            }
            log.info("Received Total of " + eventCount.get() + " sensor events");
        } catch (Throwable th) {
            // Top of the worker: log everything so a failure is never silent.
            log.error("Error when receiving messages", th);
        }
    }

    /**
     * Extracts the send timestamp from a raw event.
     *
     * @param event the decoded message text
     * @return the parsed timestamp, or {@link #NO_TIMESTAMP} if none is present or it does not
     *         fit in a {@code long}
     */
    private static long extractSentTime(String event) {
        Matcher matcher = TIMESTAMP_PATTERN.matcher(event);
        if (!matcher.find()) {
            log.error("unable to extract timestamp from received event");
            return NO_TIMESTAMP;
        }
        try {
            return Long.parseLong(matcher.group(2));
        } catch (NumberFormatException e) {
            // The digit run overflowed a long; treat the event as having no timestamp.
            log.error("unable to extract timestamp from received event", e);
            return NO_TIMESTAMP;
        }
    }

    /**
     * Logs throughput and average latency for the window that just completed and resets the
     * shared latency sum. The average is taken over the full window size, so events that carried
     * no timestamp lower it slightly rather than inflating it.
     */
    private void reportWindow() {
        long now = System.currentTimeMillis();
        long elapsedMillis = now - lastTime.getAndSet(now);
        // Divide in floating point (the old integer division truncated the rate) and clamp the
        // divisor so a window shorter than one millisecond cannot divide by zero.
        double eventsPerSecond = ELAPSED_COUNT * 1000.0d / Math.max(elapsedMillis, 1L);
        // Read and reset in one atomic step so no concurrently added latency is discarded.
        long averageLatency = latency.getAndSet(0L) / ELAPSED_COUNT;
        log.info("Received " + ELAPSED_COUNT + " sensor events in " + elapsedMillis
                + " milliseconds with total throughput of " + this.decimalFormat.format(eventsPerSecond)
                + " events per second. Average latency is " + averageLatency
                + " milliseconds per event.");
    }
}
