package org.wso2.carbon.sample.kafka.performance;

import com.google.gson.JsonObject;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/wso2/carbon/sample/kafka/performance/KafkaClient.class */
public class KafkaClient {
    private static String url;
    private static String topic;
    private static long noOfEvents;
    private static int elapsedCount;
    private static int noOfPublishers;
    private AtomicLong lastIndex = new AtomicLong(0);
    private AtomicBoolean calcInProgress = new AtomicBoolean(false);
    private static Log log = LogFactory.getLog(KafkaClient.class);
    private static AtomicLong count = new AtomicLong(0);

    /* loaded from: input_file:org/wso2/carbon/sample/kafka/performance/KafkaClient$KafkaProducer.class */
    public class KafkaProducer implements Runnable {
        public KafkaProducer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Properties properties = new Properties();
                properties.put("metadata.broker.list", KafkaClient.url);
                properties.put("serializer.class", "kafka.serializer.StringEncoder");
                properties.put("producer.type", "async");
                Producer producer = new Producer(new ProducerConfig(properties));
                KafkaClient.log.info("Sending messages..");
                long currentTimeMillis = System.currentTimeMillis();
                DecimalFormat decimalFormat = new DecimalFormat("#");
                while (KafkaClient.count.getAndIncrement() < KafkaClient.noOfEvents) {
                    producer.send(new KeyedMessage(KafkaClient.topic, "{\"event\": " + getRandomEvent(KafkaClient.count.get()).toString() + "}"));
                    long j = KafkaClient.count.get() / KafkaClient.elapsedCount;
                    if (KafkaClient.this.lastIndex.get() != j && KafkaClient.this.calcInProgress.compareAndSet(false, true)) {
                        KafkaClient.this.lastIndex.set(j);
                        long currentTimeMillis2 = System.currentTimeMillis();
                        long j2 = currentTimeMillis2 - currentTimeMillis;
                        currentTimeMillis = currentTimeMillis2;
                        KafkaClient.log.info("Sent " + KafkaClient.elapsedCount + " sensor events in " + j2 + " milliseconds with total throughput of " + decimalFormat.format((KafkaClient.elapsedCount / j2) * 1000.0d) + " events per second.");
                        KafkaClient.this.calcInProgress.set(false);
                    }
                }
                KafkaClient.log.info("Sent " + (KafkaClient.count.get() - 1) + " sensor events");
            } catch (Throwable th) {
                KafkaClient.log.error("Error when sending the messages", th);
            }
        }

        private JsonObject getRandomEvent(long j) {
            JsonObject jsonObject = new JsonObject();
            JsonObject jsonObject2 = new JsonObject();
            JsonObject jsonObject3 = new JsonObject();
            JsonObject jsonObject4 = new JsonObject();
            jsonObject2.addProperty("timestamp", Long.valueOf(System.currentTimeMillis()));
            jsonObject2.addProperty("isPowerSaverEnabled", false);
            jsonObject2.addProperty("sensorId", Long.valueOf(j));
            jsonObject2.addProperty("sensorName", "temperature");
            jsonObject3.addProperty("longitude", Double.valueOf(2332.424d));
            jsonObject3.addProperty("latitude", Double.valueOf(2323.23232d));
            jsonObject4.addProperty("humidity", Float.valueOf(2.3f));
            jsonObject4.addProperty("sensorValue", Double.valueOf(23423.234d));
            jsonObject.add("metaData", jsonObject2);
            jsonObject.add("correlationData", jsonObject3);
            jsonObject.add("payloadData", jsonObject4);
            return jsonObject;
        }
    }

    public static void main(String[] strArr) {
        log.info("Starting Kafka Client");
        url = strArr[0];
        topic = strArr[1];
        try {
            noOfEvents = Long.parseLong(strArr[2]);
            elapsedCount = Integer.parseInt(strArr[3]);
            noOfPublishers = Integer.parseInt(strArr[4]);
            new KafkaClient().start();
        } catch (NumberFormatException e) {
            log.error("Entered value for no of events is invalid. Please enter an integer", e);
        }
    }

    private void start() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(noOfPublishers);
        for (int i = 0; i < noOfPublishers; i++) {
            newFixedThreadPool.execute(new KafkaProducer());
        }
    }
}
