ballerina/streams module
Type Definitions
Type | Values | Description | |
---|---|---|---|
EventType | TIMER | RESET | EXPIRED | CURRENT | ALL | The type of stream events. |
|
JoinType | RIGHTOUTERJOIN | LEFTOUTERJOIN | JOIN | FULLOUTERJOIN | The types of joins between streams and tables. |
Records Summary
Record | Description | ||
---|---|---|---|
SnapshottableStreamEvent | This record represents a stream event which can be persisted. |
Objects Summary
Object | Description | ||
---|---|---|---|
Aggregator | Abstract object, which should be implemented in order to create a new aggregator. |
||
Average | Aggregator to calculate average in streams. |
||
Count | Aggregator to count events in streams. |
||
DelayWindow | This window will delay the incoming events for a given amount of time.
E.g.
from inputStream window |
||
DistinctCount | Aggregator to get the distinct counts of values in streams. |
||
ExternalTimeBatchWindow | This is a batch (tumbling) time window based on external time, that holds events arrived during window time periods,
and gets updated for every window time.
E.g.
from inputStream window |
||
ExternalTimeWindow | This is a sliding time window based on external time, that holds events for that arrived during last window time
period from the external timestamp, and gets updated on every monotonically increasing timestamp.
E.g.
from inputStream window |
||
Filter | The |
||
HoppingWindow | The hopping window releases the events in batches defined by a time period every given time interval. The batch is
also determined by the time period given in the window. When the time interval the events being released and the
time period it hold the events are equal, the hopping window acts as a |
||
IntSort | This class implements a merge sort algorithm to sort timestamp values for state persistence. |
||
LengthBatchWindow | This is a batch (tumbling) length window, that holds up to the given length of events, and gets updated on every
given number of events arrival.
E.g.
from inputStream window |
||
LengthWindow | The |
||
LinkedList | The |
||
Max | Aggregator to find the maximum value in a stream. |
||
MaxForever | The aggregator to keep the maximum value received so far. It is similar to |
||
MergeSort | This object implements the merge sort algorithm to sort the provided value arrays. |
||
Min | Aggregator to find the minimum value in a stream. |
||
MinForever | The aggregator to keep the minimum value received so far. It is similar to |
||
Node | The |
||
OrderBy | The |
||
OutputProcess | The |
||
Scheduler | The |
||
Select | The |
||
Snapshotable | Abstract Snapshotable to be referenced by all snapshotable objects. |
||
SortWindow | The sort window hold a given number of events and emit the expired events in the ordered by the given fields.
E.g.
from inputStream window |
||
StdDev | The aggregator object to calculate standard deviation. |
||
StreamEvent | The |
||
StreamJoinProcessor | The |
||
Sum | Aggregator to perform summation of values in a stream. |
||
TableJoinProcessor | The |
||
TimeAccumulatingWindow | The |
||
TimeBatchWindow | This is a batch (tumbling) time window, that holds events arrived between window time periods, and gets updated for
every window time.
E.g.
from inputStream window |
||
TimeLengthWindow | This is a sliding time window that, at a given time holds the last windowLength events that arrived during last
windowTime period, and gets updated for every event arrival and expiry.
E.g.
from inputStream window |
||
TimeOrderWindow | The |
||
TimeWindow | The |
||
UniqueLengthWindow | This is a length window which only keeps the unique events.
E.g.
from inputStream window |
||
Window | The from inputStream window |
Functions Summary
Return Type | Function and Description | ||
---|---|---|---|
Aggregator | avg() Returns a |
||
StreamEvent|null[] | buildStreamEvent(any t, string streamName) Creates |
||
Aggregator | count() Returns a |
||
Filter | createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) Creates a |
||
OrderBy | createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) Creates an |
||
OutputProcess | createOutputProcess(function (map<anydata>[]) returns () outputFunc) Creates and return a |
||
StreamEvent | createResetStreamEvent(streams:StreamEvent event) Creates a RESET event from a given event. |
||
Select | createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) Creates and returns a select clause. |
||
StreamJoinProcessor | createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) Creates a |
||
TableJoinProcessor | createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) Creates a |
||
Window | delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Aggregator | distinctCount() Returns a |
||
Window | externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
StreamEvent | getStreamEvent(any? anyEvent) Get the stream event from any? field. This function can only be used only if we are sure that the |
||
Window | hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
initPersistence() Function to initialize and start snapshotting. |
|||
Window | length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Aggregator | max() Returns a |
||
Aggregator | maxForever() Returns a |
||
Aggregator | min() Returns a |
||
Aggregator | minForever() Returns a |
||
registerSnapshotable(string key, any reference) Function to register Snapshotables. |
|||
boolean | removeState(string key) Function to clear an existing state. |
||
restoreState(string key, any reference) Function to restore state of a given object. |
|||
Window | sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Aggregator | stdDev() Returns a |
||
Aggregator | sum() Returns a |
||
Window | time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
Window | timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
||
SnapshottableStreamEvent | toSnapshottableEvent(streams:StreamEvent event) Convert a single |
||
SnapshottableStreamEvent|null[] | toSnapshottableEvents(streams:StreamEvent[]|any[]? events) Converts a given array of streams:StreamEvent objects to an array of |
||
StreamEvent | toStreamEvent(streams:SnapshottableStreamEvent event) Convert a single |
||
StreamEvent|null[] | toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events) Converts a given array of snapshotable events to an array of |
||
Window | uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The |
Constants
public type SnapshottableStreamEvent record
This record represents a stream event which can be persisted.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
eventType | CURRENT|EXPIRED|ALL|RESET|TIMER | CURRENT | description |
timestamp | int | 0 | description |
data | map |
{} | description |
public function avg() returns (Aggregator)
Returns a Average
aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function buildStreamEvent(any t, string streamName) returns (StreamEvent|null[])
Creates streams:StreamEvent
object array for a record t
received by the stream denoted by the name streamNme
.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
t | any | A record received by the stream |
|
streamName | string | Name of the stream to which the record |
Return Type | Description | ||
---|---|---|---|
StreamEvent|null[] | Returns an array of streams:StreamEvents|() |
public function count() returns (Aggregator)
Returns a Count
aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) returns (Filter)
Creates a Filter
object and return it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcPointer | function (streams:StreamEvent?[]) returns () | The function pointer to the |
|
conditionFunc | function (map |
The function pointer to the condition evaluator. This is a function which returns true or false based on the boolean expression given in the where clause. |
Return Type | Description | ||
---|---|---|---|
Filter | Returns a |
public function createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) returns (OrderBy)
Creates an OrderBy
object and return it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcessorPointer | function (streams:StreamEvent?[]) returns () | A function pointer to the |
|
fields | function (map |
An array of function pointers which each returns a field by which the events are sorted. Events are sorted by the first field, if there are elements of same value, the second field is used and so on. |
|
sortFieldMetadata | string[] | sortTypes of the fields ( |
Return Type | Description | ||
---|---|---|---|
OrderBy | Returns a |
public function createOutputProcess(function (map<anydata>[]) returns () outputFunc) returns (OutputProcess)
Creates and return a OutputProcess
object.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
outputFunc | function (map |
The function pointer to a lambda function created out of the statements in the streaming action |
Return Type | Description | ||
---|---|---|---|
OutputProcess | Returns a |
public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)
Creates a RESET event from a given event.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
event | streams:StreamEvent | The event from which the reset event is created. |
Return Type | Description | ||
---|---|---|---|
StreamEvent | A stream event of type streams:RESET. |
public function createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) returns (Select)
Creates and returns a select clause.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcPointer | function (streams:StreamEvent?[]) returns () | The function pointer to the |
|
aggregatorArr | streams:Aggregator[] | The array of aggregators used in the select clause. If the same aggregator is used twice, the
|
|
groupbyFuncArray | function (streams:StreamEvent) returns (anydata)?[]? | The array of function pointer which contains the lambda function which returns the expressions in the group by clause. |
|
selectFunc | function (streams:StreamEvent,streams:Aggregator[]) returns (map |
The function pointer to a lambda function that creates the |
|
scopeName | string | $scope$name | A unique id to identify the forever block if there are multiple forever blocks. |
Return Type | Description | ||
---|---|---|---|
Select | Returns a |
public function createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)
Creates a StreamJoinProcessor
and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcessor | function (streams:StreamEvent?[]) returns () | The |
|
joinType | JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN | Type of the join being performed ("JOIN"|"LEFTOUTERJOIN"|"RIGHTOUTERJOIN"|"FULLOUTERJOIN") |
|
conditionFunc | function (map |
() | A lambda function which contains the joining condition and return true if the condition satifies the condition. |
Return Type | Description | ||
---|---|---|---|
StreamJoinProcessor | Returns a |
public function createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) returns (TableJoinProcessor)
Creates a TableJoinProcessor
and return it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcessor | function (streams:StreamEvent?[]) returns () | The function pointer to |
|
joinType | JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN | The type of the join and it can be any value defined by |
|
tableQuery | function (streams:StreamEvent) returns (map |
The function pointer to a function which retrieves the records from the table and joins w ith each stream event. |
Return Type | Description | ||
---|---|---|---|
TableJoinProcessor | Returns a |
public function delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The delay
function creates a DelayWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function distinctCount() returns (Aggregator)
Returns a DistinctCount
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The externalTime
function creates a ExternalTimeWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The externalTimeBatch
function creates a ExternalTimeBatchWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function getStreamEvent(any? anyEvent) returns (StreamEvent)
Get the stream event from any? field. This function can only be used only if we are sure that the anyEvent
is a
streams:StreamEvent.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
anyEvent | any? | The object from which, the stream event is extracted. |
Return Type | Description | ||
---|---|---|---|
StreamEvent | Returns the extracted streams:StreamEvent object. |
public function hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The hopping
function creates a HoppingWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function initPersistence()
Function to initialize and start snapshotting.
public function length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The length
function creates a LengthWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The lengthBatch
function creates a LengthBatchWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function max() returns (Aggregator)
Returns a Max
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function maxForever() returns (Aggregator)
Returns a MaxForever
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function min() returns (Aggregator)
Returns a Min
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function minForever() returns (Aggregator)
Returns a MinForever
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function registerSnapshotable(string key, any reference)
Function to register Snapshotables.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
key | string | An unique |
|
reference | any | The snapshotable reference to be registered. |
public function removeState(string key) returns (boolean)
Function to clear an existing state.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
key | string | An unique |
Return Type | Description | ||
---|---|---|---|
boolean | A |
public function restoreState(string key, any reference)
Function to restore state of a given object.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
key | string | An unique |
|
reference | any | The snapshotable reference to be restored. |
public function sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The sort
function creates a SortWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function stdDev() returns (Aggregator)
Returns a StdDev
aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function sum() returns (Aggregator)
Returns a Sum
aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
Return Type | Description | ||
---|---|---|---|
Aggregator | A |
public function time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The time
function creates a TimeWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeAccum
function creates a TimeAccumulatingWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeBatch
function creates a TimeBatchWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeLength
function creates a TimeLengthWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeOrder
function creates a TimeOrderWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public function toSnapshottableEvent(streams:StreamEvent event) returns (SnapshottableStreamEvent)
Convert a single streams:StreamEvent
object to streams:SnapshottableStreamEvent
object.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
event | streams:StreamEvent | The |
Return Type | Description | ||
---|---|---|---|
SnapshottableStreamEvent | The converted |
public function toSnapshottableEvents(streams:StreamEvent[]|any[]? events) returns (SnapshottableStreamEvent|null[])
Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent
.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
events | streams:StreamEvent[]|any[]? | The events to be coverted to snapshotable events. |
Return Type | Description | ||
---|---|---|---|
SnapshottableStreamEvent|null[] | Returns the converted snapshotable events. |
public function toStreamEvent(streams:SnapshottableStreamEvent event) returns (StreamEvent)
Convert a single streams:SnapshottableStreamEvent
object to streams:StreamEvent
object.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
event | streams:SnapshottableStreamEvent | The |
Return Type | Description | ||
---|---|---|---|
StreamEvent | The converted |
public function toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events) returns (StreamEvent|null[])
Converts a given array of snapshotable events to an array of streams:StreamEvent
objects.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
events | streams:SnapshottableStreamEvent[]|any[]? | Snapshotable events to be converted to |
Return Type | Description | ||
---|---|---|---|
StreamEvent|null[] | Returns the converted |
public function uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The uniqueLength
function creates a UniqueLengthWindow
object and returns it.
Parameter Name | Data Type | Default Value | Description |
---|---|---|---|
windowParameters | any[] | Arguments which should be passed with the window function in the streams query in the order they appear in the argument list. |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
Return Type | Description | ||
---|---|---|---|
Window | Returns the created window. |
public type Aggregator object
Abstract object, which should be implemented in order to create a new aggregator.
-
<Aggregator> copy() returns (Aggregator)
Returns a copy of self, but it does not contain the current state.
Return Type Description Aggregator A
Aggregator
object. -
<Aggregator> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the aggregated value and returns the final aggregated value.
Parameter Name Data Type Default Value Description value anydata value being aggregated.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
. Based on the type of the eventvalue
will be added to the aggregation or removed from the aggregation.Return Type Description anydata Updated aggregated value after
value
being aggregated.
public type Average object
Aggregator to calculate average in streams.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
count | int | 0 | description |
sum | float | 0.0 | description |
-
<Average> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Returns the calculated average after
value
being aggregated into current average. If theeventType
isstreams:CURRENT
,value
is added to the current sum and count is increase by 1. If theeventType
isstreams:EXPIRED
,value
is subtracted from the current sum and count is descreased by 1. If theeventType
isstreams:RESET
, Current summation and count is reset, regardless thevalue
. Then by dividing the sum by count, the average is calculated.Parameter Name Data Type Default Value Description value anydata The numeric value being aggregated in order to calculate the new average.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated average value after
value
being aggregated. -
<Average> copy() returns (Aggregator)
Returns a copy of the
Average
aggregator without its current state.Return Type Description Aggregator Returns
Average
aggregator. -
<Average> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<Average> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type Count object
Aggregator to count events in streams.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
count | int | 0 | description |
-
<Count> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current count when a new event arrives and return the updated count. If the
eventType
isstreams:CURRENT
, count is increase by 1. If theeventType
isstreams:EXPIRED
, count is descreased by 1. If theeventType
isstreams:RESET
, count is reset, regardless thevalue
.Parameter Name Data Type Default Value Description value anydata In count aggregator the value is not used.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated count.
-
<Count> copy() returns (Aggregator)
Returns a copy of the
Count
aggregator without its current state.Return Type Description Aggregator Returns
Count
aggregator. -
<Count> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<Count> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type DelayWindow object
This window will delay the incoming events for a given amount of time.
E.g.
from inputStream window delay(4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The delay
window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
delayInMilliSeconds | int | description |
|
windowParameters | any[] | description |
|
delayedEventQueue | streams:LinkedList | description |
|
lastTimestamp | int | 0 | description |
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
scheduler | streams:Scheduler | description |
-
<DelayWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<DelayWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<DelayWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<DelayWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<DelayWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type DistinctCount object
Aggregator to get the distinct counts of values in streams.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
distinctValues | map |
{} | description |
-
<DistinctCount> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current distinct count when a new event arrives and return the updated count. If the
eventType
isstreams:CURRENT
, count is increased by 1. If theeventType
isstreams:EXPIRED
, count is descreased by 1. If theeventType
isstreams:RESET
, count is reset, regardless of thevalue
.Parameter Name Data Type Default Value Description value anydata Value being counted uniquely.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated distinct count.
-
<DistinctCount> copy() returns (Aggregator)
Returns a copy of the
DistinctCount
aggregator without its current state.Return Type Description Aggregator Returns
DistinctCount
aggregator. -
<DistinctCount> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<DistinctCount> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type ExternalTimeBatchWindow object
This is a batch (tumbling) time window based on external time, that holds events arrived during window time periods,
and gets updated for every window time.
E.g.
from inputStream window externalTimeBatch(inputStream.timestamp, 1000, 500, 1200, true)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The externalTimeBatch
window should only have two to five parameters (timestamp field,
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeToKeep | int | description |
|
currentEventChunk | streams:LinkedList | description |
|
expiredEventChunk | streams:LinkedList | description |
|
resetEvent | streams:StreamEvent? | () | description |
startTime | int | -1 | description |
isStartTimeEnabled | boolean | false | description |
replaceTimestampWithBatchEndTime | boolean | false | description |
flushed | boolean | false | description |
endTime | int | -1 | description |
schedulerTimeout | int | 0 | description |
lastScheduledTime | int | description |
|
lastCurrentEventTime | int | 0 | description |
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
timeStamp | string | description |
|
storeExpiredEvents | boolean | false | description |
outputExpectsExpiredEvents | boolean | false | description |
windowParameters | any[] | description |
|
scheduler | streams:Scheduler | description |
-
<ExternalTimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<ExternalTimeBatchWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<ExternalTimeBatchWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<ExternalTimeBatchWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<ExternalTimeBatchWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data. -
<ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)
Parameter Name Data Type Default Value Description currStreamEvent streams:StreamEvent -
<ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)
Parameter Name Data Type Default Value Description complexEventChunks streams:LinkedList currentTime int preserveCurrentEvents boolean -
<ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)
Parameter Name Data Type Default Value Description complexEventChunks streams:LinkedList currentTime int preserveCurrentEvents boolean -
<ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)
Parameter Name Data Type Default Value Description currentTime int startTime_ int timeToKeep_ int Return Type Description int -
<ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)
Parameter Name Data Type Default Value Description firstStreamEvent streams:StreamEvent -
<ExternalTimeBatchWindow> getTimestamp(any val) returns (int)
Parameter Name Data Type Default Value Description val any Return Type Description int
public type ExternalTimeWindow object
This is a sliding time window based on external time, that holds events for that arrived during last window time
period from the external timestamp, and gets updated on every monotonically increasing timestamp.
E.g.
from inputStream window externalTime(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The externalTime
window should only have two parameters (timestamp field,
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMillis | int | description |
|
windowParameters | any[] | description |
|
expiredEventQueue | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
timeStamp | string | description |
-
<ExternalTimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<ExternalTimeWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<ExternalTimeWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<ExternalTimeWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<ExternalTimeWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data. -
<ExternalTimeWindow> getTimestamp(any val) returns (int)
Parameter Name Data Type Default Value Description val any Return Type Description int
public type Filter object
The Filter
object represents the where
clause in a streaming query. This object takes two parameter for
initialization. nextProcessorPointer
is the function pointer of the next processor to be invoked once the
filtering is complete. conditionFunc is a function pointer which return true if the given where clause evaluates to
true.
-
<Filter> process(streams:StreamEvent?[] streamEvents)
Process the incoming stream events. This function takes an array of stream events, iterate each of the events in the array, then call the
conditionFunc
on each and see ifconditionFunc
is evaluates to true. if so, those events will be passed to thenextPrcessorPointer
which can be theprocess
function of the next processor object ( e.g.Select
,Window
,Aggregator
.. etc).Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The events being filtered.
public type HoppingWindow object
The hopping window releases the events in batches defined by a time period every given time interval. The batch is
also determined by the time period given in the window. When the time interval the events being released and the
time period it hold the events are equal, the hopping window acts as a TimeBatch
window.
E.g.
from inputStream window hopping(5000, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
Hopping window should only have two parameters (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMilliSeconds | int | description |
|
hoppingTime | int | description |
|
windowParameters | any[] | description |
|
nextEmitTime | int | -1 | description |
currentEventQueue | streams:LinkedList | description |
|
resetEvent | streams:StreamEvent? | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
scheduler | streams:Scheduler | description |
-
<HoppingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<HoppingWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<HoppingWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<HoppingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<HoppingWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<HoppingWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type IntSort object
This class implements a merge sort algorithm to sort timestamp values for state persistence.
-
<IntSort> sort(int[] arr)
Sorts a given array of int values.
Parameter Name Data Type Default Value Description arr int[] The array of int values to be sorted.
public type LengthBatchWindow object
This is a batch (tumbling) length window, that holds up to the given length of events, and gets updated on every
given number of events arrival.
E.g.
from inputStream window lengthBatch(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The lengthBatch
window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
length | int | description |
|
windowParameters | any[] | description |
|
count | int | description |
|
resetEvent | streams:StreamEvent? | description |
|
currentEventQueue | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
-
<LengthBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<LengthBatchWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<LengthBatchWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<LengthBatchWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<LengthBatchWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type LengthWindow object
The LengthWindow
is a sliding length window, that holds last windowLength events, and gets updated on every event
arrival and expiry.
E.g.
from inputStream window length(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The length
window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
size | int | description |
|
linkedList | streams:LinkedList | description |
|
windowParameters | any[] | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
-
<LengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<LengthWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<LengthWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<LengthWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<LengthWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type LinkedList object
The LinkedList
object which represents the linked list data structure.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
first | streams:Node? | description |
|
last | streams:Node? | description |
|
curr | streams:Node? | description |
|
size | int | 0 | description |
ascend | boolean | true | description |
-
<LinkedList> isEmpty() returns (boolean)
Checks if the linked list is empty.
Return Type Description boolean Returns
true
if the linked list is empty, otherwise returnsfalse
. -
<LinkedList> resetToFront()
Moves the cursoer to the front/head of the linked list.
-
<LinkedList> resetToRear()
Moves the cursor to the end of the linked list if the cursor is not already at the last element of the linked list.
-
<LinkedList> hasNext() returns (boolean)
Returns true if the linked list has more elements starting from the current cursor location.
Return Type Description boolean Returns
true
if there are more elements onwards from the current cursor location, otherwisefalse
. -
<LinkedList> hasPrevious() returns (boolean)
Returns
true
if there are prior elements to the current element/cursor location, otherwise false.Return Type Description boolean Returns true, if there are elements prior to the current cursor location, otherwise
false
. -
<LinkedList> next() returns (any|null)
Returns the next element of the linked list and moves the cursor to the next element.
Return Type Description any|null The next element from the current cursor location.
-
<LinkedList> previous() returns (any|null)
Returns the previous element of the linked list and moves the cursor to the previous element.
Return Type Description any|null The previous element from the current cursor location.
-
<LinkedList> removeCurrent()
Removes the element at the current cursor location.
-
<LinkedList> getSize() returns (int)
Returns the current number of elements in the linked list.
Return Type Description int The number of elements in the linked list.
-
<LinkedList> clear()
Empties the linked list.
-
<LinkedList> removeFirstOccurrence(any? elem) returns (boolean)
Removes the first occurence of the element pass as
elem
and returntrue
of the removal is successful.Parameter Name Data Type Default Value Description elem any? Return Type Description boolean Return
true
if removal is successful otherwisefalse
. -
<LinkedList> remove(any? elem) returns (boolean)
Removes the first occurence of the element pass as
elem
and returntrue
of the removal is successful.Parameter Name Data Type Default Value Description elem any? Return Type Description boolean Return
true
if removal is successful otherwisefalse
. -
<LinkedList> getFirst() returns (any|null)
Returns the first element of the linked list, without moving the cursor.
Return Type Description any|null First element of the linked list.
-
<LinkedList> getLast() returns (any|null)
Returns the last element of the linked list, without moving the cursor.
Return Type Description any|null Last element of the linked list.
-
<LinkedList> addFirst(any data)
Adds a new element to the front of the linked list without moving the cursor.
Parameter Name Data Type Default Value Description data any Data to be added to the front of the linked list.
-
<LinkedList> addLast(any data)
Adds a new element to the end of the linked list without moving the cursor.
Parameter Name Data Type Default Value Description data any Data to be added to the end of the linked list.
-
<LinkedList> removeFirst() returns (any|null)
Removes the first element in the linked list without moving the cursor.
Return Type Description any|null Returns the removed element.
-
<LinkedList> removeLast() returns (any|null)
Removes the last element in the linked list without moving the cursor.
Return Type Description any|null Returns the removed element.
-
<LinkedList> insertBeforeCurrent(any data)
Insert a new element before the current cursor location.
Parameter Name Data Type Default Value Description data any Data to be inserted.
-
<LinkedList> dequeue() returns (any|null)
Returns the first element which is added to the linked list.
Return Type Description any|null The dequeued element.
-
<LinkedList> asArray() returns (any[])
Creates an array from the elements in the linked list and return it. The cursor will not be changed.
Return Type Description any[] An array of elements in the linked list.
-
<LinkedList> addAll(any[] data)
Adds elements of an array to the current cursor location and moves the cursor to the end of the list.
Parameter Name Data Type Default Value Description data any[] The array to be added to the linked list.
public type Max object
Aggregator to find the maximum value in a stream.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
iMaxQueue | streams:LinkedList | BLangTypeInit: new null ([]) | description |
fMaxQueue | streams:LinkedList | BLangTypeInit: new null ([]) | description |
iMax | int? | () | description |
fMax | float? | () | description |
-
<Max> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current maximum value and return the updated maximum value.
Parameter Name Data Type Default Value Description value anydata Value being checked whether it is greater than the current maximum value.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated max value.
-
<Max> copy() returns (Aggregator)
Returns a copy of the
Max
aggregator without its current state.Return Type Description Aggregator Returns
Max
aggregator. -
<Max> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<Max> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type MaxForever object
The aggregator to keep the maximum value received so far. It is similar to Max
aggregator, but it keeps the maximum
value it received so far, forever.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
iMax | int? | () | description |
fMax | float? | () | description |
-
<MaxForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current maximum value and return the updated maximum value.
Parameter Name Data Type Default Value Description value anydata Value being checked whether it is greater than the current maximum value.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated maximum value.
-
<MaxForever> copy() returns (Aggregator)
Returns a copy of the
MaxForever
aggregator.Return Type Description Aggregator A
Aggregator
object which representsMaxForever
aggregator. -
<MaxForever> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<MaxForever> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type MergeSort object
This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs
are function pointers
which returns the field values of each stream event's data
map's values. sortTypes
are an array of (
streams:ASCENDING or streams:DESCENDING).
Field Name | Data Type | Default Value | Description |
---|---|---|---|
fieldFuncs | function (map |
description |
|
sortTypes | string[] | description |
-
<MergeSort> __init(function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)
Parameter Name Data Type Default Value Description fieldFuncs function (map ) returns (anydata)?[] sortTypes string[] -
<MergeSort> topDownMergeSort(streams:StreamEvent?[] events)
Sorts the given stream events using the merge sort algorithm.
Parameter Name Data Type Default Value Description events streams:StreamEvent?[] The array of stream events to be sorted.
public type Min object
Aggregator to find the minimum value in a stream.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
iMinQueue | streams:LinkedList | BLangTypeInit: new null ([]) | description |
fMinQueue | streams:LinkedList | BLangTypeInit: new null ([]) | description |
iMin | int? | () | description |
fMin | float? | () | description |
-
<Min> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current minimum value and return the updated minimum value.
Parameter Name Data Type Default Value Description value anydata Value being checked whether it is lesser than the current minimum value.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated minimum value.
-
<Min> copy() returns (Aggregator)
Returns a copy of the
Min
aggregator.Return Type Description Aggregator A
Aggregator
object which representsMin
aggregator. -
<Min> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<Min> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type MinForever object
The aggregator to keep the minimum value received so far. It is similar to Min
aggregator, but it keeps the minimum
value it received so far, forever.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
iMin | int? | () | description |
fMin | float? | () | description |
-
<MinForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current minimum value and return the updated minimum value.
Parameter Name Data Type Default Value Description value anydata Value being checked whether it is lesser than the current minimum value.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated minimum value.
-
<MinForever> copy() returns (Aggregator)
Returns a copy of the
MinForever
aggregator.Return Type Description Aggregator A
Aggregator
object which representsMinForever
aggregator. -
<MinForever> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<MinForever> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type Node object
The Node
object represents a node in the linkedlist data structure.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
data | any? | description |
|
next | streams:Node? | description |
|
prev | streams:Node? | description |
-
<Node> __init(streams:Node? prev, any? data, streams:Node? next)
Parameter Name Data Type Default Value Description prev streams:Node? data any? next streams:Node?
public type OrderBy object
The OrderBy
object represents the desugared code of order by
clause of a streaming query. This object takes 3
parameters to initialize itself. nextProcessPointer
is the process
method of the next processor. fieldFuncs
is an array of function pointers which returns the field values to be sorted. sortTypes
is an array of string
specifying whether the sort order (ascending or descending). Internally this processor uses a MergeSort
object
to sort.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
nextProcessorPointer | function (streams:StreamEvent?[]) returns () | description |
|
fieldFuncs | function (map |
description |
|
sortTypes | string[] | description |
|
mergeSort | streams:MergeSort | description |
-
<OrderBy> __init(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)
Parameter Name Data Type Default Value Description nextProcessorPointer function (streams:StreamEvent?[]) returns () fieldFuncs function (map ) returns (anydata)?[] sortTypes string[] -
<OrderBy> process(streams:StreamEvent?[] streamEvents)
Sorts the given array of stream events according to the given parameters (fieldFuncs and sortTypes).
Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be sorted.
public type OutputProcess object
The OutputProcess
object is responsible for sending the output (only the events of type streams:CURRENT
to the
destination stream. It takes a function pointer outputFunc
which actually has the logic to process the output.
-
<OutputProcess> __init(function (map<anydata>[]) returns () outputFunc)
Parameter Name Data Type Default Value Description outputFunc function (map []) returns () -
<OutputProcess> process(streams:StreamEvent?[] streamEvents)
Sends the output to the streaming action. most of the time the output is published to a destination stream at the streaming action. Only the events with type
streams:CURRENT
are passed to theoutputFunc
.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be filtered out for CURRENT events.
public type Scheduler object
The Scheduler
object is responsible for generating streams:TIMER events at the given timestamp. Once the event is
generated, the timer event is passed to the provided processFunc
function pointer. The function pointer is the
process
function of the target processor, to which the timer event should be sent.
-
<Scheduler> __init(function (streams:StreamEvent?[]) returns () processFunc)
Parameter Name Data Type Default Value Description processFunc function (streams:StreamEvent?[]) returns () -
<Scheduler> notifyAt(int timestamp)
Schedule to send a timer events at the given timestamp.
Parameter Name Data Type Default Value Description timestamp int The timestamp at which the timer event will be generated and passed to the provided
processFunc
. -
<Scheduler> sendTimerEvents() returns (error<>|null)
Creates the timer events.
Return Type Description error<>|null Returns error if sending timer events failed.
public type Select object
The Select
object represents the select clause. Anything that comes under select clause like aggregator function
invocations are also handled in this processor. Further, grouping of the events (provided by the groupby clause) is
also performed in this processor. aggregatorArr
is an array of aggregators which are used in the select clause.
groupbyFuncArray
is an array of function pointers which returns the values being grouped. selectFunc
is a
function pointer to a lambda function which creates the data
field of the output stream event. scopeName
is
used as a breadcrumb to identify the select clause if there are multiple forever
blocks.
-
<Select> process(streams:StreamEvent?[] streamEvents)
Selects only the selected fields in the select clause.
Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events passed to the select clause.
-
<Select> getGroupByKey(function (streams:StreamEvent) returns (anydata)?[]? groupbyFunctionArray, streams:StreamEvent e) returns (string)
Creates a unique key for each group with the given values in the group by clause.
Parameter Name Data Type Default Value Description groupbyFunctionArray function (streams:StreamEvent) returns (anydata)?[]? Function pointer array to the lambda function which returns each group by field.
e streams:StreamEvent Stream Event object being grouped.
Return Type Description string Returns a unique groupby key by which the event
e
is grouped.
public type Snapshotable object
Abstract Snapshotable to be referenced by all snapshotable objects.
-
<Snapshotable> saveState() returns (map<any>)
Function to return the current state of a snapshotable object.
Return Type Description map A
map<any>
that represents the current state of the snapshotable instance. -
<Snapshotable> restoreState(map state)
Function to restore a previous state intoa a snapshotable object.
Parameter Name Data Type Default Value Description state map A
map<any>
state that can be used to restore snapshotable instance to a previous state.
public type SortWindow object
The sort window hold a given number of events and emit the expired events in the ordered by the given fields.
E.g.
from inputStream window sort(10, inputStream.age, "ascending", inputStream.name, "descending")
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The sort
window should have three or more odd no of parameters (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
lengthToKeep | int | description |
|
windowParameters | any[] | description |
|
sortedWindow | streams:LinkedList | description |
|
fields | string[] | description |
|
sortTypes | string[] | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
fieldFuncs | function (map |
description |
|
mergeSort | streams:MergeSort | description |
-
<SortWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<SortWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<SortWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<SortWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<SortWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<SortWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type StdDev object
The aggregator object to calculate standard deviation.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
mean | float | 0.0 | description |
stdDeviation | float | 0.0 | description |
sumValue | float | 0.0 | description |
count | int | 0 | description |
-
<StdDev> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current standard deviation as the new values come into the aggregation.
Parameter Name Data Type Default Value Description value anydata Value being added or removed from aggregation in order to calculate the new standard deviation.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata Updated standard deviation.
-
<StdDev> copy() returns (Aggregator)
Returns a copy of the
StdDev
aggregator.Return Type Description Aggregator A
Aggregator
object which representsStdDev
aggregator. -
<StdDev> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<StdDev> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type StreamEvent object
The StreamEvent
object is a wrapper around the actual data being received to the input stream. If a record is
receive to a input stream, that record is converted to a map of anydata values and set that map to a field called
data
in a new StreamEvent
object. StreamEvent
is only used internally to transmit event data from one
processor to another processor. At the time the record is converted to a map, the timestamp is set. If the record
is first received by the input stream, the eventType is set to streams:CURRENT. Other than stream events of type
streams:CURRENT, there are 3 types of StreamEvents. They are streams:EXPIRED, streams:RESET, streams:TIMER. An expired
event is used to remove the state of its respective current event. A reset event is used to completely wipe the
state and a timer event is used to trigger the process
method of a particular processor in timely manner.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
eventType | CURRENT|EXPIRED|ALL|RESET|TIMER | description |
|
timestamp | int | description |
|
data | map |
{} | description |
-
<StreamEvent> __init((string,map<anydata>)|map<anydata> eventData, CURRENT|EXPIRED|ALL|RESET|TIMER eventType, int timestamp)
Parameter Name Data Type Default Value Description eventData (string,map )|map eventType CURRENT|EXPIRED|ALL|RESET|TIMER timestamp int -
<StreamEvent> copy() returns (StreamEvent)
Returns a copy of the stream event instance.
Return Type Description StreamEvent A copy of the
StreamEvent
object with its state. -
<StreamEvent> addData(map<anydata> eventData)
Adds key values pairs in a given map to the field
data
.Parameter Name Data Type Default Value Description eventData map map of anydata values to be added to field
data
. -
<StreamEvent> addAttribute(string key, anydata val)
Adds an attribute of an event to the map with its value.
Parameter Name Data Type Default Value Description key string The key of the map entry.
val anydata Respective value of the
key
.
public type StreamJoinProcessor object
The StreamJoinProcessor
object is responsible for performing SQLish joins between two or more streams.
The onConditionFunc
is the lambda function which represents the where clause in the join clause. The joining
happens only if the condition is statified. nextProcessor
is the process
function of the next processor, which
can be a Select
processor, Aggregator
processor, Having
processor.. etc. The lhsStream
is the left hand side
stream of the join and its attached window is 'lhsWindow
. The rhsStream
is the right hand side stream of the join
and its attached window is 'rhsWindow
. The unidirectionalStream
stream defines the stream by which the joining is
triggered when the events are received. Usually it is lhsStream
, in rare cases it can be rhsStream
. The
joinType
is the type of the join and it can be any value defined by streams:JoinType
.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
lhsWindow | streams:Window? | description |
|
rhsWindow | streams:Window? | description |
|
lhsStream | string? | description |
|
rhsStream | string? | description |
|
unidirectionalStream | string? | description |
|
joinType | JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN | description |
|
lockField | int | 0 | description |
-
<StreamJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? onConditionFunc)
Parameter Name Data Type Default Value Description nextProcessor function (streams:StreamEvent?[]) returns () joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN onConditionFunc function (map ,map ) returns (boolean)? -
<StreamJoinProcessor> process(streams:StreamEvent?[] streamEvents)
Process the events from both
lhsStream
and therhsStream
and perform the joining.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] Stream events being joined.
-
<StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)
Sets the left hand side stream name and the respective window instance.
Parameter Name Data Type Default Value Description streamName string The name of the left hand side stream.
windowInstance streams:Window The window attached to the left hand side stream.
-
<StreamJoinProcessor> setRHS(string streamName, streams:Window windowInstance)
Sets the right hand side stream name and the respective window instance.
Parameter Name Data Type Default Value Description streamName string The name of the right hand side stream.
windowInstance streams:Window The window attached to the right hand side stream.
-
<StreamJoinProcessor> setUnidirectionalStream(string streamName)
Sets the stream by which the joining is triggered.
Parameter Name Data Type Default Value Description streamName string The name of the stream. In most cases, the joining is triggered when the events are received by the left hand side stream even if the right hand side stream receives the events before the left hand side stream receives events.
public type Sum object
Aggregator to perform summation of values in a stream.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
iSum | int | 0 | description |
fSum | float | 0.0 | description |
-
<Sum> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current sum of numeric values based on the
eventType
. If theeventType
isstreams:CURRENT
,value
is added to the current sum. If theeventType
isstreams:EXPIRED
,value
is subtracted from the current sum. If theeventType
isstreams:RESET
, Current summation will be reset, regardless thevalue
.Parameter Name Data Type Default Value Description value anydata The numeric value being aggregated to the current sum.
eventType CURRENT|EXPIRED|ALL|RESET|TIMER Type of the incoming event
streams:CURRENT
,streams:EXPIRED
orstreams:RESET
.Return Type Description anydata aggregated summation after the given
value
. -
<Sum> copy() returns (Aggregator)
Returns a copy of the
Sum
aggregator without its current state.Return Type Description Aggregator Returns
Sum
aggregator. -
<Sum> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<Sum> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type TableJoinProcessor object
The TableJoinProcessor
object handles joining streams with in-memory tables in ballerina.
nextProcessor
is the process
function of the next processor, which can be a Select
processor, Aggregator
processor, Having
processor.. etc. The streamName
is the stream of the join and its attached
window is 'windowInstance
. The tableName
is the name of the table with which the stream is joined. The
joinType
is the type of the join and it can be any value defined by streams:JoinType
.
Field Name | Data Type | Default Value | Description |
---|---|---|---|
windowInstance | streams:Window? | description |
|
streamName | string | description |
|
tableName | string | description |
|
joinType | JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN | description |
|
lockField | int | 0 | description |
-
<TableJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)
Parameter Name Data Type Default Value Description nextProcessor function (streams:StreamEvent?[]) returns () joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN tableQuery function (streams:StreamEvent) returns (map []) -
<TableJoinProcessor> process(streams:StreamEvent?[] streamEvents)
Joins the incoming events to the stream with the given table.
Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The stream events being joined with the table.
-
<TableJoinProcessor> setJoinProperties(string tn, string sn, streams:Window wi)
Set the properties required for joining.
Parameter Name Data Type Default Value Description tn string The name of the table being joined with the stream.
sn string The name of the stream being joined with the table.
wi streams:Window The window instance which is attached to the stream.
public type TimeAccumulatingWindow object
The TimeAccumulatingWindow
holds the events but if the events are not received for a specific time period, the
collected events are released, at the point the time exceeds the given time period from the time when the last
event is received.
E.g.
from inputStream window timeAccum(4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
Time accumulating window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMillis | int | description |
|
windowParameters | any[] | description |
|
currentEventQueue | streams:LinkedList | description |
|
resetEvent | streams:StreamEvent? | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
lastTimestamp | int | -1 | description |
scheduler | streams:Scheduler | description |
-
<TimeAccumulatingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<TimeAccumulatingWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<TimeAccumulatingWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<TimeAccumulatingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<TimeAccumulatingWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<TimeAccumulatingWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type TimeBatchWindow object
This is a batch (tumbling) time window, that holds events arrived between window time periods, and gets updated for
every window time.
E.g.
from inputStream window timeBatch(5000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The timeBatch
window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMilliSeconds | int | description |
|
windowParameters | any[] | description |
|
nextEmitTime | int | -1 | description |
currentEventQueue | streams:LinkedList | description |
|
resetEvent | streams:StreamEvent? | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
scheduler | streams:Scheduler | description |
-
<TimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<TimeBatchWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<TimeBatchWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<TimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<TimeBatchWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<TimeBatchWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type TimeLengthWindow object
This is a sliding time window that, at a given time holds the last windowLength events that arrived during last
windowTime period, and gets updated for every event arrival and expiry.
E.g.
from inputStream window timeLength(4000, 10)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The timeLength
window should only have two parameters (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMilliSeconds | int | description |
|
length | int | description |
|
windowParameters | any[] | description |
|
count | int | 0 | description |
expiredEventChunk | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
scheduler | streams:Scheduler | description |
-
<TimeLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<TimeLengthWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<TimeLengthWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<TimeLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<TimeLengthWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<TimeLengthWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type TimeOrderWindow object
The TimeOrderWindow
sorts the events to be expired by the given timestamp field by comparing that timestamp value
to engine system time.
E.g.
from inputStream window timeOrder(inputStream.timestamp, 4000, true)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
timeOrder
window should only have three parameters (timestamp field,
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMillis | int | description |
|
windowParameters | any[] | description |
|
expiredEventQueue | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
timestamp | string | description |
|
lastTimestamp | int | description |
|
dropOlderEvents | boolean | description |
|
mergeSort | streams:MergeSort | description |
|
scheduler | streams:Scheduler | description |
-
<TimeOrderWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<TimeOrderWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<TimeOrderWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<TimeOrderWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<TimeOrderWindow> getTimestamp(any val) returns (int)
Parameter Name Data Type Default Value Description val any Return Type Description int -
<TimeOrderWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<TimeOrderWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type TimeWindow object
The TimeWindow
is a sliding time window, that holds events for that arrived during last windowTime period, and
gets updated on every event arrival and expiry.
E.g.
from inputStream window time(5000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The time
window should only have one parameter (
Field Name | Data Type | Default Value | Description |
---|---|---|---|
timeInMillis | int | description |
|
windowParameters | any[] | description |
|
expiredEventQueue | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
|
lastTimestamp | int | -9223372036854775808 | description |
scheduler | streams:Scheduler | description |
-
<TimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<TimeWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<TimeWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<TimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<TimeWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<TimeWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type UniqueLengthWindow object
This is a length window which only keeps the unique events.
E.g.
from inputStream window uniqueLength(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
The uniqueLength
window should only have two parameters (stream field,
Field Name | Data Type | Default Value | Description |
---|---|---|---|
uniqueKey | string | description |
|
length | int | description |
|
windowParameters | any[] | description |
|
count | int | 0 | description |
uniqueMap | map |
description |
|
expiredEventChunk | streams:LinkedList | description |
|
nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | description |
-
<UniqueLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
Parameter Name Data Type Default Value Description nextProcessPointer function (streams:StreamEvent?[]) returns ()? windowParameters any[] -
<UniqueLengthWindow> initParameters(any[] parameters)
Parameter Name Data Type Default Value Description parameters any[] -
<UniqueLengthWindow> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<UniqueLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.
-
<UniqueLengthWindow> saveState() returns (map<any>)
Return current state to be saved as a map of
any
typed values.Return Type Description map A map of
any
typed values. -
<UniqueLengthWindow> restoreState(map state)
Restores the saved state which is passed as a map of
any
typed values.Parameter Name Data Type Default Value Description state map A map of typed
any
values. This map contains the values to be restored from the persisted data.
public type Window object
The Window
abstract objects is the base object for implementing windows in Ballerina streams. The process
function contains the logic of processing events when events are received. getCandidateEvents
function is used
inside the Select
object to return the events in the window to perform joining.
The window names in the window objects cannot be used in the queries. Always a function which returns the specific
window has to be used in streaing query.
E.g. If LengthWindow
has to be used in a streaming query, the function streams:length
has to be used for
streaming query without the module identifier streams
. An example is shown below.
from inputStream window length
(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
foreach var t in teachers {
outputStream.publish(t);
}
}
-
<Window> process(streams:StreamEvent?[] streamEvents)
The
process
function process the incoming events to the events and update the current state of the window.Parameter Name Data Type Default Value Description streamEvents streams:StreamEvent?[] The array of stream events to be processed.
-
<Window> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent|null,StreamEvent|null)[])
Returns the events(State) which match with the where condition in the join clause for a given event.
Parameter Name Data Type Default Value Description originEvent streams:StreamEvent The event against which the state or the events being held by the window is matched.
conditionFunc function (map ,map ) returns (boolean)? The function pointer to the lambda function which contain the condition logic in where clause.
isLHSTrigger boolean true Specify if the join is triggered when the lhs stream received the events, if so it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events this should be false.
Return Type Description (StreamEvent|StreamEvent">null,StreamEvent|StreamEvent">null)[] Returns an array of 2 element tuples of events. A tuple contains the matching events one from lhs stream and one from rhs stream.