/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.sample.kafka.performance;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class KafkaClient {
    private static Log log = LogFactory.getLog(KafkaClient.class);
    private static String url;
    private static String topic;
    private static long noOfEvents;
    private static int elapsedCount;
    private static int noOfPublishers;
    private static AtomicLong count;
    private AtomicLong lastIndex = new AtomicLong(0L);
    private AtomicBoolean calcInProgress = new AtomicBoolean(false);

    public static void main(String[] args) {
        log.info((Object)"Starting Kafka Client");
        url = args[0];
        topic = args[1];
        try {
            noOfEvents = Long.parseLong(args[2]);
            elapsedCount = Integer.parseInt(args[3]);
            noOfPublishers = Integer.parseInt(args[4]);
            KafkaClient kafkaClient = new KafkaClient();
            kafkaClient.start();
        }
        catch (NumberFormatException e) {
            log.error((Object)"Entered value for no of events is invalid. Please enter an integer", (Throwable)e);
        }
    }

    private void start() {
        ExecutorService executor = Executors.newFixedThreadPool(noOfPublishers);
        for (int i = 0; i < noOfPublishers; ++i) {
            executor.execute(new KafkaProducer());
        }
    }

    static {
        count = new AtomicLong(0L);
    }

    public class KafkaProducer
    implements Runnable {
        @Override
        public void run() {
            try {
                Properties props = new Properties();
                props.put("metadata.broker.list", url);
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("producer.type", "async");
                ProducerConfig config = new ProducerConfig(props);
                Producer producer = new Producer(config);
                log.info((Object)"Sending messages..");
                long lastTime = System.currentTimeMillis();
                DecimalFormat decimalFormat = new DecimalFormat("#");
                while (count.getAndIncrement() < noOfEvents) {
                    String message = "{\"event\": " + this.getRandomEvent(count.get()).toString() + "}";
                    KeyedMessage data = new KeyedMessage(topic, (Object)message);
                    producer.send(data);
                    long index = count.get() / (long)elapsedCount;
                    if (KafkaClient.this.lastIndex.get() == index || !KafkaClient.this.calcInProgress.compareAndSet(false, true)) continue;
                    KafkaClient.this.lastIndex.set(index);
                    long currentTime = System.currentTimeMillis();
                    long elapsedTime = currentTime - lastTime;
                    double throughputPerSecond = (double)elapsedCount / (double)elapsedTime * 1000.0;
                    lastTime = currentTime;
                    log.info((Object)("Sent " + elapsedCount + " sensor events in " + elapsedTime + " milliseconds with total throughput of " + decimalFormat.format(throughputPerSecond) + " events per second."));
                    KafkaClient.this.calcInProgress.set(false);
                }
                log.info((Object)("Sent " + (count.get() - 1L) + " sensor events"));
            }
            catch (Throwable t) {
                log.error((Object)"Error when sending the messages", t);
            }
        }

        private JsonObject getRandomEvent(long count) {
            JsonObject event = new JsonObject();
            JsonObject metaData = new JsonObject();
            JsonObject correlationData = new JsonObject();
            JsonObject payLoadData = new JsonObject();
            metaData.addProperty("timestamp", (Number)System.currentTimeMillis());
            metaData.addProperty("isPowerSaverEnabled", Boolean.valueOf(false));
            metaData.addProperty("sensorId", (Number)count);
            metaData.addProperty("sensorName", "temperature");
            correlationData.addProperty("longitude", (Number)2332.424);
            correlationData.addProperty("latitude", (Number)2323.23232);
            payLoadData.addProperty("humidity", (Number)Float.valueOf(2.3f));
            payLoadData.addProperty("sensorValue", (Number)23423.234);
            event.add("metaData", (JsonElement)metaData);
            event.add("correlationData", (JsonElement)correlationData);
            event.add("payloadData", (JsonElement)payLoadData);
            return event;
        }
    }
}

