ballerina/streams module
Type Definitions
| Type | Values | Description |
|---|---|---|
| EventType | TIMER \| RESET \| EXPIRED \| CURRENT \| ALL | The type of stream events. |
| JoinType | RIGHTOUTERJOIN \| LEFTOUTERJOIN \| JOIN \| FULLOUTERJOIN | The types of joins between streams and tables. |
Records Summary
| Record | Description |
|---|---|
| SnapshottableStreamEvent | This record represents a stream event which can be persisted. |
Objects Summary
| Object | Description |
|---|---|
| Aggregator | Abstract object, which should be implemented in order to create a new aggregator. |
| Average | Aggregator to calculate average in streams. |
| Count | Aggregator to count events in streams. |
| DelayWindow | |
| DistinctCount | Aggregator to get the distinct counts of values in streams. |
| ExternalTimeBatchWindow | |
| ExternalTimeWindow | This is a sliding time window based on external time. It holds the events that arrived during the last window time period from the external timestamp, and gets updated on every monotonically increasing timestamp. E.g. from inputStream window |
| Filter | The |
| HoppingWindow | |
| IntSort | This class implements a merge sort algorithm to sort timestamp values for state persistence. |
| LengthBatchWindow | This is a batch (tumbling) length window. It holds up to the given number of events and gets updated on the arrival of every given number of events. E.g. from inputStream window |
| LengthWindow | The |
| LinkedList | The |
| Max | Aggregator to find the maximum value in a stream. |
| MaxForever | The aggregator to keep the maximum value received so far. It is similar to |
| MergeSort | This object implements the merge sort algorithm to sort the provided value arrays. |
| Min | Aggregator to find the minimum value in a stream. |
| MinForever | The aggregator to keep the minimum value received so far. It is similar to |
| Node | The |
| OrderBy | The |
| OutputProcess | The |
| Scheduler | The |
| Select | The |
| Snapshotable | Abstract Snapshotable to be referenced by all snapshotable objects. |
| SortWindow | |
| StdDev | The aggregator object to calculate standard deviation. |
| StreamEvent | The |
| StreamJoinProcessor | The |
| Sum | Aggregator to perform summation of values in a stream. |
| TableJoinProcessor | The |
| TimeAccumulatingWindow | |
| TimeBatchWindow | |
| TimeLengthWindow | |
| TimeOrderWindow | |
| TimeWindow | |
| UniqueLengthWindow | This is a length window which only keeps the unique events. E.g. from inputStream window |
| Window | The from inputStream window |
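
The LengthBatchWindow row above describes a tumbling window that collects a fixed number of events and then hands them on together. The following is a minimal Python sketch of that batching behaviour only, not the Ballerina implementation: the class name `LengthBatchSketch` and the `on_batch` callback (a stand-in for the next processor) are hypothetical, and the expired and reset events that a real window would also emit are left out.

```python
from typing import Any, Callable, List


class LengthBatchSketch:
    """Tumbling length window: buffers events and emits them in batches of `length`."""

    def __init__(self, length: int, on_batch: Callable[[List[Any]], None]) -> None:
        if length <= 0:
            raise ValueError("length must be positive")
        self.length = length
        self.on_batch = on_batch  # hypothetical stand-in for the next processor
        self.buffer: List[Any] = []

    def process(self, event: Any) -> None:
        """Buffer one event; flush the buffer once it holds `length` events."""
        self.buffer.append(event)
        if len(self.buffer) == self.length:
            batch, self.buffer = self.buffer, []
            self.on_batch(batch)


# Usage: seven events through a window of length 3 produce two full batches.
batches: List[List[int]] = []
window = LengthBatchSketch(3, batches.append)
for value in range(7):
    window.process(value)
```

The seventh event stays in the buffer until two more events arrive, which is the difference between a batch window and a sliding length window.
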
Functions Summary
| Return Type | Function and Description |
|---|---|
| Aggregator | avg() Returns an Average aggregator object. |
| StreamEvent[] | buildStreamEvent(any t, string streamName) Creates a streams:StreamEvent object array for a record t received by the stream denoted by the name streamName. |
| Aggregator | count() Returns a Count aggregator object. |
| Filter | createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) Creates a Filter object and returns it. |
| OrderBy | createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) Creates an OrderBy object and returns it. |
| OutputProcess | createOutputProcess(function (map<anydata>[]) returns () outputFunc) Creates and returns an OutputProcess object. |
| StreamEvent | createResetStreamEvent(streams:StreamEvent event) Creates a RESET event from a given event. |
| Select | createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) Creates and returns a select clause. |
| StreamJoinProcessor | createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) Creates a StreamJoinProcessor and returns it. |
| TableJoinProcessor | createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) Creates a TableJoinProcessor and returns it. |
| Window | delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The delay function creates a DelayWindow object and returns it. |
| Aggregator | distinctCount() Returns a DistinctCount aggregator object. |
| Window | externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The externalTime function creates an ExternalTimeWindow object and returns it. |
| Window | externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The externalTimeBatch function creates an ExternalTimeBatchWindow object and returns it. |
| StreamEvent | getStreamEvent(any anyEvent) Gets the stream event from an any? field. Use this function only when the anyEvent is known to be a streams:StreamEvent. |
| Window | hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The hopping function creates a HoppingWindow object and returns it. |
| | initPersistence() Function to initialize and start snapshotting. |
| Window | length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The length function creates a LengthWindow object and returns it. |
| Window | lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The lengthBatch function creates a LengthBatchWindow object and returns it. |
| Aggregator | max() Returns a Max aggregator object. |
| Aggregator | maxForever() Returns a MaxForever aggregator object. |
| Aggregator | min() Returns a Min aggregator object. |
| Aggregator | minForever() Returns a MinForever aggregator object. |
| | registerSnapshotable(string key, any reference) Function to register Snapshotables. |
| boolean | removeState(string key) Function to clear an existing state. |
| | restoreState(string key, any reference) Function to restore state of a given object. |
| Window | sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The sort function creates a SortWindow object and returns it. |
| Aggregator | stdDev() Returns a StdDev aggregator object. |
| Aggregator | sum() Returns a Sum aggregator object. |
| Window | time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The time function creates a TimeWindow object and returns it. |
| Window | timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The timeAccum function creates a TimeAccumulatingWindow object and returns it. |
| Window | timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The timeBatch function creates a TimeBatchWindow object and returns it. |
| Window | timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The timeLength function creates a TimeLengthWindow object and returns it. |
| Window | timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The timeOrder function creates a TimeOrderWindow object and returns it. |
| SnapshottableStreamEvent | toSnapshottableEvent(streams:StreamEvent event) Converts a single streams:StreamEvent object to a streams:SnapshottableStreamEvent object. |
| SnapshottableStreamEvent[] | toSnapshottableEvents(streams:StreamEvent[]\|any[]? events) Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent. |
| StreamEvent | toStreamEvent(streams:SnapshottableStreamEvent event) Converts a single streams:SnapshottableStreamEvent object to a streams:StreamEvent object. |
| StreamEvent[] | toStreamEvents(streams:SnapshottableStreamEvent[]\|any[]? events) Converts a given array of snapshotable events to an array of streams:StreamEvent objects. |
| Window | uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) The uniqueLength function creates a UniqueLengthWindow object and returns it. |
Constants
public type SnapshottableStreamEvent record
This record represents a stream event which can be persisted.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | CURRENT | |
| timestamp | int | 0 | |
| data | map | {} | |
public function avg() returns (Aggregator)
Returns an Average aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function buildStreamEvent(any t, string streamName) returns (StreamEvent[])
Creates a streams:StreamEvent object array for a record t received by the stream denoted by the name streamName.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| t | any | | A record received by the stream |
| streamName | string | | Name of the stream to which the record |
| Return Type | Description |
|---|---|
| StreamEvent[] | Returns an array of streams:StreamEvents\|() |
public function count() returns (Aggregator)
Returns a Count aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) returns (Filter)
Creates a Filter object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcPointer | function (streams:StreamEvent?[]) returns () | | The function pointer to the |
| conditionFunc | function (map<anydata>) returns (boolean) | | The function pointer to the condition evaluator. This is a function which returns true or false based on the boolean expression given in the where clause. |
| Return Type | Description |
|---|---|
| Filter | Returns a |
public function createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) returns (OrderBy)
Creates an OrderBy object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessorPointer | function (streams:StreamEvent?[]) returns () | | |
| fields | function (map<anydata>) returns (anydata)?[] | | An array of function pointers, each of which returns a field by which the events are sorted. Events are sorted by the first field; if elements have the same value, the second field is used, and so on. |
| sortFieldMetadata | string[] | | sortTypes of the fields ( |
| Return Type | Description |
|---|---|
| OrderBy | Returns a |
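
The fields parameter above describes a multi-key ordering: the first field decides the order, and later fields only break ties. A minimal Python sketch of that rule follows. It is not the Ballerina implementation; the function name `order_events` and the `"ascending"` / `"descending"` strings standing in for the sort types are assumptions, since the source text for sortFieldMetadata is cut off.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


def order_events(events: List[Event],
                 fields: List[Callable[[Event], Any]],
                 sort_types: List[str]) -> List[Event]:
    """Sort by fields[0]; ties fall through to fields[1], and so on."""
    ordered = list(events)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last yields the multi-key ordering.
    for field, sort_type in reversed(list(zip(fields, sort_types))):
        # "descending" is a hypothetical sort-type label, not taken from the source.
        ordered.sort(key=field, reverse=(sort_type == "descending"))
    return ordered


events = [
    {"name": "x", "age": 2},
    {"name": "y", "age": 1},
    {"name": "z", "age": 2},
    {"name": "w", "age": 1},
]
# Order by age ascending, then by name descending within equal ages.
result = order_events(
    events,
    [lambda e: e["age"], lambda e: e["name"]],
    ["ascending", "descending"],
)
names = [e["name"] for e in result]
```
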
public function createOutputProcess(function (map<anydata>[]) returns () outputFunc) returns (OutputProcess)
Creates and returns an OutputProcess object.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| outputFunc | function (map<anydata>[]) returns () | | The function pointer to a lambda function created out of the statements in the streaming action |
| Return Type | Description |
|---|---|
| OutputProcess | Returns a |
public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)
Creates a RESET event from a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| event | streams:StreamEvent | | The event from which the reset event is created. |
| Return Type | Description | ||
|---|---|---|---|
| StreamEvent | A stream event of type streams:RESET. |
public function createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) returns (Select)
Creates and returns a select clause.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcPointer | function (streams:StreamEvent?[]) returns () | | The function pointer to the |
| aggregatorArr | streams:Aggregator[] | | The array of aggregators used in the select clause. If the same aggregator is used twice, the |
| groupbyFuncArray | function (streams:StreamEvent) returns (anydata)?[]? | | The array of function pointers to the lambda functions that return the expressions in the group by clause. |
| selectFunc | function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) | | The function pointer to a lambda function that creates the |
| scopeName | string | $scope$name | A unique id to identify the forever block if there are multiple forever blocks. |
| Return Type | Description |
|---|---|
| Select | Returns a |
public function createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)
Creates a StreamJoinProcessor and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessor | function (streams:StreamEvent?[]) returns () | | The |
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | | Type of the join being performed ("JOIN"\|"LEFTOUTERJOIN"\|"RIGHTOUTERJOIN"\|"FULLOUTERJOIN") |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | () | |
| Return Type | Description |
|---|---|
| StreamJoinProcessor | Returns a |
public function createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) returns (TableJoinProcessor)
Creates a TableJoinProcessor and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessor | function (streams:StreamEvent?[]) returns () | | |
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | | The type of the join and it can be any value defined by |
| tableQuery | function (streams:StreamEvent) returns (map<anydata>[]) | | The function pointer to a function which retrieves the records from the table and joins with each stream event. |
| Return Type | Description |
|---|---|
| TableJoinProcessor | Returns a |
public function delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The delay function creates a DelayWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function distinctCount() returns (Aggregator)
Returns a DistinctCount aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The externalTime function creates an ExternalTimeWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The externalTimeBatch function creates an ExternalTimeBatchWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function getStreamEvent(any anyEvent) returns (StreamEvent)
Gets the stream event from an any? field. Use this function only when the anyEvent is known to be a
streams:StreamEvent.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| anyEvent | any | | The object from which the stream event is extracted. |
| Return Type | Description | ||
|---|---|---|---|
| StreamEvent | Returns the extracted streams:StreamEvent object. |
public function hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The hopping function creates a HoppingWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function initPersistence()
Function to initialize and start snapshotting.
public function length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The length function creates a LengthWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The lengthBatch function creates a LengthBatchWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function max() returns (Aggregator)
Returns a Max aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function maxForever() returns (Aggregator)
Returns a MaxForever aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function min() returns (Aggregator)
Returns a Min aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function minForever() returns (Aggregator)
Returns a MinForever aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function registerSnapshotable(string key, any reference)
Function to register Snapshotables.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| key | string | | A unique |
| reference | any | | The snapshotable reference to be registered. |
public function removeState(string key) returns (boolean)
Function to clear an existing state.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| key | string | | A unique |
| Return Type | Description | ||
|---|---|---|---|
| boolean | A |
public function restoreState(string key, any reference)
Function to restore state of a given object.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| key | string | | A unique |
| reference | any | | The snapshotable reference to be restored. |
public function sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The sort function creates a SortWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function stdDev() returns (Aggregator)
Returns a StdDev aggregator object. The aggregator function name which is used in a streaming query should
have the same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function sum() returns (Aggregator)
Returns a Sum aggregator object. The aggregator function name which is used in a streaming query should have the
same name as this function's name.
| Return Type | Description | ||
|---|---|---|---|
| Aggregator | A |
public function time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The time function creates a TimeWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeAccum function creates a TimeAccumulatingWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeBatch function creates a TimeBatchWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeLength function creates a TimeLengthWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The timeOrder function creates a TimeOrderWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public function toSnapshottableEvent(streams:StreamEvent event) returns (SnapshottableStreamEvent)
Converts a single streams:StreamEvent object to a streams:SnapshottableStreamEvent object.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| event | streams:StreamEvent | | The |
| Return Type | Description | ||
|---|---|---|---|
| SnapshottableStreamEvent | The converted |
public function toSnapshottableEvents(streams:StreamEvent[]|any[]? events) returns (SnapshottableStreamEvent[])
Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| events | streams:StreamEvent[]\|any[]? | | The events to be converted to snapshotable events. |
| Return Type | Description | ||
|---|---|---|---|
| SnapshottableStreamEvent[] | Returns the converted snapshotable events. |
public function toStreamEvent(streams:SnapshottableStreamEvent event) returns (StreamEvent)
Converts a single streams:SnapshottableStreamEvent object to a streams:StreamEvent object.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| event | streams:SnapshottableStreamEvent | | The |
| Return Type | Description | ||
|---|---|---|---|
| StreamEvent | The converted |
public function toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events) returns (StreamEvent[])
Converts a given array of snapshotable events to an array of streams:StreamEvent objects.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| events | streams:SnapshottableStreamEvent[]\|any[]? | | Snapshotable events to be converted to |
| Return Type | Description | ||
|---|---|---|---|
| StreamEvent[] | Returns the converted |
public function uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)
The uniqueLength function creates a UniqueLengthWindow object and returns it.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | () | The function pointer to the |
| Return Type | Description | ||
|---|---|---|---|
| Window | Returns the created window. |
public type Aggregator object
Abstract object, which should be implemented in order to create a new aggregator.
-
<Aggregator> copy() returns (Aggregator)
Returns a copy of self, but it does not contain the current state.
| Return Type | Description |
|---|---|
| Aggregator | An Aggregator object. |
-
<Aggregator> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the aggregated value and returns the final aggregated value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value being aggregated. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | Type of the incoming event: streams:CURRENT, streams:EXPIRED or streams:RESET. Based on the type of the event, value will be added to the aggregation or removed from the aggregation. |
| Return Type | Description |
|---|---|
| anydata | Updated aggregated value after value being aggregated. |
public type Average object
Aggregator to calculate average in streams.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| count | int | 0 | |
| sum | float | 0.0 | |
-
<Average> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Returns the calculated average after value being aggregated into the current average. If the eventType is streams:CURRENT, value is added to the current sum and count is increased by 1. If the eventType is streams:EXPIRED, value is subtracted from the current sum and count is decreased by 1. If the eventType is streams:RESET, the current sum and count are reset, regardless of the value. The average is then calculated by dividing the sum by the count.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The numeric value being aggregated in order to calculate the new average. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | Type of the incoming event: streams:CURRENT, streams:EXPIRED or streams:RESET. |
| Return Type | Description |
|---|---|
| anydata | Updated average value after value being aggregated. |
-
<Average> copy() returns (Aggregator)
Returns a copy of the Average aggregator without its current state.
| Return Type | Description |
|---|---|
| Aggregator | Returns Average aggregator. |
-
<Average> saveState() returns (map<any>)
Returns the current state to be saved as a map of any typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any typed values. |
-
<Average> restoreState(map state)
Restores the saved state which is passed as a map of any typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of typed any values. This map contains the values to be restored from the persisted data. |
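
The Average methods above can be illustrated with a small Python sketch of the same bookkeeping. It is a sketch under stated assumptions, not the Ballerina implementation: the class name `AverageSketch`, the state keys `"count"` and `"sum"`, and the choice to return `None` for an empty aggregation are hypothetical, because the source does not say what is returned when the count is zero.

```python
from typing import Any, Dict, Optional


class AverageSketch:
    """Running average driven by CURRENT, EXPIRED and RESET events."""

    def __init__(self) -> None:
        self.count = 0
        self.sum = 0.0

    def process(self, value: float, event_type: str) -> Optional[float]:
        if event_type == "CURRENT":      # event enters the window
            self.sum += value
            self.count += 1
        elif event_type == "EXPIRED":    # event leaves the window
            self.sum -= value
            self.count -= 1
        elif event_type == "RESET":      # value is ignored on reset
            self.sum = 0.0
            self.count = 0
        # Assumption: an empty aggregation yields None; the source does not say.
        return self.sum / self.count if self.count > 0 else None

    def copy(self) -> "AverageSketch":
        """Return a fresh aggregator that carries none of the current state."""
        return AverageSketch()

    def save_state(self) -> Dict[str, Any]:
        # The key names are hypothetical.
        return {"count": self.count, "sum": self.sum}

    def restore_state(self, state: Dict[str, Any]) -> None:
        self.count = int(state["count"])
        self.sum = float(state["sum"])


avg = AverageSketch()
first = avg.process(10.0, "CURRENT")   # sum 10, count 1
second = avg.process(20.0, "CURRENT")  # sum 30, count 2
third = avg.process(10.0, "EXPIRED")   # sum 20, count 1

# A second aggregator picks up where the first one left off.
restored = AverageSketch()
restored.restore_state(avg.save_state())
```

Keeping only a sum and a count lets an expiring event be removed in constant time, which is why the aggregator does not need to hold the events themselves.
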
public type Count object
Aggregator to count events in streams.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| count | int | 0 | |
-
<Count> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current count when a new event arrives and returns the updated count. If the eventType is streams:CURRENT, count is increased by 1. If the eventType is streams:EXPIRED, count is decreased by 1. If the eventType is streams:RESET, count is reset, regardless of the value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value is not used by the count aggregator. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | Type of the incoming event: streams:CURRENT, streams:EXPIRED or streams:RESET. |
| Return Type | Description |
|---|---|
| anydata | Updated count. |
-
<Count> copy() returns (Aggregator)
Returns a copy of the Count aggregator without its current state.
| Return Type | Description |
|---|---|
| Aggregator | Returns Count aggregator. |
-
<Count> saveState() returns (map<any>)
Returns the current state to be saved as a map of any typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any typed values. |
-
<Count> restoreState(map state)
Restores the saved state which is passed as a map of any typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of typed any values. This map contains the values to be restored from the persisted data. |
public type DelayWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| delayInMilliSeconds | int | ||
| windowParameters | any[] | ||
| delayedEventQueue | streams:LinkedList | ||
| lastTimestamp | int | 0 | |
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| scheduler | streams:Scheduler | | |
-
<DelayWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<DelayWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<DelayWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) which match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state or the events being held by the window are matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function which contains the condition logic in the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives the events; if so, it should be true. Most of the time it is true. In rare cases, where the join is triggered when the rhs stream receives events, this should be false. |
| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | Returns an array of 2-element tuples of events. A tuple contains the matching events, one from the lhs stream and one from the rhs stream. |
-
<DelayWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any typed values. |
-
<DelayWindow> restoreState(map state)
Restores the saved state which is passed as a map of any typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of typed any values. This map contains the values to be restored from the persisted data. |
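
The getCandidateEvents contract described above (match an incoming event against the events a window holds, and return lhs/rhs pairs) can be sketched in Python as follows. This is only an illustration of the pairing rule, not the Ballerina implementation: the function name `candidate_events`, the use of plain dictionaries for event data, and the rule that a missing condition matches every held event are assumptions.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Data = Dict[str, Any]


def candidate_events(origin: Data,
                     held: List[Data],
                     condition: Optional[Callable[[Data, Data], bool]] = None,
                     is_lhs_trigger: bool = True) -> List[Tuple[Data, Data]]:
    """Pair `origin` with each held event that satisfies `condition`."""
    pairs: List[Tuple[Data, Data]] = []
    for other in held:
        # The triggering side decides which element of the tuple `origin` occupies.
        lhs, rhs = (origin, other) if is_lhs_trigger else (other, origin)
        # Assumption: a missing condition matches every held event.
        if condition is None or condition(lhs, rhs):
            pairs.append((lhs, rhs))
    return pairs


origin = {"side": "origin", "id": 2}
held = [{"side": "held", "id": 1}, {"side": "held", "id": 2}]

# lhs-triggered join with an equality condition on "id".
matches = candidate_events(origin, held, lambda l, r: l["id"] == r["id"])

# rhs-triggered join without a condition: origin appears on the right of every pair.
flipped = candidate_events(origin, held, None, is_lhs_trigger=False)
```
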
public type DistinctCount object
Aggregator to get the distinct counts of values in streams.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| distinctValues | map |
{} |
-
<DistinctCount> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current distinct count when a new event arrives and returns the updated count. If the `eventType` is `streams:CURRENT`, the count is increased by 1. If the `eventType` is `streams:EXPIRED`, the count is decreased by 1. If the `eventType` is `streams:RESET`, the count is reset, regardless of the `value`.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value being counted uniquely. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: `streams:CURRENT`, `streams:EXPIRED`, or `streams:RESET`. |

| Return Type | Description |
|---|---|
| anydata | The updated distinct count. |
-
<DistinctCount> copy() returns (Aggregator)
Returns a copy of the `DistinctCount` aggregator without its current state.
| Return Type | Description |
|---|---|
| Aggregator | A `DistinctCount` aggregator. |
-
<DistinctCount> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<DistinctCount> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
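The event-type handling described for `DistinctCount` can be illustrated with a small Python sketch. It is not the module's implementation: the class name, the use of per-value occurrence counts to decide when a value stops being distinct, and the plain-string event types are all assumptions made for illustration.

```python
from collections import Counter


class DistinctCountSketch:
    """Hypothetical stand-in for a distinct-count aggregator."""

    def __init__(self):
        # value -> number of occurrences that have arrived but not yet expired
        self.distinct_values = Counter()

    def process(self, value, event_type):
        if event_type == "CURRENT":
            self.distinct_values[value] += 1
        elif event_type == "EXPIRED":
            self.distinct_values[value] -= 1
            # Once no live occurrence remains, the value is no longer distinct.
            if self.distinct_values[value] <= 0:
                del self.distinct_values[value]
        elif event_type == "RESET":
            # The value is ignored on reset.
            self.distinct_values.clear()
        return len(self.distinct_values)

    def copy(self):
        # A copy starts without the current state.
        return DistinctCountSketch()
```

In this sketch, a repeated value does not raise the count, and the count only drops when the last live occurrence of a value expires.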
public type ExternalTimeBatchWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeToKeep | int | ||
| currentEventChunk | streams:LinkedList | ||
| expiredEventChunk | streams:LinkedList | ||
| resetEvent | streams:StreamEvent? | () | |
| startTime | int | -1 | |
| isStartTimeEnabled | boolean | false | |
| replaceTimestampWithBatchEndTime | boolean | false | |
| flushed | boolean | false | |
| endTime | int | -1 | |
| schedulerTimeout | int | 0 | |
| lastScheduledTime | int | ||
| lastCurrentEventTime | int | 0 | |
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| timeStamp | string | ||
| storeExpiredEvents | boolean | false | |
| outputExpectsExpiredEvents | boolean | false | |
| windowParameters | any[] | ||
| scheduler | streams:Scheduler |
-
<ExternalTimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<ExternalTimeBatchWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<ExternalTimeBatchWindow> process(streams:StreamEvent?[] streamEvents)
The `process` function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the LHS stream receives events, in which case it should be true. This is the usual case. In the rare cases where the join is triggered when the RHS stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream. |
-
<ExternalTimeBatchWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<ExternalTimeBatchWindow> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
-
<ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| currStreamEvent | streams:StreamEvent | | |
-
<ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| complexEventChunks | streams:LinkedList | | |
| currentTime | int | | |
| preserveCurrentEvents | boolean | | |
-
<ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| complexEventChunks | streams:LinkedList | | |
| currentTime | int | | |
| preserveCurrentEvents | boolean | | |
-
<ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| currentTime | int | | |
| startTime_ | int | | |
| timeToKeep_ | int | | |

| Return Type | Description |
|---|---|
| int | |
-
<ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| firstStreamEvent | streams:StreamEvent | | |
-
<ExternalTimeBatchWindow> getTimestamp(any val) returns (int)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| val | any | | |

| Return Type | Description |
|---|---|
| int | |
public type ExternalTimeWindow object
This is a sliding time window based on external time. It holds the events that arrived during the last window time
period, measured from the external timestamp, and gets updated on every monotonically increasing timestamp.
E.g.
from inputStream window externalTime(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}
The externalTime window should only have two parameters (timestamp field,
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMillis | int | ||
| windowParameters | any[] | ||
| expiredEventQueue | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| timeStamp | string |
-
<ExternalTimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<ExternalTimeWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<ExternalTimeWindow> process(streams:StreamEvent?[] streamEvents)
The `process` function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the LHS stream receives events, in which case it should be true. This is the usual case. In the rare cases where the join is triggered when the RHS stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream. |
-
<ExternalTimeWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<ExternalTimeWindow> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
-
<ExternalTimeWindow> getTimestamp(any val) returns (int)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| val | any | | |

| Return Type | Description |
|---|---|
| int | |
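As a rough illustration of a sliding window driven by external timestamps, the following Python sketch expires events using only the timestamps carried by the events themselves. The class name, the `(timestamp, payload)` event shape, and the exact expiry rule (an event leaves once the newest timestamp is at least one window length past its own) are assumptions, not the documented behaviour of `ExternalTimeWindow`.

```python
from collections import deque


class ExternalTimeWindowSketch:
    """Hypothetical sliding window keyed on event-supplied timestamps."""

    def __init__(self, window_millis):
        self.window_millis = window_millis
        self.events = deque()  # (timestamp, payload), oldest first

    def process(self, timestamp, payload):
        """Add one event and return the events that it pushes out of the window."""
        expired = []
        # Assumed rule: expire when timestamp + window <= newest timestamp.
        while self.events and self.events[0][0] + self.window_millis <= timestamp:
            expired.append(self.events.popleft())
        self.events.append((timestamp, payload))
        return expired
```

Because expiry is driven by incoming timestamps rather than a wall clock, nothing expires in this sketch until a later event arrives.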
public type Filter object
The Filter object represents the where clause in a streaming query. This object takes two parameters for
initialization. `nextProcessorPointer` is the function pointer of the next processor to be invoked once the
filtering is complete. `conditionFunc` is a function pointer that returns true if the given where clause evaluates to
true.
-
<Filter> process(streams:StreamEvent?[] streamEvents)
Processes the incoming stream events. This function takes an array of stream events, iterates over the events in the array, and calls `conditionFunc` on each one. The events for which `conditionFunc` evaluates to true are passed to `nextProcessorPointer`, which can be the `process` function of the next processor object (e.g., `Select`, `Window`, `Aggregator`).
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The events being filtered. |
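The chaining of processors through function pointers can be sketched in Python as follows. The class name and the dictionary-shaped events are hypothetical, and skipping the downstream call when nothing passes the condition is an assumption rather than documented behaviour.

```python
class FilterSketch:
    """Hypothetical stand-in for a where-clause processor."""

    def __init__(self, next_processor_pointer, condition_func):
        self.next_processor_pointer = next_processor_pointer
        self.condition_func = condition_func

    def process(self, stream_events):
        # Keep only the events for which the condition evaluates to true.
        passed = [e for e in stream_events if self.condition_func(e)]
        # Assumption: the next processor is only invoked when something passed.
        if passed:
            self.next_processor_pointer(passed)
```

For example, `FilterSketch(collected.extend, lambda e: e["age"] > 30)` would forward only the events whose `age` exceeds 30 into the list `collected`.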
public type HoppingWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMilliSeconds | int | ||
| hoppingTime | int | ||
| windowParameters | any[] | ||
| nextEmitTime | int | -1 | |
| currentEventQueue | streams:LinkedList | ||
| resetEvent | streams:StreamEvent? | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| scheduler | streams:Scheduler |
-
<HoppingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<HoppingWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<HoppingWindow> process(streams:StreamEvent?[] streamEvents)
The `process` function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<HoppingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the LHS stream receives events, in which case it should be true. This is the usual case. In the rare cases where the join is triggered when the RHS stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream. |
-
<HoppingWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<HoppingWindow> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
public type IntSort object
This class implements a merge sort algorithm to sort timestamp values for state persistence.
-
<IntSort> sort(int[] arr)
Sorts a given array of int values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| arr | int[] | | The array of int values to be sorted. |
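For readers unfamiliar with merge sort, here is a minimal top-down version in Python that sorts a list of integers in place, mirroring the fact that `sort` has no return value. The function name is illustrative, and this is a generic textbook merge sort rather than the module's code.

```python
def merge_sort(arr):
    """Sort a list of ints in place using top-down merge sort."""
    if len(arr) < 2:
        return
    mid = len(arr) // 2
    left, right = arr[:mid], arr[mid:]
    merge_sort(left)
    merge_sort(right)

    # Merge the two sorted halves back into arr.
    i = j = k = 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            arr[k] = left[i]
            i += 1
        else:
            arr[k] = right[j]
            j += 1
        k += 1
    # Copy whatever remains in either half.
    while i < len(left):
        arr[k] = left[i]
        i += 1
        k += 1
    while j < len(right):
        arr[k] = right[j]
        j += 1
        k += 1
```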
public type LengthBatchWindow object
This is a batch (tumbling) length window that holds up to the given number of events and gets updated each time
that number of events has arrived.
E.g.
from inputStream window lengthBatch(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}
The lengthBatch window should only have one parameter (
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| length | int | ||
| windowParameters | any[] | ||
| count | int | ||
| resetEvent | streams:StreamEvent? | ||
| currentEventQueue | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? |
-
<LengthBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<LengthBatchWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<LengthBatchWindow> process(streams:StreamEvent?[] streamEvents)
The `process` function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the LHS stream receives events, in which case it should be true. This is the usual case. In the rare cases where the join is triggered when the RHS stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream. |
-
<LengthBatchWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<LengthBatchWindow> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
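The tumbling behaviour of a length batch window can be pictured with the Python sketch below. The class and attribute names are hypothetical, and the sketch leaves out the expired and reset events that a real window may also emit.

```python
class LengthBatchWindowSketch:
    """Hypothetical tumbling window that emits fixed-size batches."""

    def __init__(self, length, next_process):
        self.length = length
        self.next_process = next_process  # called with each completed batch
        self.current = []

    def process(self, events):
        for event in events:
            self.current.append(event)
            # Emit the batch as soon as it is full, then start a new one.
            if len(self.current) == self.length:
                self.next_process(self.current)
                self.current = []
```

With a length of 3, feeding the events 1 to 7 emits `[1, 2, 3]` and `[4, 5, 6]`, while 7 waits in the window for the next batch to fill.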
public type LengthWindow object
The LengthWindow is a sliding length window that holds the last windowLength events and gets updated on every event
arrival and expiry.
E.g.
from inputStream window length(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}
The length window should only have one parameter (
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| size | int | ||
| linkedList | streams:LinkedList | ||
| windowParameters | any[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? |
-
<LengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<LengthWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<LengthWindow> process(streams:StreamEvent?[] streamEvents)
The `process` function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition in the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the LHS stream receives events, in which case it should be true. This is the usual case. In the rare cases where the join is triggered when the RHS stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream. |
-
<LengthWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<LengthWindow> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
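In contrast to a batch window, a sliding length window updates on every arrival. The Python sketch below shows the idea under simplifying assumptions: the class name and the `(expired, contents)` return shape are hypothetical, and events are handled one at a time.

```python
from collections import deque


class LengthWindowSketch:
    """Hypothetical sliding window holding the last `size` events."""

    def __init__(self, size):
        self.size = size
        self.events = deque()

    def process(self, event):
        """Add one event; return (expired event or None, current contents)."""
        expired = None
        # When the window is full, the oldest event expires to make room.
        if len(self.events) == self.size:
            expired = self.events.popleft()
        self.events.append(event)
        return expired, list(self.events)
```

Each arrival beyond the first `size` events therefore produces exactly one expiry, which is what lets downstream aggregators subtract the departing value.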
public type LinkedList object
The LinkedList object represents the linked list data structure.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| first | streams:Node? | ||
| last | streams:Node? | ||
| curr | streams:Node? | ||
| size | int | 0 | |
| ascend | boolean | true |
-
<LinkedList> isEmpty() returns (boolean)
Checks if the linked list is empty.
| Return Type | Description |
|---|---|
| boolean | Returns `true` if the linked list is empty, otherwise returns `false`. |
-
<LinkedList> resetToFront()
Moves the cursor to the front/head of the linked list.
-
<LinkedList> resetToRear()
Moves the cursor to the end of the linked list if the cursor is not already at the last element of the linked list.
-
<LinkedList> hasNext() returns (boolean)
Returns `true` if the linked list has more elements starting from the current cursor location.
| Return Type | Description |
|---|---|
| boolean | Returns `true` if there are more elements onwards from the current cursor location, otherwise `false`. |
-
<LinkedList> hasPrevious() returns (boolean)
Returns `true` if there are elements prior to the current element/cursor location, otherwise `false`.
| Return Type | Description |
|---|---|
| boolean | Returns `true` if there are elements prior to the current cursor location, otherwise `false`. |
-
<LinkedList> next() returns (any)
Returns the next element of the linked list and moves the cursor to the next element.
| Return Type | Description |
|---|---|
| any | The next element from the current cursor location. |
-
<LinkedList> previous() returns (any)
Returns the previous element of the linked list and moves the cursor to the previous element.
| Return Type | Description |
|---|---|
| any | The previous element from the current cursor location. |
-
<LinkedList> removeCurrent()
Removes the element at the current cursor location.
-
<LinkedList> getSize() returns (int)
Returns the current number of elements in the linked list.
| Return Type | Description |
|---|---|
| int | The number of elements in the linked list. |
-
<LinkedList> clear()
Empties the linked list.
-
<LinkedList> removeFirstOccurrence(any elem) returns (boolean)
Removes the first occurrence of the element passed as `elem` and returns `true` if the removal is successful.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| elem | any | | |

| Return Type | Description |
|---|---|
| boolean | Returns `true` if the removal is successful, otherwise `false`. |
-
<LinkedList> remove(any elem) returns (boolean)
Removes the first occurrence of the element passed as `elem` and returns `true` if the removal is successful.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| elem | any | | |

| Return Type | Description |
|---|---|
| boolean | Returns `true` if the removal is successful, otherwise `false`. |
-
<LinkedList> getFirst() returns (any)
Returns the first element of the linked list, without moving the cursor.
| Return Type | Description |
|---|---|
| any | The first element of the linked list. |
-
<LinkedList> getLast() returns (any)
Returns the last element of the linked list, without moving the cursor.
| Return Type | Description |
|---|---|
| any | The last element of the linked list. |
-
<LinkedList> addFirst(any data)
Adds a new element to the front of the linked list without moving the cursor.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| data | any | | The data to be added to the front of the linked list. |
-
<LinkedList> addLast(any data)
Adds a new element to the end of the linked list without moving the cursor.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| data | any | | The data to be added to the end of the linked list. |
-
<LinkedList> removeFirst() returns (any)
Removes the first element in the linked list without moving the cursor.
| Return Type | Description |
|---|---|
| any | The removed element. |
-
<LinkedList> removeLast() returns (any)
Removes the last element in the linked list without moving the cursor.
| Return Type | Description |
|---|---|
| any | The removed element. |
-
<LinkedList> insertBeforeCurrent(any data)
Inserts a new element before the current cursor location.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| data | any | | The data to be inserted. |
-
<LinkedList> dequeue() returns (any)
Dequeues and returns the element that was added to the linked list first.
| Return Type | Description |
|---|---|
| any | The dequeued element. |
-
<LinkedList> asArray() returns (any[])
Creates an array from the elements in the linked list and returns it. The cursor is not changed.
| Return Type | Description |
|---|---|
| any[] | An array of the elements in the linked list. |
-
<LinkedList> addAll(any[] data)
Adds the elements of an array at the current cursor location and moves the cursor to the end of the list.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| data | any[] | | |
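The cursor-based traversal described by these methods can be sketched in Python. Only a handful of operations are shown, the snake_case names are adaptations, and the cursor conventions (the cursor sits before the first node after a reset and steps back after a removal) are assumptions, not documented behaviour.

```python
class Node:
    def __init__(self, prev, data, next_):
        self.prev, self.data, self.next = prev, data, next_


class LinkedListSketch:
    """Hypothetical doubly linked list with a movable cursor."""

    def __init__(self):
        self.first = self.last = self.curr = None
        self.size = 0

    def add_last(self, data):
        node = Node(self.last, data, None)
        if self.last is None:
            self.first = node
        else:
            self.last.next = node
        self.last = node
        self.size += 1

    def reset_to_front(self):
        self.curr = None  # assumed: cursor sits just before the first node

    def has_next(self):
        nxt = self.first if self.curr is None else self.curr.next
        return nxt is not None

    def next(self):
        # Call only after has_next() has returned True.
        self.curr = self.first if self.curr is None else self.curr.next
        return self.curr.data

    def remove_current(self):
        node = self.curr
        if node is None:
            return
        if node.prev is None:
            self.first = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.last = node.prev
        else:
            node.next.prev = node.prev
        self.curr = node.prev  # step back so next() yields the successor
        self.size -= 1

    def as_array(self):
        out, node = [], self.first
        while node is not None:
            out.append(node.data)
            node = node.next
        return out
```

A typical use is to call `reset_to_front()`, loop while `has_next()`, and call `remove_current()` on the elements that should be dropped, which is the pattern a window needs when expiring events.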
public type Max object
Aggregator to find the maximum value in a stream.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| iMaxQueue | streams:LinkedList | BLangTypeInit: new null ([]) | |
| fMaxQueue | streams:LinkedList | BLangTypeInit: new null ([]) | |
| iMax | int? | () | |
| fMax | float? | () |
-
<Max> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current maximum value and returns the updated maximum value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value to be checked against the current maximum value. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: `streams:CURRENT`, `streams:EXPIRED`, or `streams:RESET`. |

| Return Type | Description |
|---|---|
| anydata | The updated maximum value. |
-
<Max> copy() returns (Aggregator)
Returns a copy of the `Max` aggregator without its current state.
| Return Type | Description |
|---|---|
| Aggregator | A `Max` aggregator. |
-
<Max> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<Max> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
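One common way to maintain a maximum while values both arrive and expire is a monotonic queue, sketched below in Python. This is a generic technique shown for illustration and is not claimed to be how `Max` works internally. The class name is hypothetical, and the sketch assumes values expire in the same order in which they arrived.

```python
from collections import deque


class MaxSketch:
    """Hypothetical sliding maximum using a monotonic (non-increasing) queue."""

    def __init__(self):
        self.queue = deque()  # candidate maxima, largest at the front

    def process(self, value, event_type):
        if event_type == "CURRENT":
            # Smaller values at the back can never be the maximum again.
            while self.queue and self.queue[-1] < value:
                self.queue.pop()
            self.queue.append(value)
        elif event_type == "EXPIRED":
            # Assumes first-in, first-out expiry of values.
            if self.queue and self.queue[0] == value:
                self.queue.popleft()
        elif event_type == "RESET":
            self.queue.clear()
        return self.queue[0] if self.queue else None
```

After the values 3, 1, and 2 arrive, expiring 3 leaves 2 as the maximum, because 1 was already discarded when 2 arrived.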
public type MaxForever object
The aggregator that keeps the maximum value received so far. It is similar to the Max aggregator, but it retains the
maximum value forever.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| iMax | int? | () | |
| fMax | float? | () |
-
<MaxForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current maximum value and returns the updated maximum value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value to be checked against the current maximum value. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: `streams:CURRENT`, `streams:EXPIRED`, or `streams:RESET`. |

| Return Type | Description |
|---|---|
| anydata | The updated maximum value. |
-
<MaxForever> copy() returns (Aggregator)
Returns a copy of the `MaxForever` aggregator.
| Return Type | Description |
|---|---|
| Aggregator | An `Aggregator` object that represents the `MaxForever` aggregator. |
-
<MaxForever> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<MaxForever> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
public type MergeSort object
This object implements the merge sort algorithm to sort the provided value arrays. `fieldFuncs` is an array of function
pointers that return the field values from the data map of each stream event. `sortTypes` is an array of sort orders
(`streams:ASCENDING` or `streams:DESCENDING`).
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| fieldFuncs | function (map<anydata>) returns (anydata)?[] | | |
| sortTypes | string[] | | |
-
<MergeSort> __init(function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| fieldFuncs | function (map<anydata>) returns (anydata)?[] | | |
| sortTypes | string[] | | |
-
<MergeSort> topDownMergeSort(streams:StreamEvent?[] events)
Sorts the given stream events using the merge sort algorithm.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| events | streams:StreamEvent?[] | | The array of stream events to be sorted. |
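The pairing of field functions with sort orders can be illustrated by a multi-key merge sort in Python. Everything here is a sketch under assumptions: events are plain dictionaries, the `ASCENDING` and `DESCENDING` string values are placeholders, and the function returns a new list rather than sorting in place.

```python
ASCENDING, DESCENDING = "ascending", "descending"  # placeholder constants


def compare(a, b, field_funcs, sort_types):
    """Compare two events field by field; the first differing field decides."""
    for func, order in zip(field_funcs, sort_types):
        x, y = func(a), func(b)
        if x != y:
            result = -1 if x < y else 1
            return result if order == ASCENDING else -result
    return 0


def top_down_merge_sort(events, field_funcs, sort_types):
    """Return a new list of events sorted by the given fields and orders."""
    if len(events) < 2:
        return list(events)
    mid = len(events) // 2
    left = top_down_merge_sort(events[:mid], field_funcs, sort_types)
    right = top_down_merge_sort(events[mid:], field_funcs, sort_types)

    merged, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        # Taking from the left on ties keeps the sort stable.
        if compare(left[i], right[j], field_funcs, sort_types) <= 0:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged
```

Sorting by name ascending and then age descending, for instance, groups equal names together and lists the oldest entry of each name first.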
public type Min object
Aggregator to find the minimum value in a stream.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| iMinQueue | streams:LinkedList | BLangTypeInit: new null ([]) | |
| fMinQueue | streams:LinkedList | BLangTypeInit: new null ([]) | |
| iMin | int? | () | |
| fMin | float? | () |
-
<Min> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current minimum value and returns the updated minimum value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value to be checked against the current minimum value. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: `streams:CURRENT`, `streams:EXPIRED`, or `streams:RESET`. |

| Return Type | Description |
|---|---|
| anydata | The updated minimum value. |
-
<Min> copy() returns (Aggregator)
Returns a copy of the `Min` aggregator.
| Return Type | Description |
|---|---|
| Aggregator | An `Aggregator` object that represents the `Min` aggregator. |
-
<Min> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<Min> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
public type MinForever object
The aggregator that keeps the minimum value received so far. It is similar to the Min aggregator, but it retains the
minimum value forever.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| iMin | int? | () | |
| fMin | float? | () |
-
<MinForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current minimum value and returns the updated minimum value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value to be checked against the current minimum value. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: `streams:CURRENT`, `streams:EXPIRED`, or `streams:RESET`. |

| Return Type | Description |
|---|---|
| anydata | The updated minimum value. |
-
<MinForever> copy() returns (Aggregator)
Returns a copy of the `MinForever` aggregator.
| Return Type | Description |
|---|---|
| Aggregator | An `Aggregator` object that represents the `MinForever` aggregator. |
-
<MinForever> saveState() returns (map<any>)
Returns the current state to be saved as a map of `any` typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of `any` typed values. |
-
<MinForever> restoreState(map state)
Restores the saved state, which is passed as a map of `any` typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map | | A map of `any` typed values. This map contains the values to be restored from the persisted data. |
public type Node object
The Node object represents a node in the linked list data structure.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| data | any | ||
| next | streams:Node? | ||
| prev | streams:Node? |
-
<Node> __init(streams:Node? prev, any data, streams:Node? next)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| prev | streams:Node? | | |
| data | any | | |
| next | streams:Node? | | |
public type OrderBy object
The OrderBy object represents the desugared code of the order by clause of a streaming query. This object takes three
parameters to initialize itself. `nextProcessorPointer` is the `process` method of the next processor. `fieldFuncs`
is an array of function pointers that return the field values to be sorted. `sortTypes` is an array of strings
specifying the sort order (ascending or descending). Internally, this processor uses a `MergeSort` object
to sort.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessorPointer | function (streams:StreamEvent?[]) returns () | ||
| fieldFuncs | function (map<anydata>) returns (anydata)?[] | | |
| sortTypes | string[] | ||
| mergeSort | streams:MergeSort |
-
<OrderBy> __init(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessorPointer | function (streams:StreamEvent?[]) returns () | | |
| fieldFuncs | function (map<anydata>) returns (anydata)?[] | | |
| sortTypes | string[] | | |
-
<OrderBy> process(streams:StreamEvent?[] streamEvents)
Sorts the given array of stream events according to the given parameters (fieldFuncs and sortTypes).
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be sorted. |
public type OutputProcess object
The OutputProcess object is responsible for sending the output (only the events of type `streams:CURRENT`) to the
destination stream. It takes a function pointer, `outputFunc`, which holds the logic to process the output.
-
<OutputProcess> __init(function (map<anydata>[]) returns () outputFunc)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| outputFunc | function (map<anydata>[]) returns () | | |
-
<OutputProcess> process(streams:StreamEvent?[] streamEvents)
Sends the output to the streaming action. Most of the time, the output is published to a destination stream at the streaming action. Only the events of type `streams:CURRENT` are passed to `outputFunc`.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be filtered for CURRENT events. |
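The selection of CURRENT events before output can be sketched in Python as follows. The class name and the dictionary event shape with `eventType` and `data` keys are assumptions made for illustration. Passing only the data maps to the output function follows the `map<anydata>[]` parameter type of `outputFunc`.

```python
class OutputProcessSketch:
    """Hypothetical final stage that forwards only CURRENT events."""

    def __init__(self, output_func):
        self.output_func = output_func

    def process(self, stream_events):
        # Expired, reset, and timer events are dropped at this stage.
        current = [e["data"] for e in stream_events if e["eventType"] == "CURRENT"]
        # Assumption: the output function is skipped when nothing qualifies.
        if current:
            self.output_func(current)
```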
public type Scheduler object
The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is
generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the
process function of the target processor, to which the timer event should be sent.
-
<Scheduler> __init(function (streams:StreamEvent?[]) returns () processFunc)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| processFunc | function (streams:StreamEvent?[]) returns () | | |
-
<Scheduler> notifyAt(int timestamp)
Schedules a timer event to be sent at the given timestamp.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| timestamp | int | | The timestamp at which the timer event will be generated and passed to the provided `processFunc`. |
-
<Scheduler> sendTimerEvents() returns (error?<>)
Creates the timer events.
Return Type Description error?<> Returns error if sending timer events failed.
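The interplay of notifyAt and sendTimerEvents can be sketched in Python as a queue of pending timestamps. This is a simplified model under stated assumptions: the class name `SchedulerSketch` is hypothetical, the current time is passed in explicitly as `now` instead of being driven by a runtime timer, and a timer event is represented as a `("TIMER", timestamp)` tuple.

```python
import heapq

class SchedulerSketch:
    """Keeps requested timestamps and emits TIMER events once they are due."""

    def __init__(self, process_func):
        self.process_func = process_func  # process function of the target processor
        self._pending = []                # min-heap of requested timestamps

    def notify_at(self, timestamp):
        heapq.heappush(self._pending, timestamp)

    def send_timer_events(self, now):
        # Emit one TIMER event per due timestamp, earliest first.
        while self._pending and self._pending[0] <= now:
            due = heapq.heappop(self._pending)
            self.process_func([("TIMER", due)])

fired = []
scheduler = SchedulerSketch(fired.extend)
scheduler.notify_at(2000)
scheduler.notify_at(1000)
scheduler.notify_at(5000)
scheduler.send_timer_events(now=2500)  # only the first two timestamps are due
```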
public type Select object
The Select object represents the select clause. Anything that comes under the select clause, such as aggregator function
invocations, is also handled in this processor. Grouping of the events (provided by the groupby clause) is
also performed in this processor. aggregatorArr is an array of the aggregators that are used in the select clause.
groupbyFuncArray is an array of function pointers that return the values being grouped. selectFunc is a
function pointer to a lambda function that creates the data field of the output stream event. scopeName is
used as a breadcrumb to identify the select clause if there are multiple forever blocks.
-
<Select> process(streams:StreamEvent?[] streamEvents)
Selects only the fields specified in the select clause.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events passed to the select clause. |
-
<Select> getGroupByKey(function (streams:StreamEvent) returns (anydata)?[]? groupbyFunctionArray, streams:StreamEvent e) returns (string)
Creates a unique key for each group from the values given in the group by clause.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| groupbyFunctionArray | function (streams:StreamEvent) returns (anydata)?[]? | | The array of function pointers to the lambda functions that return each group by field. |
| e | streams:StreamEvent | | The stream event object being grouped. |

| Return Type | Description |
|---|---|
| string | A unique groupby key by which the event e is grouped. |
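One way to picture the group key is as the string values of all group by fields joined together. The Python sketch below assumes a comma as the separator and an empty key when no group by functions are given; neither detail is confirmed by this documentation, and `get_group_by_key` is a hypothetical name.

```python
def get_group_by_key(groupby_funcs, event_data):
    """Build a key from the values returned by each group by function."""
    if not groupby_funcs:
        return ""  # assumption: every event falls into a single default group
    return ",".join(str(func(event_data)) for func in groupby_funcs)

by_name_and_age = [lambda d: d["name"], lambda d: d["age"]]
key = get_group_by_key(by_name_and_age, {"name": "Ann", "age": 30})

# Events with equal keys end up in the same group.
groups = {}
for data in [{"name": "Ann", "age": 30}, {"name": "Bob", "age": 25}, {"name": "Ann", "age": 30}]:
    groups.setdefault(get_group_by_key(by_name_and_age, data), []).append(data)
```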
public type Snapshotable object
Abstract Snapshotable to be referenced by all snapshotable objects.
-
<Snapshotable> saveState() returns (map<any>)
Returns the current state of a snapshotable object.
| Return Type | Description |
|---|---|
| map<any> | A map<any> that represents the current state of the snapshotable instance. |
-
<Snapshotable> restoreState(map<any> state)
Restores a previous state into a snapshotable object.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map<any> state that can be used to restore the snapshotable instance to a previous state. |
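The save and restore contract can be illustrated with a small Python sketch in which a counter exposes its state as a dictionary and later reloads it. The class name `CounterSketch` and the state key `"count"` are hypothetical.

```python
class CounterSketch:
    """A stateful object that can save its state and restore it later."""

    def __init__(self):
        self.count = 0

    def process(self, value):
        self.count += 1

    def save_state(self):
        # Return a plain map that fully describes the current state.
        return {"count": self.count}

    def restore_state(self, state):
        # Reload the state from a previously saved map.
        self.count = state.get("count", 0)

original = CounterSketch()
for value in ["a", "b", "c"]:
    original.process(value)
snapshot = original.save_state()

restored = CounterSketch()
restored.restore_state(snapshot)
```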
public type SortWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| lengthToKeep | int | ||
| windowParameters | any[] | ||
| sortedWindow | streams:LinkedList | ||
| fields | string[] | ||
| sortTypes | string[] | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| fieldFuncs | function (map<anydata>) returns (anydata)?[] | ||
| mergeSort | streams:MergeSort |
-
<SortWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<SortWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<SortWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<SortWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<SortWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<SortWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type StdDev object
The aggregator object to calculate the standard deviation.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| mean | float | 0.0 | |
| stdDeviation | float | 0.0 | |
| sumValue | float | 0.0 | |
| count | int | 0 |
-
<StdDev> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current standard deviation as new values come into the aggregation.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The value being added to or removed from the aggregation in order to calculate the new standard deviation. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET. |

| Return Type | Description |
|---|---|
| anydata | The updated standard deviation. |
-
<StdDev> copy() returns (Aggregator)
Returns a copy of the StdDev aggregator.
| Return Type | Description |
|---|---|
| Aggregator | An Aggregator object which represents the StdDev aggregator. |
-
<StdDev> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<StdDev> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
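A standard deviation that supports both adding and removing values can be maintained incrementally. The Python sketch below uses Welford-style updates, which is one common technique and not necessarily what this module does internally. The class name `StdDevSketch`, the field `m2`, the choice of the population standard deviation, and returning 0.0 for fewer than two values are all assumptions.

```python
import math

class StdDevSketch:
    """Incremental population standard deviation with add, remove, and reset."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def process(self, value, event_type):
        if event_type == "CURRENT":        # a value enters the aggregation
            self.count += 1
            delta = value - self.mean
            self.mean += delta / self.count
            self.m2 += delta * (value - self.mean)
        elif event_type == "EXPIRED":      # a value leaves the aggregation
            if self.count <= 1:
                self.__init__()
            else:
                old_mean = self.mean
                self.count -= 1
                self.mean = (old_mean * (self.count + 1) - value) / self.count
                self.m2 -= (value - old_mean) * (value - self.mean)
        elif event_type == "RESET":        # wipe the state completely
            self.__init__()
        if self.count < 2:
            return 0.0  # assumption: no spread with fewer than two values
        return math.sqrt(max(self.m2, 0.0) / self.count)

aggregator = StdDevSketch()
for v in [2, 4, 4, 4, 5, 5, 7, 9]:
    std_dev = aggregator.process(v, "CURRENT")
```

The removal branch is the exact inverse of the add branch, so a value that enters and later expires leaves the result unchanged up to floating-point error.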
public type StreamEvent object
The StreamEvent object is a wrapper around the actual data received by the input stream. When a record is
received by an input stream, the record is converted to a map of anydata values, and that map is set to a field called
data in a new StreamEvent object. StreamEvent is only used internally to transmit event data from one
processor to another. The timestamp is set at the time the record is converted to a map. When the record
is first received by the input stream, the eventType is set to streams:CURRENT. Other than stream events of type
streams:CURRENT, there are 3 types of StreamEvents: streams:EXPIRED, streams:RESET, and streams:TIMER. An expired
event is used to remove the state of its respective current event. A reset event is used to completely wipe the
state, and a timer event is used to trigger the process method of a particular processor in a timely manner.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | ||
| timestamp | int | ||
| data | map<anydata> | {} | |
-
<StreamEvent> __init((string,map<anydata>)|map<anydata> eventData, CURRENT|EXPIRED|ALL|RESET|TIMER eventType, int timestamp)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| eventData | (string,map<anydata>)\|map<anydata> | | |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | |
| timestamp | int | | |
-
<StreamEvent> copy() returns (StreamEvent)
Returns a copy of the stream event instance.
| Return Type | Description |
|---|---|
| StreamEvent | A copy of the StreamEvent object with its state. |
-
<StreamEvent> addData(map<anydata> eventData)
Adds the key-value pairs in the given map to the field data.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| eventData | map<anydata> | | The map of anydata values to be added to the field data. |
-
<StreamEvent> addAttribute(string key, anydata val)
Adds an attribute of an event to the map with its value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| key | string | | The key of the map entry. |
| val | anydata | | The respective value of the key. |
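The wrapper behaviour described for StreamEvent can be modelled with a short Python sketch. It is illustrative only: the class name `StreamEventSketch` is hypothetical, and prefixing each key with the stream name and a dot when a `(stream name, map)` tuple is given is an assumption about how the tuple form of eventData is handled.

```python
import copy

class StreamEventSketch:
    """Wraps event data together with an event type and a timestamp."""

    def __init__(self, event_data, event_type, timestamp):
        self.event_type = event_type
        self.timestamp = timestamp
        if isinstance(event_data, tuple):
            # Assumption: keys are qualified as "<streamName>.<key>".
            stream_name, values = event_data
            self.data = {f"{stream_name}.{key}": val for key, val in values.items()}
        else:
            self.data = dict(event_data)

    def copy(self):
        # The copy carries its own data map, so later changes do not leak back.
        return StreamEventSketch(copy.deepcopy(self.data), self.event_type, self.timestamp)

    def add_data(self, event_data):
        self.data.update(event_data)

    def add_attribute(self, key, val):
        self.data[key] = val

event = StreamEventSketch(("inputStream", {"name": "Ann"}), "CURRENT", 1000)
event.add_attribute("inputStream.age", 30)
clone = event.copy()
clone.add_data({"extra": True})
```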
public type StreamJoinProcessor object
The StreamJoinProcessor object is responsible for performing SQL-like joins between two or more streams.
The onConditionFunc is the lambda function which represents the where clause in the join clause. The joining
happens only if the condition is satisfied. nextProcessor is the process function of the next processor, which
can be a Select processor, an Aggregator processor, a Having processor, etc. The lhsStream is the left hand side
stream of the join and its attached window is lhsWindow. The rhsStream is the right hand side stream of the join
and its attached window is rhsWindow. The unidirectionalStream defines the stream by which the joining is
triggered when events are received. Usually it is lhsStream; in rare cases it can be rhsStream. The
joinType is the type of the join and it can be any value defined by streams:JoinType.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| lhsWindow | streams:Window? | ||
| rhsWindow | streams:Window? | ||
| lhsStream | string? | ||
| rhsStream | string? | ||
| unidirectionalStream | string? | ||
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | ||
| lockField | int | 0 |
-
<StreamJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? onConditionFunc)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessor | function (streams:StreamEvent?[]) returns () | | |
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | | |
| onConditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | |
-
<StreamJoinProcessor> process(streams:StreamEvent?[] streamEvents)
Processes the events from both the lhsStream and the rhsStream and performs the joining.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The stream events being joined. |
-
<StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)
Sets the left hand side stream name and the respective window instance.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamName | string | | The name of the left hand side stream. |
| windowInstance | streams:Window | | The window attached to the left hand side stream. |
-
<StreamJoinProcessor> setRHS(string streamName, streams:Window windowInstance)
Sets the right hand side stream name and the respective window instance.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamName | string | | The name of the right hand side stream. |
| windowInstance | streams:Window | | The window attached to the right hand side stream. |
-
<StreamJoinProcessor> setUnidirectionalStream(string streamName)
Sets the stream by which the joining is triggered.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamName | string | | The name of the stream. In most cases, the joining is triggered when the events are received by the left hand side stream, even if the right hand side stream receives events before the left hand side stream does. |
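The join behaviour can be sketched in Python as matching a triggering event against the events held by the opposite window. This follows general inner and outer join semantics and is not taken from the module's source. The function name `join_event`, the use of `None` for the missing side of an outer join, and the representation of events as dictionaries are assumptions.

```python
def join_event(event, other_window, on_condition, join_type, is_lhs_trigger=True):
    """Pair a triggering event with the matching events of the opposite window."""
    pairs = []
    for held in other_window:
        # Keep the (lhs, rhs) order regardless of which side triggered the join.
        lhs, rhs = (event, held) if is_lhs_trigger else (held, event)
        if on_condition is None or on_condition(lhs, rhs):
            pairs.append((lhs, rhs))
    if not pairs:
        # Outer joins still emit the triggering event, padded with None.
        if is_lhs_trigger and join_type in ("LEFTOUTERJOIN", "FULLOUTERJOIN"):
            pairs.append((event, None))
        elif not is_lhs_trigger and join_type in ("RIGHTOUTERJOIN", "FULLOUTERJOIN"):
            pairs.append((None, event))
    return pairs

rhs_window = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
same_id = lambda lhs, rhs: lhs["id"] == rhs["id"]
matched = join_event({"id": 1}, rhs_window, same_id, "JOIN")
```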
public type Sum object
Aggregator to perform summation of values in a stream.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| iSum | int | 0 | |
| fSum | float | 0.0 |
-
<Sum> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)
Updates the current sum of numeric values based on the eventType. If the eventType is streams:CURRENT, value is added to the current sum. If the eventType is streams:EXPIRED, value is subtracted from the current sum. If the eventType is streams:RESET, the current sum is reset, regardless of the value.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| value | anydata | | The numeric value being aggregated to the current sum. |
| eventType | CURRENT\|EXPIRED\|ALL\|RESET\|TIMER | | The type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET. |

| Return Type | Description |
|---|---|
| anydata | The aggregated sum after processing the given value. |
-
<Sum> copy() returns (Aggregator)
Returns a copy of the Sum aggregator without its current state.
| Return Type | Description |
|---|---|
| Aggregator | Returns the Sum aggregator. |
-
<Sum> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<Sum> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
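The three event type rules for Sum translate directly into a small Python sketch. The class name `SumSketch` is hypothetical, and a single `total` field is used here instead of the separate iSum and fSum fields listed above.

```python
class SumSketch:
    """Running sum that reacts to CURRENT, EXPIRED, and RESET events."""

    def __init__(self):
        self.total = 0

    def process(self, value, event_type):
        if event_type == "CURRENT":
            self.total += value      # a value enters the window
        elif event_type == "EXPIRED":
            self.total -= value      # a value leaves the window
        elif event_type == "RESET":
            self.total = 0           # the value is ignored on reset
        return self.total

    def copy(self):
        # A copy starts without the current state.
        return SumSketch()

aggregator = SumSketch()
results = [
    aggregator.process(5, "CURRENT"),
    aggregator.process(7, "CURRENT"),
    aggregator.process(5, "EXPIRED"),
    aggregator.process(99, "RESET"),
]
```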
public type TableJoinProcessor object
The TableJoinProcessor object handles joining streams with in-memory tables in Ballerina.
nextProcessor is the process function of the next processor, which can be a Select processor, an Aggregator
processor, a Having processor, etc. The streamName is the name of the stream of the join and its attached
window is windowInstance. The tableName is the name of the table with which the stream is joined. The
joinType is the type of the join and it can be any value defined by streams:JoinType.
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| windowInstance | streams:Window? | ||
| streamName | string | ||
| tableName | string | ||
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | ||
| lockField | int | 0 |
-
<TableJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessor | function (streams:StreamEvent?[]) returns () | | |
| joinType | JOIN\|LEFTOUTERJOIN\|RIGHTOUTERJOIN\|FULLOUTERJOIN | | |
| tableQuery | function (streams:StreamEvent) returns (map<anydata>[]) | | |
-
<TableJoinProcessor> process(streams:StreamEvent?[] streamEvents)
Joins the incoming events to the stream with the given table.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The stream events being joined with the table. |
-
<TableJoinProcessor> setJoinProperties(string tn, string sn, streams:Window wi)
Sets the properties required for joining.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| tn | string | | The name of the table being joined with the stream. |
| sn | string | | The name of the stream being joined with the table. |
| wi | streams:Window | | The window instance which is attached to the stream. |
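Joining a stream with a table can be pictured as running a table query for every incoming event and pairing the event with each returned row. The Python sketch below is a conceptual model: `join_with_table` and `query_teachers` are hypothetical names, the table is a plain list of dictionaries, and emitting the event with `None` for a left outer join without matching rows is an assumption based on general join semantics.

```python
def join_with_table(stream_events, table_query, join_type="JOIN"):
    """Pair each stream event with the table rows returned by table_query."""
    joined = []
    for event in stream_events:
        rows = table_query(event)
        for row in rows:
            joined.append((event, row))
        if not rows and join_type == "LEFTOUTERJOIN":
            joined.append((event, None))  # assumption: keep unmatched stream events
    return joined

teachers_table = [{"name": "Ann", "school": "North"}, {"name": "Bob", "school": "South"}]

def query_teachers(event):
    # Stands in for the tableQuery function pointer: returns the matching rows.
    return [row for row in teachers_table if row["name"] == event["name"]]

inner = join_with_table([{"name": "Ann"}, {"name": "Zed"}], query_teachers)
outer = join_with_table([{"name": "Ann"}, {"name": "Zed"}], query_teachers, "LEFTOUTERJOIN")
```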
public type TimeAccumulatingWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMillis | int | ||
| windowParameters | any[] | ||
| currentEventQueue | streams:LinkedList | ||
| resetEvent | streams:StreamEvent? | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| lastTimestamp | int | -1 | |
| scheduler | streams:Scheduler |
-
<TimeAccumulatingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<TimeAccumulatingWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<TimeAccumulatingWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<TimeAccumulatingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<TimeAccumulatingWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<TimeAccumulatingWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type TimeBatchWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMilliSeconds | int | ||
| windowParameters | any[] | ||
| nextEmitTime | int | -1 | |
| currentEventQueue | streams:LinkedList | ||
| resetEvent | streams:StreamEvent? | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| scheduler | streams:Scheduler |
-
<TimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<TimeBatchWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<TimeBatchWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<TimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<TimeBatchWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<TimeBatchWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type TimeLengthWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMilliSeconds | int | ||
| length | int | ||
| windowParameters | any[] | ||
| count | int | 0 | |
| expiredEventChunk | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| scheduler | streams:Scheduler |
-
<TimeLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<TimeLengthWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<TimeLengthWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<TimeLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<TimeLengthWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<TimeLengthWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type TimeOrderWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMillis | int | ||
| windowParameters | any[] | ||
| expiredEventQueue | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| timestamp | string | ||
| lastTimestamp | int | ||
| dropOlderEvents | boolean | ||
| mergeSort | streams:MergeSort | ||
| scheduler | streams:Scheduler |
-
<TimeOrderWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<TimeOrderWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<TimeOrderWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<TimeOrderWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<TimeOrderWindow> getTimestamp(any val) returns (int)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| val | any | | |

| Return Type | Description |
|---|---|
| int | |
-
<TimeOrderWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<TimeOrderWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type TimeWindow object
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| timeInMillis | int | ||
| windowParameters | any[] | ||
| expiredEventQueue | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | ||
| lastTimestamp | int | -9223372036854775808 | |
| scheduler | streams:Scheduler |
-
<TimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<TimeWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<TimeWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<TimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<TimeWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<TimeWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
public type UniqueLengthWindow object
This is a length window which only keeps the unique events.
E.g.
from inputStream window uniqueLength(inputStream.timestamp, 4000)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}
The uniqueLength window should only have two parameters (stream field,
| Field Name | Data Type | Default Value | Description |
|---|---|---|---|
| uniqueKey | string | ||
| length | int | ||
| windowParameters | any[] | ||
| count | int | 0 | |
| uniqueMap | map | ||
| expiredEventChunk | streams:LinkedList | ||
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? |
-
<UniqueLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| nextProcessPointer | function (streams:StreamEvent?[]) returns ()? | | |
| windowParameters | any[] | | |
-
<UniqueLengthWindow> initParameters(any[] parameters)
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| parameters | any[] | | |
-
<UniqueLengthWindow> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<UniqueLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
-
<UniqueLengthWindow> saveState() returns (map<any>)
Returns the current state to be saved as a map of any-typed values.
| Return Type | Description |
|---|---|
| map<any> | A map of any-typed values. |
-
<UniqueLengthWindow> restoreState(map<any> state)
Restores the saved state, which is passed as a map of any-typed values.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| state | map<any> | | A map of any-typed values. This map contains the values to be restored from the persisted data. |
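A length window that only keeps unique events can be modelled in Python with an insertion-ordered dictionary keyed by the unique field. The exact expiry rules of UniqueLengthWindow are not spelled out in this documentation, so the sketch assumes that a new event replaces an older event with the same key and that the oldest event is expired once the window is full. The class name `UniqueLengthWindowSketch` is hypothetical.

```python
class UniqueLengthWindowSketch:
    """Holds at most `length` events, one per distinct value of `unique_key`."""

    def __init__(self, unique_key, length):
        self.unique_key = unique_key
        self.length = length
        self.held = {}  # insertion-ordered map: key value -> event data

    def process(self, event):
        """Add one event and return the list of events it expires."""
        expired = []
        key = event[self.unique_key]
        if key in self.held:
            # Assumption: the older event with the same key is expired.
            expired.append(self.held.pop(key))
        elif len(self.held) >= self.length:
            # Assumption: the oldest held event makes room for the new one.
            oldest_key = next(iter(self.held))
            expired.append(self.held.pop(oldest_key))
        self.held[key] = event
        return expired

window = UniqueLengthWindowSketch("name", 2)
first = window.process({"name": "Ann", "age": 30})
second = window.process({"name": "Bob", "age": 25})
third = window.process({"name": "Ann", "age": 31})   # replaces the older Ann event
fourth = window.process({"name": "Cid", "age": 40})  # window is full, Bob expires
```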
public type Window object
The Window abstract object is the base object for implementing windows in Ballerina streams. The process
function contains the logic for processing events when they are received. The getCandidateEvents function is used
inside the Select object to return the events in the window to perform joining.
The window names of the window objects cannot be used in queries. A function which returns the specific
window always has to be used in a streaming query.
E.g. If LengthWindow has to be used in a streaming query, the function streams:length has to be used in the
streaming query without the module identifier streams. An example is shown below.
from inputStream window length(5)
select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
group by inputStream.name => (TeacherOutput [] teachers) {
    foreach var t in teachers {
        outputStream.publish(t);
    }
}
-
<Window> process(streams:StreamEvent?[] streamEvents)
The process function processes the incoming events and updates the current state of the window.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| streamEvents | streams:StreamEvent?[] | | The array of stream events to be processed. |
-
<Window> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])
Returns the events (state) that match the where condition of the join clause for a given event.
| Parameter Name | Data Type | Default Value | Description |
|---|---|---|---|
| originEvent | streams:StreamEvent | | The event against which the state (the events held by the window) is matched. |
| conditionFunc | function (map<anydata>,map<anydata>) returns (boolean)? | | The function pointer to the lambda function that contains the condition logic of the where clause. |
| isLHSTrigger | boolean | true | Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. Most of the time it is true. In the rare cases where the join is triggered when the rhs stream receives events, it should be false. |

| Return Type | Description |
|---|---|
| (StreamEvent,StreamEvent)[] | An array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream. |
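The two responsibilities of a window, processing events and returning candidate events for a join, can be sketched in Python with a sliding length window. This is a conceptual model and not the LengthWindow implementation: the class name `LengthWindowSketch`, the `(event_type, data)` tuple representation, and emitting the expired event before the current one are assumptions.

```python
from collections import deque

class LengthWindowSketch:
    """Sliding window over the last `length` events."""

    def __init__(self, length, next_process):
        self.length = length
        self.next_process = next_process  # process function of the next processor
        self.held = deque()

    def process(self, stream_events):
        out = []
        for data in stream_events:
            if len(self.held) == self.length:
                # The oldest event leaves the window before the new one enters.
                out.append(("EXPIRED", self.held.popleft()))
            self.held.append(data)
            out.append(("CURRENT", data))
        self.next_process(out)

    def get_candidate_events(self, origin, condition_func=None, is_lhs_trigger=True):
        pairs = []
        for held in self.held:
            # Tuples are always ordered (lhs event, rhs event).
            lhs, rhs = (origin, held) if is_lhs_trigger else (held, origin)
            if condition_func is None or condition_func(lhs, rhs):
                pairs.append((lhs, rhs))
        return pairs

emitted = []
window = LengthWindowSketch(2, emitted.extend)
window.process([{"id": 1}, {"id": 2}, {"id": 3}])
candidates = window.get_candidate_events({"id": 3}, lambda lhs, rhs: lhs["id"] == rhs["id"])
```

Passing `is_lhs_trigger=False` places the held events on the left side of each tuple, which corresponds to a join triggered by the rhs stream.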