import ballerina/io;
import ballerina/runtime;type StatusCount {
string status;
int totalCount;
};type Teacher {
string name;
int age;
string status;
string batch;
string school;
};function testAggregationQuery (stream<StatusCount> filteredStatusCountStream,
stream<Teacher> teacherStream) {
forever{
from teacherStream where age > 18 window lengthBatch(3)
select status, count(status) as totalCount
group by status
having totalCount > 1
=> (StatusCount [] status) {
filteredStatusCountStream.publish(status);
}
}
}function main (string[] args) {
stream<StatusCount> filteredStatusCountStream;
stream<Teacher> teacherStream;
testAggregationQuery(filteredStatusCountStream, teacherStream);
Teacher t1 = {name:"Raja", age:25, status:"single", batch:"LK2014", school:"Hindu College"};
Teacher t2 = {name:"Shareek", age:33, status:"single", batch:"LK1998", school:"Thomas College"};
Teacher t3 = {name:"Nimal", age:45, status:"married", batch:"LK1988", school:"Ananda College"};
filteredStatusCountStream.subscribe(printStatusCount);
teacherStream.publish(t1);
teacherStream.publish(t2);
teacherStream.publish(t3); runtime:sleepCurrentWorker(1000);
}
function printStatusCount (StatusCount s) {
io:println("Event received; status: " + s.status +" and total occurrences: "+s.totalCount);
}
StreamsBallerina provides first class support for streams. This is powered by Siddhi CEP engine. You can build streaming queries by using the user friendly syntax provided by Ballerina The queries include, projection, filtering, windows, stream joins and patterns. |
|
import ballerina/io;
import ballerina/runtime;
|
|
type StatusCount {
string status;
int totalCount;
};
|
|
type Teacher {
string name;
int age;
string status;
string batch;
string school;
};
|
|
function testAggregationQuery (stream<StatusCount> filteredStatusCountStream,
stream<Teacher> teacherStream) {
|
|
forever{
from teacherStream where age > 18 window lengthBatch(3)
select status, count(status) as totalCount
group by status
having totalCount > 1
=> (StatusCount [] status) {
filteredStatusCountStream.publish(status);
}
}
}
|
Create forever statement block with respective streaming query. |
function main (string[] args) {
|
|
stream<StatusCount> filteredStatusCountStream;
|
Create stream constrained by the StatusCount struct type. |
stream<Teacher> teacherStream;
|
Create stream constrained by the Teacher struct type. |
testAggregationQuery(filteredStatusCountStream, teacherStream);
|
Invoke the method which contains the forever streaming statement. |
Teacher t1 = {name:"Raja", age:25, status:"single", batch:"LK2014", school:"Hindu College"};
Teacher t2 = {name:"Shareek", age:33, status:"single", batch:"LK1998", school:"Thomas College"};
Teacher t3 = {name:"Nimal", age:45, status:"married", batch:"LK1988", school:"Ananda College"};
|
Create some events to pump into the input stream ‘teacherStream’. |
filteredStatusCountStream.subscribe(printStatusCount);
|
Subscribe to the stream ‘filteredStatusCountStream’ which contains the output events. |
teacherStream.publish(t1);
teacherStream.publish(t2);
teacherStream.publish(t3);
|
Publish events that generated in earlier step. |
runtime:sleepCurrentWorker(1000);
}
|
|
function printStatusCount (StatusCount s) {
io:println("Event received; status: " + s.status +" and total occurrences: "+s.totalCount);
}
|
Print the output events. |
$ ballerina run streams.bal
|
|