/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.performance;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
import java.util.concurrent.atomic.AtomicInteger;

public class SimplePartitionedFilterQueryPerformance {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        String siddhiApp = "define stream cseEventStream (symbol string, price float, volume long, timestamp long);partition with (symbol of cseEventStream) begin    @info(name = 'query1')    from cseEventStream[700 > price]    select *    insert into outputStream ;end;";
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
        siddhiAppRuntime.addCallback("outputStream", new StreamCallback(){
            public AtomicInteger eventCount = new AtomicInteger();
            public int timeSpent = 0;
            long startTime = System.currentTimeMillis();

            public void receive(Event[] events) {
                for (Event event : events) {
                    int count = this.eventCount.incrementAndGet();
                    this.timeSpent = (int)((long)this.timeSpent + (System.currentTimeMillis() - (Long)event.getData(3)));
                    if (count % 10000000 != 0) continue;
                    System.out.println("Throughput : " + (long)(count * 1000) / (System.currentTimeMillis() - this.startTime));
                    System.out.println("Time spent :  " + (double)this.timeSpent * 1.0 / (double)count);
                    this.startTime = System.currentTimeMillis();
                    this.eventCount.set(0);
                    this.timeSpent = 0;
                }
            }
        });
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
        siddhiAppRuntime.start();
        for (int i = 0; i <= 10; ++i) {
            EventPublisher eventPublisher = new EventPublisher(inputHandler);
            eventPublisher.start();
        }
    }

    static class EventPublisher
    extends Thread {
        InputHandler inputHandler;

        EventPublisher(InputHandler inputHandler) {
            this.inputHandler = inputHandler;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    while (true) {
                        this.inputHandler.send(new Object[]{"1", Float.valueOf(55.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"2", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"3", Float.valueOf(100.0f), 80, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"4", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"5", Float.valueOf(55.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"6", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"7", Float.valueOf(100.0f), 80, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"8", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"9", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                        this.inputHandler.send(new Object[]{"10", Float.valueOf(75.6f), 100, System.currentTimeMillis()});
                    }
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    continue;
                }
                break;
            }
        }
    }
}

