Module : streams
Module overview
The Streams module provides stream processing capabilities to Ballerina.
Note: Ballerina streaming capabilities are shipped as an experimental feature in the latest release. Use the --experimental flag when compiling Ballerina files that contain streaming constructs.
The following topics explain the high-level concepts of Ballerina streaming.
Stream
A stream is a logical series of events ordered in time. Its schema is defined/constrained via the record definition. A record definition contains a unique name and a set of uniquely-identifiable attributes with specific types within the record. All the events of a specific stream have the same schema (i.e., have the same attributes in the same order).
Syntax
The syntax for defining a new stream is as follows.
type <record name> record {
    <attribute type> <attribute name>;
    <attribute type> <attribute name>;
    <attribute type> <attribute name>;
    ...
};
stream<record name> <stream name> = new;
The following parameters are configured in a stream definition.
Parameter | Description |
---|---|
stream name | The name of the created stream. |
record name | The name of the record that constrains the stream. |
attribute name | The uniquely-identifiable attribute name. The schema of a record is defined by its attributes. |
attribute type | The type of each attribute defined in the record. |
Example
type Employee record {
    string name;
    int age;
    string status;
};
stream<Employee> employeeStream = new;
The code given above creates a stream named employeeStream that is constrained by the Employee type with the following attributes.
- name of type string
- age of type int
- status of type string
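To show how events flow through a stream once it is defined, the following is a minimal sketch that publishes one Employee event and prints it from a subscriber. It assumes that the stream type exposes subscribe and publish methods and that the ballerina/runtime module provides sleep, as in the Ballerina releases that ship experimental streaming. The printEmployee function and the sample values are hypothetical.

```ballerina
import ballerina/io;
import ballerina/runtime;

type Employee record {
    string name;
    int age;
    string status;
};

stream<Employee> employeeStream = new;

// Hypothetical subscriber: called once for each event published to the stream.
function printEmployee(Employee e) {
    io:println("Received: ", e.name, ", ", e.age, ", ", e.status);
}

public function main() {
    // Register the subscriber before publishing any events.
    employeeStream.subscribe(printEmployee);

    // Illustrative event; the values are made up for this sketch.
    Employee e1 = { name: "Jane", age: 25, status: "single" };
    employeeStream.publish(e1);

    // Assumption: delivery is asynchronous, so wait briefly before exiting.
    runtime:sleep(1000);
}
```

Because the stream is constrained by Employee, only values of that record type can be published to it.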
Forever Statement
The forever statement block can include one or more streaming queries that define stream processing and complex event processing rules. It lets the streaming queries run continuously until the Ballerina program exits. Each streaming query within the forever block executes as an independent, isolated processing unit.
Sample query
This query selects the sensor events that have a temperature greater than 30 degrees Celsius and, for every 100 such events, groups them by type and counts the number of sensor events for each type. Next, it publishes all the types that have more than one event to the highTemperatureSensorStream stream.
forever {
    from sensorTemperatureStream
    where sensorTemperatureStream.temperature > 30
    window lengthBatch (100)
    select sensorTemperatureStream.type, count() as totalCount
    group by sensorTemperatureStream.type
    having totalCount > 1
    => (HighTemperature [] values) {
        foreach var value in values {
            highTemperatureSensorStream.publish(value);
        }
    }
}
Query
Each streaming query can consume one or more streams, process the events continuously in a streaming manner, and generate the output simultaneously. A query enables you to perform complex event processing and stream processing operations by processing incoming events one by one in the order they arrive.
Syntax
Each query contains an input and an output section. Some also contain a projection section. The following is a simple query with all three sections.
from <input stream>
select <attribute name>, <attribute name>, ...
=> (<array type> <parameter name>) {
    ...
    ...
}
Example
This query consumes events from the tempStream stream (that is already defined) and sends the room temperature and the room number to the roomTempStream stream as the output.
type temperature record {
    int deviceID;
    int roomNo;
    float value;
};
type roomTemperature record {
    int roomNo;
    float value;
};
stream<temperature> tempStream = new;
stream<roomTemperature> roomTempStream = new;
public function initQuery() {
    forever {
        from tempStream
        select tempStream.roomNo, tempStream.value
        => (roomTemperature[] temperatures) {
            foreach var value in temperatures {
                roomTempStream.publish(value);
            }
        }
    }
}
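The query above only takes effect once it is deployed and events arrive on tempStream. The following sketch wraps the same types and query in a complete program that subscribes to roomTempStream and publishes one input event. It assumes the stream subscribe and publish methods and runtime:sleep from ballerina/runtime. The printRoomTemperature function and the sample reading are hypothetical.

```ballerina
import ballerina/io;
import ballerina/runtime;

type temperature record {
    int deviceID;
    int roomNo;
    float value;
};

type roomTemperature record {
    int roomNo;
    float value;
};

stream<temperature> tempStream = new;
stream<roomTemperature> roomTempStream = new;

// Same query as in the example: project roomNo and value to roomTempStream.
public function initQuery() {
    forever {
        from tempStream
        select tempStream.roomNo, tempStream.value
        => (roomTemperature[] temperatures) {
            foreach var value in temperatures {
                roomTempStream.publish(value);
            }
        }
    }
}

// Hypothetical subscriber that prints each output event.
function printRoomTemperature(roomTemperature t) {
    io:println("Room ", t.roomNo, ": ", t.value);
}

public function main() {
    // Deploy the query and attach the subscriber before sending input.
    initQuery();
    roomTempStream.subscribe(printRoomTemperature);

    // Illustrative input event; the values are made up for this sketch.
    temperature reading = { deviceID: 1, roomNo: 101, value: 23.5 };
    tempStream.publish(reading);

    // Assumption: processing is asynchronous, so wait briefly before exiting.
    runtime:sleep(1000);
}
```

As the note at the top of this page says, a file like this has to be compiled with the --experimental flag because it contains streaming constructs.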
Records
SnapshottableStreamEvent | This record represents a stream event which can be persisted. |
Objects
AbstractOperatorProcessor | Abstract processor encapsulating operator functions. |
AbstractPatternProcessor | Abstract processor encapsulating pattern processor functions. |
Aggregator | Abstract object, which should be implemented in order to create a new aggregator. |
AndOperatorProcessor | Processor to perform AND stream operations. |
Average | Aggregator to calculate average in streams. |
CompoundPatternProcessor | Processor to perform compound stream operations. |
Count | Aggregator to count events in streams. |
DelayWindow | This window delays the incoming events for a given amount of time. E.g. from inputStream window |
DistinctCount | Aggregator to get the distinct counts of values in streams. |
ExternalTimeBatchWindow | This is a batch (tumbling) time window based on external time that holds the events that arrived during each window time period and gets updated for every window time. E.g. from inputStream window |
ExternalTimeWindow | This is a sliding time window based on external time that holds the events that arrived during the last window time period from the external timestamp and gets updated on every monotonically increasing timestamp. E.g. from inputStream window |
Filter | The |
FollowedByProcessor | Processor to perform FollowedBy stream operations. |
HoppingWindow | The hopping window releases events in batches defined by a time period at every given time interval. The batch is also determined by the time period given in the window. When the time interval at which the events are released and the time period for which the events are held are equal, the hopping window acts as a |
IntSort | This class implements a merge sort algorithm to sort timestamp values for state persistence. |
LengthBatchWindow | This is a batch (tumbling) length window that holds up to the given number of events and gets updated each time the given number of events arrives. E.g. from inputStream window |
LengthWindow | The |
LinkedList | The |
Max | Aggregator to find the maximum value in a stream. |
MaxForever | The aggregator to keep the maximum value received so far. It is similar to |
MergeSort | This object implements the merge sort algorithm to sort the provided value arrays. |
Min | Aggregator to find the minimum value in a stream. |
MinForever | The aggregator to keep the minimum value received so far. It is similar to |
Node | The |
NotOperatorProcessor | |
OperandProcessor | Processor to perform operand processor operations. |
OrOperatorProcessor | Processor to perform OR stream operations. |
OrderBy | The |
OutputProcess | The |
Scheduler | The |
Select | The |
Snapshotable | Abstract Snapshotable to be referenced by all snapshotable objects. |
SortWindow | The sort window holds a given number of events and emits the expired events in the order defined by the given fields. E.g. from inputStream window |
StateMachine | StateMachine which performs stream pattern processing. |
StdDev | The aggregator object to calculate standard deviation. |
StreamEvent | The |
StreamJoinProcessor | The |
Sum | Aggregator to perform summation of values in a stream. |
TableJoinProcessor | The |
TimeAccumulatingWindow | The |
TimeBatchWindow | This is a batch (tumbling) time window that holds the events that arrived between window time periods and gets updated for every window time. E.g. from inputStream window |
TimeLengthWindow | This is a sliding time window that, at any given time, holds the last windowLength events that arrived during the last windowTime period and gets updated on every event arrival and expiry. E.g. from inputStream window |
TimeOrderWindow | The |
TimeWindow | The |
UniqueLengthWindow | This is a length window that keeps only the unique events. E.g. from inputStream window |
Window | The from inputStream window |
Functions
avg | Returns a |
buildStreamEvent | Creates |
count | Returns a |
createAndOperatorProcessor | Creates and returns a |
createCompoundPatternProcessor | Creates and returns a |
createFilter | Creates a |
createFollowedByProcessor | Creates and returns a |
createNotOperatorProcessor | Creates and returns a |
createOperandProcessor | Creates and returns a |
createOrOperatorProcessor | Creates and returns a |
createOrderBy | Creates an |
createOutputProcess | Creates and returns a |
createResetStreamEvent | Creates a RESET event from a given event. |
createSelect | Creates and returns a select clause. |
createStateMachine | Creates and returns a |
createStreamJoinProcessor | Creates a |
createTableJoinProcessor | Creates a |
delay | The |
distinctCount | Returns a |
externalTime | The |
externalTimeBatch | The |
getStreamEvent | Gets the stream event from an any? field. This function can be used only if we are sure that the |
hopping | The |
initPersistence | Function to initialize and start snapshotting. |
length | The |
lengthBatch | The |
max | Returns a |
maxForever | Returns a |
min | Returns a |
minForever | Returns a |
registerSnapshotable | Function to register Snapshotables. |
removeState | Function to clear an existing state. |
restoreState | Function to restore state of a given object. |
sort | The |
stdDev | Returns a |
sum | Returns a |
time | The |
timeAccum | The |
timeBatch | The |
timeLength | The |
timeOrder | The |
toSnapshottableEvent | Convert a single |
toSnapshottableEvents | Converts a given array of streams:StreamEvent objects to an array of |
toStreamEvent | Convert a single |
toStreamEvents | Converts a given array of snapshotable events to an array of |
uniqueLength | The |
Constants
OUTPUT | |
RESET | The reset event type. |
EXPIRED | The expired event type. |
CURRENT | The current event type. |
TIMER | The timer event type. |
DEFAULT | The default key to group by if the group by clause is not used in the query. |
DELIMITER | |
DELIMITER_REGEX | |
ASCENDING | |
DESCENDING | |