ballerina/streams module

Type Definitions

Type Values Description
EventType TIMER | RESET | EXPIRED | CURRENT | ALL

The type of stream events.

JoinType RIGHTOUTERJOIN | LEFTOUTERJOIN | JOIN | FULLOUTERJOIN

The types of joins between streams and tables.

Records Summary

Record Description
SnapshottableStreamEvent This record represents a stream event which can be persisted.

Objects Summary

Object Description
Aggregator

Abstract object, which should be implemented in order to create a new aggregator.

Average

Aggregator to calculate average in streams.

Count

Aggregator to count events in streams.

DelayWindow
DistinctCount

Aggregator to get the distinct counts of values in streams.

ExternalTimeBatchWindow
ExternalTimeWindow

This is a sliding time window based on external time. It holds the events that arrived during the last window time period, measured from the external timestamp, and it is updated on every monotonically increasing timestamp. E.g.

from inputStream window externalTime(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

The externalTime window takes exactly two parameters: (timestamp field, windowTime).
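As a rough illustration of the sliding behaviour described for ExternalTimeWindow, the following Python sketch models a window driven by the timestamp carried in each event. It is not the Ballerina implementation. The class name, the method names, and the exact expiry boundary (an event expires once its timestamp plus windowTime is less than or equal to the newest timestamp) are assumptions made for this example.

```python
from collections import deque


class ExternalTimeWindowSketch:
    """Hypothetical model of a sliding window keyed on an event-supplied timestamp."""

    def __init__(self, window_time):
        self.window_time = window_time
        self.events = deque()  # (timestamp, payload) pairs, oldest first

    def add(self, timestamp, payload):
        """Add an event and return the events that fell out of the window."""
        expired = []
        # Assumed boundary: an event expires when timestamp + window_time <= newest timestamp.
        while self.events and self.events[0][0] + self.window_time <= timestamp:
            expired.append(self.events.popleft())
        self.events.append((timestamp, payload))
        return expired

    def contents(self):
        """Return the events currently held by the window, oldest first."""
        return list(self.events)
```

Under these assumptions, with a windowTime of 4000, an event stamped 1000 leaves the window when an event stamped 5000 arrives.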

Filter

The Filter object represents the where clause in a streaming query. It takes two parameters for initialization: nextProcessorPointer is the function pointer of the next processor to be invoked once the filtering is complete, and conditionFunc is a function pointer that returns true if the given where clause evaluates to true.
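The chaining of a filter to the next processor can be pictured with a small Python sketch. It is only an analogy for the Filter object: the class and attribute names are hypothetical, events are modelled as plain dictionaries, and skipping the call to the next processor when nothing passes is an assumption.

```python
class FilterSketch:
    """Hypothetical stand-in for a where-clause processor."""

    def __init__(self, next_processor, condition_func):
        self.next_processor = next_processor  # called with the events that pass
        self.condition_func = condition_func  # returns True when the where clause holds

    def process(self, events):
        passed = [event for event in events if self.condition_func(event)]
        # Assumption: the next processor is only invoked when something passed.
        if passed:
            self.next_processor(passed)
```

For example, a FilterSketch built with `collected.extend` as the next processor and `lambda e: e["age"] > 30` as the condition appends only the events whose age exceeds 30 to `collected`.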

HoppingWindow
IntSort

This class implements a merge sort algorithm to sort timestamp values for state persistence.

LengthBatchWindow

This is a batch (tumbling) length window that holds up to the given number of events and is updated each time that number of events has arrived. E.g.

from inputStream window lengthBatch(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

The lengthBatch window takes exactly one parameter: (windowBatchLength).
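The tumbling behaviour can be sketched in a few lines of Python. This is a simplified model rather than the LengthBatchWindow code: the names are hypothetical, and the expired and reset events that a real batch window may emit between batches are left out.

```python
class LengthBatchWindowSketch:
    """Hypothetical tumbling window that releases events in fixed-size batches."""

    def __init__(self, length):
        self.length = length
        self.batch = []

    def add(self, event):
        """Collect an event; return the full batch once `length` events have arrived."""
        self.batch.append(event)
        if len(self.batch) < self.length:
            return None
        full, self.batch = self.batch, []
        return full
```

With a length of 3, the first two calls to `add` return nothing, and the third returns all three events as one batch and starts a new, empty batch.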

LengthWindow

The LengthWindow is a sliding length window that holds the last windowLength events and is updated on every event arrival and expiry. E.g.

from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

The length window takes exactly one parameter: (windowLength).
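To show what "updated on every event arrival and expiry" can mean for an aggregate, the Python sketch below pairs a sliding length window with a running sum. The names are hypothetical, and emitting the expired event before the current one is an assumption, not something this page states.

```python
from collections import deque


class LengthWindowSketch:
    """Hypothetical sliding window that keeps the last `length` events."""

    def __init__(self, length):
        self.length = length
        self.events = deque()

    def add(self, event):
        """Return (event_type, event) pairs produced by this arrival."""
        out = []
        if len(self.events) == self.length:
            # The oldest event leaves the window to make room for the new one.
            out.append(("EXPIRED", self.events.popleft()))
        self.events.append(event)
        out.append(("CURRENT", event))
        return out


def running_sum(window, values):
    """Sum over the window contents after each arrival."""
    total = 0
    sums = []
    for value in values:
        for event_type, held in window.add(value):
            total += held if event_type == "CURRENT" else -held
        sums.append(total)
    return sums
```

Calling `running_sum(LengthWindowSketch(2), [10, 20, 30, 40])` gives `[10, 30, 50, 70]`: once the window is full, each arrival adds the new value and subtracts the expired one.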

LinkedList

The LinkedList object which represents the linked list data structure.

Max

Aggregator to find the maximum value in a stream.

MaxForever

The aggregator that keeps the maximum value received so far. It is similar to the Max aggregator, but it retains the maximum value it has received so far forever.
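The difference between Max and MaxForever can be illustrated with the following Python sketch. It assumes that a windowed maximum drops values when their events expire, while the "forever" variant ignores expiry. Both classes are hypothetical models, not the module's aggregators.

```python
class MaxSketch:
    """Hypothetical maximum over the events currently held by a window."""

    def __init__(self):
        self.values = []

    def process(self, value, event_type):
        if event_type == "CURRENT":
            self.values.append(value)
        elif event_type == "EXPIRED" and value in self.values:
            self.values.remove(value)
        elif event_type == "RESET":
            self.values.clear()
        return max(self.values) if self.values else None


class MaxForeverSketch:
    """Hypothetical maximum over every value seen so far."""

    def __init__(self):
        self.maximum = None

    def process(self, value, event_type):
        # Assumption: expiry never lowers the maximum that was already observed.
        if event_type == "CURRENT" and (self.maximum is None or value > self.maximum):
            self.maximum = value
        return self.maximum
```

If the values 5 and 9 arrive, 9 expires, and then 3 arrives, MaxSketch reports 5, 9, 5, 5, while MaxForeverSketch reports 5, 9, 9, 9.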

MergeSort

This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs is an array of function pointers that return the field values from the data map of each stream event. sortTypes is an array of sort orders (streams:ASCENDING or streams:DESCENDING).
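A merge sort driven by per-field accessor functions and sort orders might look like the Python sketch below. It only mirrors the idea of fieldFuncs and sortTypes. The function names are hypothetical, and the two constants reuse the values that this page lists for ASCENDING and DESCENDING.

```python
ASCENDING = "ascending"
DESCENDING = "descending"


def compare(a, b, field_funcs, sort_types):
    """Compare two events field by field; later fields break earlier ties."""
    for func, sort_type in zip(field_funcs, sort_types):
        x, y = func(a), func(b)
        if x == y:
            continue
        result = -1 if x < y else 1
        return result if sort_type == ASCENDING else -result
    return 0


def merge_sort(events, field_funcs, sort_types):
    """Return a new list sorted with a stable top-down merge sort."""
    if len(events) <= 1:
        return list(events)
    mid = len(events) // 2
    left = merge_sort(events[:mid], field_funcs, sort_types)
    right = merge_sort(events[mid:], field_funcs, sort_types)
    merged = []
    i = j = 0
    while i < len(left) and j < len(right):
        if compare(left[i], right[j], field_funcs, sort_types) <= 0:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged
```

Sorting by age in descending order and then by name in ascending order places two 30-year-olds ahead of a 25-year-old, with the two 30-year-olds ordered alphabetically.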

Min

Aggregator to find the minimum value in a stream.

MinForever

The aggregator that keeps the minimum value received so far. It is similar to the Min aggregator, but it retains the minimum value it has received so far forever.

Node

The Node object represents a node in the linked list data structure.

OrderBy

The OrderBy object represents the desugared code of the order by clause of a streaming query. This object takes three parameters to initialize itself. nextProcessPointer is the process method of the next processor. fieldFuncs is an array of function pointers that return the field values to be sorted. sortTypes is an array of strings specifying the sort order (ascending or descending) of each field. Internally, this processor uses a MergeSort object to sort.

OutputProcess

The OutputProcess object is responsible for sending the output (only the events of type streams:CURRENT) to the destination stream. It takes a function pointer, outputFunc, which contains the logic to process the output.

Scheduler

The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the process function of the target processor, to which the timer event should be sent.

Select

The Select object represents the select clause. Anything that comes under the select clause, such as aggregator function invocations, is also handled in this processor. Grouping of the events (provided by the group by clause) is also performed in this processor. aggregatorArr is an array of the aggregators used in the select clause. groupbyFuncArray is an array of function pointers that return the values being grouped. selectFunc is a function pointer to a lambda function that creates the data field of the output stream event. scopeName is used as a breadcrumb to identify the select clause if there are multiple forever blocks.
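The interplay of group-by functions, aggregators, and the select function can be sketched in Python as follows. This is an illustrative model only. All names are hypothetical, a fresh set of aggregators per group key is an assumption, the "," used to join key parts is arbitrary, and the DEFAULT key reuses the constant that this page describes for queries without a group by clause.

```python
DEFAULT = "DEFAULT"  # group key used when there is no group by clause


class SumSketch:
    """Hypothetical running-sum aggregator."""

    def __init__(self):
        self.total = 0

    def process(self, value):
        self.total += value
        return self.total


class CountSketch:
    """Hypothetical running-count aggregator."""

    def __init__(self):
        self.count = 0

    def process(self):
        self.count += 1
        return self.count


def select(events, groupby_funcs, new_aggregators, select_func):
    """Build one output row per event, aggregating separately per group key."""
    groups = {}
    output = []
    for event in events:
        if groupby_funcs:
            key = ",".join(str(func(event)) for func in groupby_funcs)
        else:
            key = DEFAULT
        if key not in groups:
            groups[key] = new_aggregators()  # assumed: separate state per group
        output.append(select_func(event, groups[key]))
    return output
```

Grouping teachers by name with a sum over age and a count yields, for each incoming event, the running sumAge and count of that teacher's group only.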

Snapshotable

Abstract Snapshotable to be referenced by all snapshotable objects.

SortWindow
StdDev

The aggregator object to calculate standard deviation.

StreamEvent

The StreamEvent object is a wrapper around the actual data received by the input stream. When a record is received by an input stream, that record is converted to a map of anydata values, and the map is set to a field called data in a new StreamEvent object. StreamEvent is only used internally to transmit event data from one processor to another. The timestamp is set at the time the record is converted to a map. When the record is first received by the input stream, the eventType is set to streams:CURRENT. Besides streams:CURRENT, there are three other types of StreamEvents: streams:EXPIRED, streams:RESET, and streams:TIMER. An expired event is used to remove the state of its respective current event. A reset event is used to completely wipe the state, and a timer event is used to trigger the process method of a particular processor in a timely manner.
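The wrapping step can be pictured with the Python sketch below. It is a loose model of StreamEvent, not the module's code. The class and function names are hypothetical, prefixing each key with the stream name and the "." delimiter is an assumption based on the DELIMITER constant listed on this page, and copying the data and timestamp into the reset event is likewise assumed.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StreamEventSketch:
    """Hypothetical wrapper around the data carried through the processors."""

    data: dict
    event_type: str = "CURRENT"
    # Timestamp in milliseconds, set when the wrapper is created.
    timestamp: int = field(default_factory=lambda: int(time.time() * 1000))


def build_stream_event(record, stream_name):
    """Convert a record (a dict here) into a CURRENT event for the given stream."""
    data = {f"{stream_name}.{key}": value for key, value in record.items()}
    return StreamEventSketch(data=data)


def create_reset_event(event):
    """Derive a RESET event from an existing event (assumed to keep data and timestamp)."""
    return StreamEventSketch(data=event.data, event_type="RESET", timestamp=event.timestamp)
```

A record such as `{"name": "ann", "age": 30}` received on `inputStream` becomes a CURRENT event whose data map holds `inputStream.name` and `inputStream.age` under these assumptions.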

StreamJoinProcessor

The StreamJoinProcessor object is responsible for performing SQL-like joins between two or more streams. The onConditionFunc is the lambda function which represents the where clause in the join clause. The joining happens only if the condition is satisfied. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor, etc. The lhsStream is the left-hand side stream of the join and its attached window is lhsWindow. The rhsStream is the right-hand side stream of the join and its attached window is rhsWindow. The unidirectionalStream defines the stream by which the joining is triggered when events are received. Usually it is lhsStream; in rare cases it can be rhsStream. The joinType is the type of the join and it can be any value defined by streams:JoinType.
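The four values of streams:JoinType follow the usual SQL join semantics, which the Python sketch below applies to two in-memory lists. It deliberately ignores windows, event arrival order, and the unidirectional trigger, so it is a simplification of what a stream join does. The function name and the use of None for the missing side are assumptions.

```python
def join(lhs, rhs, join_type, condition_func=None):
    """Join two lists of events; unmatched sides are represented by None."""
    matches = condition_func or (lambda left, right: True)
    rows = []
    matched_rhs = set()
    for left in lhs:
        hit = False
        for index, right in enumerate(rhs):
            if matches(left, right):
                rows.append((left, right))
                matched_rhs.add(index)
                hit = True
        # Keep unmatched left-hand events for left and full outer joins.
        if not hit and join_type in ("LEFTOUTERJOIN", "FULLOUTERJOIN"):
            rows.append((left, None))
    # Keep unmatched right-hand events for right and full outer joins.
    if join_type in ("RIGHTOUTERJOIN", "FULLOUTERJOIN"):
        rows.extend((None, right) for index, right in enumerate(rhs) if index not in matched_rhs)
    return rows
```

With left-hand ids 1 and 2 and right-hand ids 2 and 3 joined on equal ids, JOIN returns only the pair for id 2, while FULLOUTERJOIN additionally returns id 1 and id 3 paired with None.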

Sum

Aggregator to perform summation of values in a stream.

TableJoinProcessor

The TableJoinProcessor object handles joining streams with in-memory tables in Ballerina. nextProcessor is the process function of the next processor, which can be a Select processor, Aggregator processor, Having processor, etc. The streamName is the stream of the join and its attached window is windowInstance. The tableName is the name of the table with which the stream is joined. The joinType is the type of the join and it can be any value defined by streams:JoinType.

TimeAccumulatingWindow
TimeBatchWindow
TimeLengthWindow
TimeOrderWindow
TimeWindow
UniqueLengthWindow

This is a length window that keeps only unique events. E.g.

from inputStream window uniqueLength(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

The uniqueLength window takes exactly two parameters: (stream field, windowLength).
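One possible reading of "keeps only unique events" is sketched below in Python: events are unique per value of the given stream field, a newer event replaces an older one with the same value, and the oldest entry is dropped when the window is full. These rules, like the class and method names, are assumptions made for illustration and are not confirmed by this page.

```python
class UniqueLengthWindowSketch:
    """Hypothetical length window holding at most one event per key."""

    def __init__(self, key_func, length):
        self.key_func = key_func  # extracts the uniqueness key from an event
        self.length = length
        self.events = {}  # key -> event; dicts preserve insertion order

    def add(self, event):
        """Add an event and return the events it pushed out of the window."""
        key = self.key_func(event)
        expired = []
        if key in self.events:
            # Assumed: a newer event with the same key replaces the older one.
            expired.append(self.events.pop(key))
        elif len(self.events) == self.length:
            oldest_key = next(iter(self.events))
            expired.append(self.events.pop(oldest_key))
        self.events[key] = event
        return expired

    def contents(self):
        """Return the held events, oldest first."""
        return list(self.events.values())
```

In this model, a window of length 2 keyed on name that already holds events for "a" and "b" replaces the old "a" event when a new "a" event arrives, instead of evicting "b".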

Window

The Window abstract object is the base object for implementing windows in Ballerina streams. The process function contains the logic for processing events when they are received. The getCandidateEvents function is used inside the Select object to return the events in the window to perform joining. The names of the window objects cannot be used in queries. A function that returns the specific window always has to be used in a streaming query. E.g. if LengthWindow has to be used in a streaming query, the function streams:length has to be used, without the module identifier streams. An example is shown below.

from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

Functions Summary

Return Type Function and Description
Aggregator avg()

Returns an Average aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

StreamEvent[] buildStreamEvent(any t, string streamName)

Creates an array of streams:StreamEvent objects for a record t received by the stream denoted by the name streamName.

Aggregator count()

Returns a Count aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Filter createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc)

Creates a Filter object and returns it.

OrderBy createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata)

Creates an OrderBy object and returns it.

OutputProcess createOutputProcess(function (map<anydata>[]) returns () outputFunc)

Creates and returns an OutputProcess object.

StreamEvent createResetStreamEvent(streams:StreamEvent event)

Creates a RESET event from a given event.

Select createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName)

Creates and returns a select clause.

StreamJoinProcessor createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc)

Creates a StreamJoinProcessor and returns it.

TableJoinProcessor createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)

Creates a TableJoinProcessor and returns it.

Window delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The delay function creates a DelayWindow object and returns it.

Aggregator distinctCount()

Returns a DistinctCount aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Window externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The externalTime function creates an ExternalTimeWindow object and returns it.

Window externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The externalTimeBatch function creates an ExternalTimeBatchWindow object and returns it.

StreamEvent getStreamEvent(any anyEvent)

Gets the stream event from an any? field. This function can be used only if it is certain that anyEvent is a streams:StreamEvent.

Window hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The hopping function creates a HoppingWindow object and returns it.

initPersistence()

Function to initialize and start snapshotting.

Window length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The length function creates a LengthWindow object and returns it.

Window lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The lengthBatch function creates a LengthBatchWindow object and returns it.

Aggregator max()

Returns a Max aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Aggregator maxForever()

Returns a MaxForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Aggregator min()

Returns a Min aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Aggregator minForever()

Returns a MinForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

registerSnapshotable(string key, any reference)

Function to register Snapshotables.

boolean removeState(string key)

Function to clear an existing state.

restoreState(string key, any reference)

Function to restore state of a given object.

Window sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The sort function creates a SortWindow object and returns it.

Aggregator stdDev()

Returns a StdDev aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Aggregator sum()

Returns a Sum aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Window time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The time function creates a TimeWindow object and returns it.

Window timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The timeAccum function creates a TimeAccumulatingWindow object and returns it.

Window timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The timeBatch function creates a TimeBatchWindow object and returns it.

Window timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The timeLength function creates a TimeLengthWindow object and returns it.

Window timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The timeOrder function creates a TimeOrderWindow object and returns it.

SnapshottableStreamEvent toSnapshottableEvent(streams:StreamEvent event)

Converts a single streams:StreamEvent object to a streams:SnapshottableStreamEvent object.

SnapshottableStreamEvent[] toSnapshottableEvents(streams:StreamEvent[]|any[]? events)

Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.

StreamEvent toStreamEvent(streams:SnapshottableStreamEvent event)

Converts a single streams:SnapshottableStreamEvent object to a streams:StreamEvent object.

StreamEvent[] toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events)

Converts a given array of snapshotable events to an array of streams:StreamEvent objects.

Window uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer)

The uniqueLength function creates a UniqueLengthWindow object and returns it.

Constants

Name Data Type Value Description
OUTPUT OUTPUT
RESET RESET

The reset event type

EXPIRED EXPIRED

The expired event type

CURRENT CURRENT

The current event type.

TIMER TIMER

The timer event type.

DEFAULT DEFAULT

The default key to group by if the group by clause is not used in query

DELIMITER .
DELIMITER_REGEX \.
ASCENDING ascending
DESCENDING descending

public type SnapshottableStreamEvent record

This record represents a stream event which can be persisted.

Field Name Data Type Default Value Description
eventType CURRENT|EXPIRED|ALL|RESET|TIMER CURRENT
timestamp int 0
data map {}
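The reason for having a plain record alongside the StreamEvent object is that a record of simple fields is easy to persist. The Python sketch below illustrates the round trip between an event object and a record with the three fields listed above. The class and function names are hypothetical, and JSON is used here only as a stand-in for whatever storage the snapshotting actually uses.

```python
import json
from dataclasses import dataclass


@dataclass
class StreamEventSketch:
    """Hypothetical in-memory event object."""

    data: dict
    event_type: str
    timestamp: int


def to_snapshottable_event(event):
    """Flatten an event object into a plain record that can be serialized."""
    return {"eventType": event.event_type, "timestamp": event.timestamp, "data": dict(event.data)}


def to_stream_event(record):
    """Rebuild an event object from a previously stored record."""
    return StreamEventSketch(
        data=dict(record["data"]),
        event_type=record["eventType"],
        timestamp=record["timestamp"],
    )


def round_trip(event):
    """Serialize an event to JSON text and restore it again."""
    stored = json.dumps(to_snapshottable_event(event))
    return to_stream_event(json.loads(stored))
```

In this sketch an event that goes through `round_trip` compares equal to the original, which is the property a snapshot and restore cycle needs.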

public function avg() returns (Aggregator)

Returns an Average aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator object which performs averaging.

public function buildStreamEvent(any t, string streamName) returns (StreamEvent[])

Creates an array of streams:StreamEvent objects for a record t received by the stream denoted by the name streamName.

Parameter Name Data Type Default Value Description
t any

A record received by the stream streamName.

streamName string

Name of the stream to which the record t is received.

Return Type Description
StreamEvent[]

Returns an array of streams:StreamEvents|()

public function count() returns (Aggregator)

Returns a Count aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator object which performs counting.

public function createFilter(function (streams:StreamEvent?[]) returns () nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) returns (Filter)

Creates a Filter object and returns it.

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent?[]) returns ()

The function pointer to the process function of the next processor.

conditionFunc function (map<anydata>) returns (boolean)

The function pointer to the condition evaluator. This is a function which returns true or false based on the boolean expression given in the where clause.

Return Type Description
Filter

Returns a Filter object.

public function createOrderBy(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fields, string[] sortFieldMetadata) returns (OrderBy)

Creates an OrderBy object and returns it.

Parameter Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent?[]) returns ()
fields function (map<anydata>) returns (anydata)?[]

An array of function pointers, each of which returns a field by which the events are sorted. Events are sorted by the first field; if elements have the same value, the second field is used, and so on.

sortFieldMetadata string[]

The sort types of the fields (streams:ASCENDING or streams:DESCENDING). The first element is the sort type of the first element of fields, and so on.

Return Type Description
OrderBy

Returns an OrderBy object.

public function createOutputProcess(function (map<anydata>[]) returns () outputFunc) returns (OutputProcess)

Creates and returns an OutputProcess object.

Parameter Name Data Type Default Value Description
outputFunc function (map<anydata>[]) returns ()

The function pointer to a lambda function created out of the statements in the streaming action.

Return Type Description
OutputProcess

Returns an OutputProcess object.

public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)

Creates a RESET event from a given event.

Parameter Name Data Type Default Value Description
event streams:StreamEvent

The event from which the reset event is created.

Return Type Description
StreamEvent

A stream event of type streams:RESET.

public function createSelect(function (streams:StreamEvent?[]) returns () nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)?[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc, string scopeName) returns (Select)

Creates and returns a select clause.

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent?[]) returns ()

The function pointer to the process function of the next processor.

aggregatorArr streams:Aggregator[]

The array of aggregators used in the select clause. If the same aggregator is used twice, aggregatorArr will contain that aggregator twice, in the order in which the aggregators appear in the select clause.

groupbyFuncArray function (streams:StreamEvent) returns (anydata)?[]?

The array of function pointers to the lambda functions that return the expressions in the group by clause.

selectFunc function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>)

The function pointer to a lambda function that creates the data field of the output stream event.

scopeName string $scope$name

A unique id to identify the forever block if there are multiple forever blocks.

Return Type Description
Select

Returns a Select object.

public function createStreamJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)

Creates a StreamJoinProcessor and returns it.

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent?[]) returns ()

The process function of the next processor, which can be a Select processor, Aggregator processor, Having processor, etc.

joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN

Type of the join being performed ("JOIN"|"LEFTOUTERJOIN"|"RIGHTOUTERJOIN"|"FULLOUTERJOIN")

conditionFunc function (map<anydata>,map<anydata>) returns (boolean)? ()
Return Type Description
StreamJoinProcessor

Returns a StreamJoinProcessor object.

public function createTableJoinProcessor(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) returns (TableJoinProcessor)

Creates a TableJoinProcessor and returns it.

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent?[]) returns ()
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN

The type of the join. It can be any value defined by streams:JoinType.

tableQuery function (streams:StreamEvent) returns (map<anydata>[])

The function pointer to a function which retrieves the records from the table and joins them with each stream event.

Return Type Description
TableJoinProcessor

Returns a TableJoinProcessor object.

public function delay(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The delay function creates a DelayWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function distinctCount() returns (Aggregator)

Returns a DistinctCount aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator object which represents DistinctCount.

public function externalTime(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The externalTime function creates an ExternalTimeWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function externalTimeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The externalTimeBatch function creates an ExternalTimeBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function getStreamEvent(any anyEvent) returns (StreamEvent)

Gets the stream event from an any? field. This function can be used only if it is certain that anyEvent is a streams:StreamEvent.

Parameter Name Data Type Default Value Description
anyEvent any

The object from which the stream event is extracted.

Return Type Description
StreamEvent

Returns the extracted streams:StreamEvent object.

public function hopping(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The hopping function creates a HoppingWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function initPersistence()

Function to initialize and start snapshotting.

public function length(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The length function creates a LengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function lengthBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The lengthBatch function creates a LengthBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function max() returns (Aggregator)

Returns a Max aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which represents Max.

public function maxForever() returns (Aggregator)

Returns a MaxForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which represents MaxForever.

public function min() returns (Aggregator)

Returns a Min aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which represents Min.

public function minForever() returns (Aggregator)

Returns a MinForever aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which represents MinForever.

public function registerSnapshotable(string key, any reference)

Function to register Snapshotables.

Parameter Name Data Type Default Value Description
key string

A unique string identifier for the snapshotable reference.

reference any

The snapshotable reference to be registered.

public function removeState(string key) returns (boolean)

Function to clear an existing state.

Parameter Name Data Type Default Value Description
key string

A unique string identifier for the snapshotable reference.

Return Type Description
boolean

A boolean indicating whether the state for the given key was removed successfully.

public function restoreState(string key, any reference)

Function to restore state of a given object.

Parameter Name Data Type Default Value Description
key string

A unique string identifier for the snapshotable reference.

reference any

The snapshotable reference to be restored.

public function sort(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The sort function creates a SortWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function stdDev() returns (Aggregator)

Returns a StdDev aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which represents StdDev.

public function sum() returns (Aggregator)

Returns a Sum aggregator object. The aggregator function name which is used in a streaming query should have the same name as this function's name.

Return Type Description
Aggregator

An Aggregator which performs addition/summation.

public function time(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The time function creates a TimeWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeAccum(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeAccum function creates a TimeAccumulatingWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeBatch(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeBatch function creates a TimeBatchWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeLength function creates a TimeLengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function timeOrder(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The timeOrder function creates a TimeOrderWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public function toSnapshottableEvent(streams:StreamEvent event) returns (SnapshottableStreamEvent)

Converts a single streams:StreamEvent object to a streams:SnapshottableStreamEvent object.

Parameter Name Data Type Default Value Description
event streams:StreamEvent

The streams:StreamEvent object to be converted to a snapshottable event.

Return Type Description
SnapshottableStreamEvent

The converted streams:SnapshottableStreamEvent object.

public function toSnapshottableEvents(streams:StreamEvent[]|any[]? events) returns (SnapshottableStreamEvent[])

Converts a given array of streams:StreamEvent objects to an array of streams:SnapshottableStreamEvent.

Parameter Name Data Type Default Value Description
events streams:StreamEvent[]|any[]?

The events to be converted to snapshottable events.

Return Type Description
SnapshottableStreamEvent[]

Returns the converted snapshottable events.

public function toStreamEvent(streams:SnapshottableStreamEvent event) returns (StreamEvent)

Converts a single streams:SnapshottableStreamEvent object to a streams:StreamEvent object.

Parameter Name Data Type Default Value Description
event streams:SnapshottableStreamEvent

The streams:SnapshottableStreamEvent object to be converted to a stream event.

Return Type Description
StreamEvent

The converted streams:StreamEvent object.

public function toStreamEvents(streams:SnapshottableStreamEvent[]|any[]? events) returns (StreamEvent[])

Converts a given array of snapshottable events to an array of streams:StreamEvent objects.

Parameter Name Data Type Default Value Description
events streams:SnapshottableStreamEvent[]|any[]?

The snapshottable events to be converted to streams:StreamEvent objects.

Return Type Description
StreamEvent[]

Returns the array of converted streams:StreamEvent objects.

public function uniqueLength(any[] windowParameters, function (streams:StreamEvent?[]) returns ()? nextProcessPointer) returns (Window)

The uniqueLength function creates a UniqueLengthWindow object and returns it.

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()? ()

The function pointer to the process function of the next processor.

Return Type Description
Window

Returns the created window.

public type Aggregator object

Abstract object, which should be implemented in order to create a new aggregator.

  • <Aggregator> copy() returns (Aggregator)

    Returns a copy of self, but it does not contain the current state.

    Return Type Description
    Aggregator

    An Aggregator object.

  • <Aggregator> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the aggregated value and returns the final aggregated value.

    Parameter Name Data Type Default Value Description
    value anydata

    The value being aggregated.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET. Depending on the event type, the value is either added to or removed from the aggregation.

    Return Type Description
    anydata

    The updated aggregated value after the value has been aggregated.

public type Average object

Aggregator to calculate average in streams.

Field Name Data Type Default Value Description
count int 0
sum float 0.0
  • <Average> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Returns the average calculated after the value has been aggregated into the current average. If the eventType is streams:CURRENT, the value is added to the current sum and the count is increased by 1. If the eventType is streams:EXPIRED, the value is subtracted from the current sum and the count is decreased by 1. If the eventType is streams:RESET, the current sum and count are reset, regardless of the value. The average is then calculated by dividing the sum by the count.

    Parameter Name Data Type Default Value Description
    value anydata

    The numeric value being aggregated in order to calculate the new average.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    The updated average after the value has been aggregated.

  • <Average> copy() returns (Aggregator)

    Returns a copy of the Average aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Average aggregator.

  • <Average> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Average> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.
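
The process semantics described for Average can be modelled outside Ballerina. The following is a minimal Python sketch, not the module's implementation: the class name `AverageModel` and the string event types are assumptions made for illustration, and returning 0.0 for an empty aggregation is also an assumption, since the text above does not say what happens when the count is zero.

```python
class AverageModel:
    """Illustrative model of the Average aggregator (not the Ballerina source)."""

    def __init__(self):
        self.count = 0    # mirrors the documented default of 0
        self.sum = 0.0    # mirrors the documented default of 0.0

    def process(self, value, event_type):
        if event_type == "CURRENT":
            self.sum += value
            self.count += 1
        elif event_type == "EXPIRED":
            self.sum -= value
            self.count -= 1
        elif event_type == "RESET":
            # The value is ignored on RESET.
            self.sum = 0.0
            self.count = 0
        # Assumption: an empty aggregation yields 0.0 instead of dividing by zero.
        return self.sum / self.count if self.count else 0.0

    def copy(self):
        # A copy carries no state, as the copy() description states.
        return AverageModel()


avg = AverageModel()
avg.process(10, "CURRENT")             # one value held
result = avg.process(20, "CURRENT")    # two values held
expired = avg.process(10, "EXPIRED")   # the first value leaves the window
```

Feeding an EXPIRED event with the same value that was earlier added as CURRENT is what lets a window slide without recomputing the sum from scratch.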

public type Count object

Aggregator to count events in streams.

Field Name Data Type Default Value Description
count int 0
  • <Count> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current count when a new event arrives and returns the updated count. If the eventType is streams:CURRENT, the count is increased by 1. If the eventType is streams:EXPIRED, the count is decreased by 1. If the eventType is streams:RESET, the count is reset, regardless of the value.

    Parameter Name Data Type Default Value Description
    value anydata

    The Count aggregator does not use this value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated count.

  • <Count> copy() returns (Aggregator)

    Returns a copy of the Count aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Count aggregator.

  • <Count> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Count> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.

public type DelayWindow object

Field Name Data Type Default Value Description
delayInMilliSeconds int
windowParameters any[]
delayedEventQueue streams:LinkedList
lastTimestamp int 0
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
scheduler streams:Scheduler
  • <DelayWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <DelayWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <DelayWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <DelayWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <DelayWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.
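
The join matching performed by getCandidateEvents can be pictured with a small Python model. This is a sketch under stated assumptions, not the window's actual code: events are plain dictionaries, the function name `get_candidate_events` is hypothetical, the condition is assumed to receive the LHS event first, and a missing condition is assumed to match everything.

```python
def get_candidate_events(held_events, origin_event, condition_func=None, is_lhs_trigger=True):
    """Pair origin_event with every held event that satisfies condition_func."""
    pairs = []
    for held in held_events:
        # The triggering event sits on the LHS when is_lhs_trigger is true, else on the RHS.
        lhs, rhs = (origin_event, held) if is_lhs_trigger else (held, origin_event)
        # Assumption: a missing (nil) condition matches every held event.
        if condition_func is None or condition_func(lhs, rhs):
            pairs.append((lhs, rhs))
    return pairs


held = [{"id": 1, "item": "pen"}, {"id": 2, "item": "ink"}]
order = {"orderId": 2}
matches = get_candidate_events(held, order, lambda lhs, rhs: lhs["orderId"] == rhs["id"])
```

Each returned tuple keeps the LHS event in the first position regardless of which side triggered the join, which matches the description of the return value above.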

public type DistinctCount object

Aggregator to get the distinct counts of values in streams.

Field Name Data Type Default Value Description
distinctValues map {}
  • <DistinctCount> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current distinct count when a new event arrives and returns the updated count. If the eventType is streams:CURRENT, the count is increased by 1. If the eventType is streams:EXPIRED, the count is decreased by 1. If the eventType is streams:RESET, the count is reset, regardless of the value.

    Parameter Name Data Type Default Value Description
    value anydata

    The value being counted uniquely.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated distinct count.

  • <DistinctCount> copy() returns (Aggregator)

    Returns a copy of the DistinctCount aggregator without its current state.

    Return Type Description
    Aggregator

    Returns DistinctCount aggregator.

  • <DistinctCount> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <DistinctCount> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.
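
One plausible reading of the DistinctCount behaviour, given its distinctValues map field, is a map from each value to the number of live occurrences, with the distinct count being the number of keys. The Python sketch below models that reading only. The class name `DistinctCountModel`, the string event types, and keying by the value's string form are assumptions, not details taken from the module.

```python
class DistinctCountModel:
    """Illustrative model of a distinct-count aggregator (assumed design)."""

    def __init__(self):
        # value key -> number of occurrences currently held (assumed layout)
        self.distinct_values = {}

    def process(self, value, event_type):
        key = str(value)
        if event_type == "CURRENT":
            self.distinct_values[key] = self.distinct_values.get(key, 0) + 1
        elif event_type == "EXPIRED":
            remaining = self.distinct_values.get(key, 0) - 1
            if remaining > 0:
                self.distinct_values[key] = remaining
            else:
                # Last occurrence expired, so the value no longer counts.
                self.distinct_values.pop(key, None)
        elif event_type == "RESET":
            self.distinct_values.clear()
        return len(self.distinct_values)


dc = DistinctCountModel()
counts = [
    dc.process("a", "CURRENT"),
    dc.process("b", "CURRENT"),
    dc.process("a", "CURRENT"),   # repeated value, distinct count unchanged
    dc.process("a", "EXPIRED"),   # one "a" still held
    dc.process("a", "EXPIRED"),   # last "a" gone
]
```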

public type ExternalTimeBatchWindow object

Field Name Data Type Default Value Description
timeToKeep int
currentEventChunk streams:LinkedList
expiredEventChunk streams:LinkedList
resetEvent streams:StreamEvent? ()
startTime int -1
isStartTimeEnabled boolean false
replaceTimestampWithBatchEndTime boolean false
flushed boolean false
endTime int -1
schedulerTimeout int 0
lastScheduledTime int
lastCurrentEventTime int 0
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
timeStamp string
storeExpiredEvents boolean false
outputExpectsExpiredEvents boolean false
windowParameters any[]
scheduler streams:Scheduler
  • <ExternalTimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <ExternalTimeBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeBatchWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <ExternalTimeBatchWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <ExternalTimeBatchWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.

  • <ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)

    Parameter Name Data Type Default Value Description
    currStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)

    Parameter Name Data Type Default Value Description
    currentTime int
    startTime_ int
    timeToKeep_ int
    Return Type Description
    int
  • <ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)

    Parameter Name Data Type Default Value Description
    firstStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type ExternalTimeWindow object

This is a sliding time window based on external time. It holds the events that arrived during the last window time period, measured from the external timestamp, and is updated on every monotonically increasing timestamp. E.g.

    from inputStream window externalTime(inputStream.timestamp, 4000)
    select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
    group by inputStream.name => (TeacherOutput [] teachers) {
        foreach var t in teachers {
            outputStream.publish(t);
        }
    }

The externalTime window should only have two parameters (timestamp field, windowTime).

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
timeStamp string
  • <ExternalTimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <ExternalTimeWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <ExternalTimeWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <ExternalTimeWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.

  • <ExternalTimeWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int
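
The sliding behaviour of an external-time window can be illustrated with a short Python model. It is a sketch, not the ExternalTimeWindow implementation: events are plain dictionaries, the class name `ExternalTimeWindowModel` is hypothetical, and the expiry boundary (an event expires once it is at least the window time older than the newest timestamp) is an assumption, because the description above does not state whether the boundary is inclusive.

```python
from collections import deque


class ExternalTimeWindowModel:
    """Illustrative sliding window driven by a timestamp carried in each event."""

    def __init__(self, window_time_ms, timestamp_field="timestamp"):
        self.window_time_ms = window_time_ms
        self.timestamp_field = timestamp_field
        self.held = deque()   # events still inside the window, oldest first

    def process(self, event):
        now = event[self.timestamp_field]
        emitted = []
        # Assumed boundary: expire events that are window_time_ms or more behind `now`.
        while self.held and self.held[0][self.timestamp_field] + self.window_time_ms <= now:
            emitted.append(("EXPIRED", self.held.popleft()))
        self.held.append(event)
        emitted.append(("CURRENT", event))
        return emitted


window = ExternalTimeWindowModel(4000)
first = window.process({"name": "a", "timestamp": 1000})
second = window.process({"name": "b", "timestamp": 3000})
third = window.process({"name": "c", "timestamp": 5000})
```

Because time advances only when an event arrives, nothing expires between events. This is why the window relies on monotonically increasing timestamps.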

public type Filter object

The Filter object represents the where clause in a streaming query. This object takes two parameters for initialization. nextProcessorPointer is the function pointer of the next processor to be invoked once the filtering is complete. conditionFunc is a function pointer that returns true if the given where clause evaluates to true.

  • <Filter> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events. This function takes an array of stream events, iterates over the events in the array, and calls conditionFunc on each one. The events for which conditionFunc evaluates to true are passed to nextProcessorPointer, which can be the process function of the next processor object (e.g. Select, Window, Aggregator, etc.).

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The events being filtered.
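
The filter-then-forward flow described for Filter can be sketched in a few lines of Python. This is an illustrative model rather than the Ballerina object: the class name `FilterModel` is hypothetical, events are plain dictionaries, and both skipping nil entries and not calling the next processor when nothing passes are assumptions.

```python
class FilterModel:
    """Illustrative model of a where-clause processor in a processing chain."""

    def __init__(self, next_processor_pointer, condition_func):
        self.next_processor_pointer = next_processor_pointer
        self.condition_func = condition_func

    def process(self, stream_events):
        # Assumption: nil (None) entries in the array are skipped.
        passed = [e for e in stream_events if e is not None and self.condition_func(e)]
        # Assumption: the next processor is only invoked when something passed.
        if passed:
            self.next_processor_pointer(passed)


received = []
older_than_30 = FilterModel(received.extend, lambda event: event["age"] > 30)
older_than_30.process([{"name": "x", "age": 25}, {"name": "y", "age": 40}, None])
```

Passing the next stage as a function keeps each processor unaware of what follows it, so the same filter can feed a window, a select, or an aggregator.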

public type HoppingWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
hoppingTime int
windowParameters any[]
nextEmitTime int -1
currentEventQueue streams:LinkedList
resetEvent streams:StreamEvent?
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
scheduler streams:Scheduler
  • <HoppingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <HoppingWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <HoppingWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <HoppingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <HoppingWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <HoppingWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.

public type IntSort object

This class implements a merge sort algorithm to sort timestamp values for state persistence.

  • <IntSort> sort(int[] arr)

    Sorts a given array of int values.

    Parameter Name Data Type Default Value Description
    arr int[]

    The array of int values to be sorted.
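
For reference, a top-down merge sort over an integer array looks like the Python sketch below. It illustrates the general algorithm named above and sorts in place to mirror a sort function with no return value. The function name `merge_sort` is an assumption, and the IntSort source may organise the merge step differently.

```python
def merge_sort(arr):
    """Sort a list of ints in place using top-down merge sort."""
    if len(arr) < 2:
        return
    mid = len(arr) // 2
    left, right = arr[:mid], arr[mid:]
    merge_sort(left)
    merge_sort(right)

    i = j = k = 0
    # Merge the two sorted halves back into arr.
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:   # <= keeps equal timestamps in their original order
            arr[k] = left[i]
            i += 1
        else:
            arr[k] = right[j]
            j += 1
        k += 1
    # At most one half still has elements left; copy them over.
    arr[k:] = left[i:] + right[j:]


timestamps = [5, 1, 4, 1, 3]
merge_sort(timestamps)
```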

public type LengthBatchWindow object

This is a batch (tumbling) length window that holds up to the given number of events and is updated each time that number of events has arrived. E.g.

    from inputStream window lengthBatch(5)
    select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
    group by inputStream.name => (TeacherOutput [] teachers) {
        foreach var t in teachers {
            outputStream.publish(t);
        }
    }

The lengthBatch window should only have one parameter (windowBatchLength).

Field Name Data Type Default Value Description
length int
windowParameters any[]
count int
resetEvent streams:StreamEvent?
currentEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
  • <LengthBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <LengthBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthBatchWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <LengthBatchWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <LengthBatchWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.
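
The tumbling behaviour of a length batch window can be modelled as follows. This Python sketch covers only the batching of current events. It leaves out the reset and expired events that the real window tracks through its resetEvent field, and the class name `LengthBatchWindowModel` is hypothetical.

```python
class LengthBatchWindowModel:
    """Illustrative tumbling window that releases events in fixed-size batches."""

    def __init__(self, length, next_process_pointer):
        self.length = length
        self.current = []   # events collected for the batch in progress
        self.next_process_pointer = next_process_pointer

    def process(self, stream_events):
        for event in stream_events:
            self.current.append(event)
            if len(self.current) == self.length:
                # Batch is full: hand it on and start a fresh one.
                self.next_process_pointer(self.current)
                self.current = []


batches = []
window = LengthBatchWindowModel(3, batches.append)
window.process([1, 2])       # not enough events yet, nothing is emitted
window.process([3, 4])       # third event completes the first batch
```

Unlike a sliding window, nothing is emitted between batch boundaries, so downstream aggregates are only updated once per full batch.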

public type LengthWindow object

The LengthWindow is a sliding length window that holds the last windowLength events and is updated on every event arrival and expiry. E.g.

    from inputStream window length(5)
    select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count
    group by inputStream.name => (TeacherOutput [] teachers) {
        foreach var t in teachers {
            outputStream.publish(t);
        }
    }

The length window should only have one parameter (windowLength).

Field Name Data Type Default Value Description
size int
linkedList streams:LinkedList
windowParameters any[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
  • <LengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <LengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming stream events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) held by the window that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the events held by the window are matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the LHS stream receives events. This is true in most cases. Set it to false in the rare case where the join is triggered when the RHS stream receives events.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of two-element tuples of events. Each tuple contains a pair of matching events, one from the LHS stream and one from the RHS stream.

  • <LengthWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <LengthWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values that contains the values to be restored from the persisted data.
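
A sliding length window differs from the batch variant in that it emits on every arrival. The Python sketch below models that: once the window is full, each new event pushes out the oldest one as an expired event. The class name `LengthWindowModel`, the tuple tagging, and the order in which the expired and current events are emitted are assumptions made for illustration.

```python
from collections import deque


class LengthWindowModel:
    """Illustrative sliding window that keeps the last `size` events (size >= 1)."""

    def __init__(self, size):
        self.size = size
        self.held = deque()   # oldest event on the left

    def process(self, event):
        emitted = []
        if len(self.held) == self.size:
            # Window is full: the oldest event leaves as EXPIRED.
            emitted.append(("EXPIRED", self.held.popleft()))
        self.held.append(event)
        emitted.append(("CURRENT", event))
        return emitted


window = LengthWindowModel(2)
out1 = window.process(10)
out2 = window.process(20)
out3 = window.process(30)   # 10 is pushed out of the window
```

An aggregator fed with these tagged events adds the CURRENT values and removes the EXPIRED ones, which keeps a running sum or count aligned with the window contents.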

public type LinkedList object

The LinkedList object which represents the linked list data structure.

Field Name Data Type Default Value Description
first streams:Node?
last streams:Node?
curr streams:Node?
size int 0
ascend boolean true
  • <LinkedList> isEmpty() returns (boolean)

    Checks if the linked list is empty.

    Return Type Description
    boolean

    Returns true if the linked list is empty, otherwise returns false.

  • <LinkedList> resetToFront()

    Moves the cursor to the front/head of the linked list.

  • <LinkedList> resetToRear()

    Moves the cursor to the end of the linked list if the cursor is not already at the last element of the linked list.

  • <LinkedList> hasNext() returns (boolean)

    Returns true if the linked list has more elements starting from the current cursor location.

    Return Type Description
    boolean

    Returns true if there are more elements onwards from the current cursor location, otherwise false.

  • <LinkedList> hasPrevious() returns (boolean)

    Returns true if there are prior elements to the current element/cursor location, otherwise false.

    Return Type Description
    boolean

    Returns true, if there are elements prior to the current cursor location, otherwise false.

  • <LinkedList> next() returns (any)

    Returns the next element of the linked list and moves the cursor to the next element.

    Return Type Description
    any

    The next element from the current cursor location.

  • <LinkedList> previous() returns (any)

    Returns the previous element of the linked list and moves the cursor to the previous element.

    Return Type Description
    any

    The previous element from the current cursor location.

  • <LinkedList> removeCurrent()

    Removes the element at the current cursor location.

  • <LinkedList> getSize() returns (int)

    Returns the current number of elements in the linked list.

    Return Type Description
    int

    The number of elements in the linked list.

  • <LinkedList> clear()

    Empties the linked list.

  • <LinkedList> removeFirstOccurrence(any elem) returns (boolean)

    Removes the first occurrence of the element passed as elem and returns true if the removal is successful.

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean

    Returns true if the removal is successful, otherwise false.

  • <LinkedList> remove(any elem) returns (boolean)

    Removes the first occurrence of the element passed as elem and returns true if the removal is successful.

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean

    Returns true if the removal is successful, otherwise false.

  • <LinkedList> getFirst() returns (any)

    Returns the first element of the linked list, without moving the cursor.

    Return Type Description
    any

    First element of the linked list.

  • <LinkedList> getLast() returns (any)

    Returns the last element of the linked list, without moving the cursor.

    Return Type Description
    any

    Last element of the linked list.

  • <LinkedList> addFirst(any data)

    Adds a new element to the front of the linked list without moving the cursor.

    Parameter Name Data Type Default Value Description
    data any

    Data to be added to the front of the linked list.

  • <LinkedList> addLast(any data)

    Adds a new element to the end of the linked list without moving the cursor.

    Parameter Name Data Type Default Value Description
    data any

    Data to be added to the end of the linked list.

  • <LinkedList> removeFirst() returns (any)

    Removes the first element in the linked list without moving the cursor.

    Return Type Description
    any

    Returns the removed element.

  • <LinkedList> removeLast() returns (any)

    Removes the last element in the linked list without moving the cursor.

    Return Type Description
    any

    Returns the removed element.

  • <LinkedList> insertBeforeCurrent(any data)

    Inserts a new element before the current cursor location.

    Parameter Name Data Type Default Value Description
    data any

    Data to be inserted.

  • <LinkedList> dequeue() returns (any)

    Returns the element that was added to the linked list first.

    Return Type Description
    any

    The dequeued element.

  • <LinkedList> asArray() returns (any[])

    Creates an array from the elements in the linked list and returns it. The cursor is not changed.

    Return Type Description
    any[]

    An array of elements in the linked list.

  • <LinkedList> addAll(any[] data)

    Adds the elements of an array at the current cursor location and moves the cursor to the end of the list.

    Parameter Name Data Type Default Value Description
    data any[]

public type Max object

Aggregator to find the maximum value in a stream.

Field Name Data Type Default Value Description
iMaxQueue streams:LinkedList BLangTypeInit: new null ([])
fMaxQueue streams:LinkedList BLangTypeInit: new null ([])
iMax int? ()
fMax float? ()
  • <Max> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current maximum value and returns the updated maximum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is greater than the current maximum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated max value.

  • <Max> copy() returns (Aggregator)

    Returns a copy of the Max aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Max aggregator.

  • <Max> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Max> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.
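
As a rough illustration of how a maximum can be maintained from CURRENT, EXPIRED, and RESET events, the following Python sketch models the behaviour described for Max. It is a language-neutral model, not the Ballerina source: the name MaxModel, the single queue (the documented object keeps separate iMaxQueue and fMaxQueue fields for int and float values), and the exact handling of expired values are assumptions.

```python
from collections import deque

CURRENT, EXPIRED, RESET = "CURRENT", "EXPIRED", "RESET"


class MaxModel:
    """Hypothetical model of a windowed maximum; not the Ballerina source."""

    def __init__(self):
        # Candidates kept in non-increasing order; the head is the maximum.
        self.queue = deque()

    def process(self, value, event_type):
        if event_type == CURRENT:
            # Smaller values at the tail can never become the maximum
            # while the newer, larger value is still in the window.
            while self.queue and self.queue[-1] < value:
                self.queue.pop()
            self.queue.append(value)
        elif event_type == EXPIRED:
            # Drop the first occurrence of the expired value, if still tracked.
            if value in self.queue:
                self.queue.remove(value)
        elif event_type == RESET:
            self.queue.clear()
        # None stands in for the nil value () when no maximum is known.
        return self.queue[0] if self.queue else None
```

Under these assumptions, feeding 3, 7, and 5 as CURRENT events yields 7, and expiring 7 afterwards lowers the result to 5, which is the difference from a "forever" style aggregator that never forgets a value.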

public type MaxForever object

The aggregator to keep the maximum value received so far. It is similar to the Max aggregator, but it retains the maximum value it has received so far indefinitely.

Field Name Data Type Default Value Description
iMax int? ()
fMax float? ()
  • <MaxForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current maximum value and returns the updated maximum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is greater than the current maximum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated maximum value.

  • <MaxForever> copy() returns (Aggregator)

    Returns a copy of the MaxForever aggregator.

    Return Type Description
    Aggregator

    An Aggregator object that represents the MaxForever aggregator.

  • <MaxForever> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <MaxForever> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.

public type MergeSort object

This object implements the merge sort algorithm to sort the provided value arrays. fieldFuncs is an array of function pointers that return the field values from each stream event's data map. sortTypes is an array of sort orders (streams:ASCENDING or streams:DESCENDING).

Field Name Data Type Default Value Description
fieldFuncs function (map) returns (anydata)?[]
sortTypes string[]
  • <MergeSort> __init(function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    fieldFuncs function (map) returns (anydata)?[]
    sortTypes string[]
  • <MergeSort> topDownMergeSort(streams:StreamEvent?[] events)

    Sorts the given stream events using the merge sort algorithm.

    Parameter Name Data Type Default Value Description
    events streams:StreamEvent?[]

    The array of stream events to be sorted.
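
To make the roles of fieldFuncs and sortTypes concrete, here is a minimal Python sketch of a top-down merge sort over event data maps with one sort order per field. It is an illustration under assumptions rather than the module's code: the string values used for ASCENDING and DESCENDING are assumed, the function names are hypothetical, and the sketch returns a new list whereas the documented topDownMergeSort has no return value.

```python
ASCENDING, DESCENDING = "ascending", "descending"  # assumed constant values


def compare(a, b, field_funcs, sort_types):
    """Return a negative, zero, or positive number for two data maps."""
    for func, sort_type in zip(field_funcs, sort_types):
        x, y = func(a), func(b)
        if x == y:
            continue  # tie on this field: fall through to the next one
        result = -1 if x < y else 1
        return result if sort_type == ASCENDING else -result
    return 0


def top_down_merge_sort(events, field_funcs, sort_types):
    """Sort a list of data maps and return a new, stably sorted list."""
    if len(events) <= 1:
        return list(events)
    mid = len(events) // 2
    left = top_down_merge_sort(events[:mid], field_funcs, sort_types)
    right = top_down_merge_sort(events[mid:], field_funcs, sort_types)
    merged, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        # "<=" keeps equal elements in their original order.
        if compare(left[i], right[j], field_funcs, sort_types) <= 0:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged
```

For example, sorting by age descending and then by name ascending places two 30-year-olds before a 25-year-old, with the tie between the 30-year-olds broken by name.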

public type Min object

Aggregator to find the minimum value in a stream.

Field Name Data Type Default Value Description
iMinQueue streams:LinkedList BLangTypeInit: new null ([])
fMinQueue streams:LinkedList BLangTypeInit: new null ([])
iMin int? ()
fMin float? ()
  • <Min> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current minimum value and returns the updated minimum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is less than the current minimum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated minimum value.

  • <Min> copy() returns (Aggregator)

    Returns a copy of the Min aggregator.

    Return Type Description
    Aggregator

    An Aggregator object that represents the Min aggregator.

  • <Min> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Min> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.

public type MinForever object

The aggregator to keep the minimum value received so far. It is similar to the Min aggregator, but it retains the minimum value it has received so far indefinitely.

Field Name Data Type Default Value Description
iMin int? ()
fMin float? ()
  • <MinForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current minimum value and returns the updated minimum value.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being checked whether it is less than the current minimum value.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated minimum value.

  • <MinForever> copy() returns (Aggregator)

    Returns a copy of the MinForever aggregator.

    Return Type Description
    Aggregator

    An Aggregator object that represents the MinForever aggregator.

  • <MinForever> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <MinForever> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.

public type Node object

The Node object represents a node in the linked list data structure.

Field Name Data Type Default Value Description
data any
next streams:Node?
prev streams:Node?
  • <Node> __init(streams:Node? prev, any data, streams:Node? next)

    Parameter Name Data Type Default Value Description
    prev streams:Node?
    data any
    next streams:Node?
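
The sketch below shows, in Python, how nodes with prev, data, and next fields can back a doubly linked list offering a few of the operations documented for LinkedList. It is only a model under assumptions: LinkedListModel and its method names are hypothetical, and the cursor that the documented LinkedList maintains is deliberately left out.

```python
class Node:
    """Mirrors the documented fields: prev, data, next."""

    def __init__(self, prev, data, next):
        self.prev, self.data, self.next = prev, data, next


class LinkedListModel:
    """Hypothetical doubly linked list without the documented cursor."""

    def __init__(self):
        self.first = None
        self.last = None
        self.size = 0

    def add_last(self, data):
        node = Node(self.last, data, None)
        if self.last is None:
            self.first = node
        else:
            self.last.next = node
        self.last = node
        self.size += 1

    def add_first(self, data):
        node = Node(None, data, self.first)
        if self.first is None:
            self.last = node
        else:
            self.first.prev = node
        self.first = node
        self.size += 1

    def remove_first_occurrence(self, elem):
        node = self.first
        while node is not None:
            if node.data == elem:
                self._unlink(node)
                return True
            node = node.next
        return False

    def _unlink(self, node):
        # Re-point the neighbours (or the list ends) around the node.
        if node.prev is None:
            self.first = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.last = node.prev
        else:
            node.next.prev = node.prev
        self.size -= 1

    def as_array(self):
        out, node = [], self.first
        while node is not None:
            out.append(node.data)
            node = node.next
        return out
```

Because each node knows both neighbours, removing a node from the middle only needs the two pointer updates in _unlink, without shifting any other element.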

public type OrderBy object

The OrderBy object represents the desugared code of the order by clause of a streaming query. This object takes three parameters to initialize itself. nextProcessorPointer is the process method of the next processor. fieldFuncs is an array of function pointers that return the field values to be sorted. sortTypes is an array of strings specifying the sort order (ascending or descending). Internally, this processor uses a MergeSort object to sort.

Field Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent?[]) returns ()
fieldFuncs function (map) returns (anydata)?[]
sortTypes string[]
mergeSort streams:MergeSort
  • <OrderBy> __init(function (streams:StreamEvent?[]) returns () nextProcessorPointer, function (map<anydata>) returns (anydata)?[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    nextProcessorPointer function (streams:StreamEvent?[]) returns ()
    fieldFuncs function (map) returns (anydata)?[]
    sortTypes string[]
  • <OrderBy> process(streams:StreamEvent?[] streamEvents)

    Sorts the given array of stream events according to the given parameters (fieldFuncs and sortTypes).

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be sorted.

public type OutputProcess object

The OutputProcess object is responsible for sending the output (only the events of type streams:CURRENT) to the destination stream. It takes a function pointer, outputFunc, which holds the actual logic to process the output.

  • <OutputProcess> __init(function (map<anydata>[]) returns () outputFunc)

    Parameter Name Data Type Default Value Description
    outputFunc function (map[]) returns ()
  • <OutputProcess> process(streams:StreamEvent?[] streamEvents)

    Sends the output to the streaming action. Most of the time, the output is published to a destination stream at the streaming action. Only the events of type streams:CURRENT are passed to outputFunc.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be filtered out for CURRENT events.
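
The filtering step described above can be sketched in a few lines of Python. This is a model under assumptions, not the module's implementation: events are represented as plain dictionaries with eventType and data keys, OutputProcessModel is a hypothetical name, and skipping the call when no CURRENT events remain is an assumed detail.

```python
class OutputProcessModel:
    """Hypothetical model: forwards only CURRENT event data to output_func."""

    def __init__(self, output_func):
        self.output_func = output_func

    def process(self, stream_events):
        # Keep the data maps of CURRENT events; skip nil entries and other types.
        current = [
            event["data"]
            for event in stream_events
            if event is not None and event["eventType"] == "CURRENT"
        ]
        if current:  # assumption: nothing is emitted for an empty batch
            self.output_func(current)
```

Passing a batch that mixes CURRENT and EXPIRED events therefore delivers only the data maps of the CURRENT events to the output function.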

public type Scheduler object

The Scheduler object is responsible for generating streams:TIMER events at the given timestamp. Once the event is generated, the timer event is passed to the provided processFunc function pointer. The function pointer is the process function of the target processor, to which the timer event should be sent.

  • <Scheduler> __init(function (streams:StreamEvent?[]) returns () processFunc)

    Parameter Name Data Type Default Value Description
    processFunc function (streams:StreamEvent?[]) returns ()
  • <Scheduler> notifyAt(int timestamp)

    Schedules a timer event to be sent at the given timestamp.

    Parameter Name Data Type Default Value Description
    timestamp int

    The timestamp at which the timer event will be generated and passed to the provided processFunc.

  • <Scheduler> sendTimerEvents() returns (error?<>)

    Creates the timer events.

    Return Type Description
    error?<>

    An error if sending the timer events fails.

public type Select object

The Select object represents the select clause. Anything that comes under the select clause, such as aggregator function invocations, is also handled in this processor. Further, grouping of the events (provided by the group by clause) is also performed in this processor. aggregatorArr is an array of aggregators that are used in the select clause. groupbyFuncArray is an array of function pointers that return the values being grouped. selectFunc is a function pointer to a lambda function that creates the data field of the output stream event. scopeName is used as a breadcrumb to identify the select clause if there are multiple forever blocks.

  • <Select> process(streams:StreamEvent?[] streamEvents)

    Selects only the fields specified in the select clause.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events passed to the select clause.

  • <Select> getGroupByKey(function (streams:StreamEvent) returns (anydata)?[]? groupbyFunctionArray, streams:StreamEvent e) returns (string)

    Creates a unique key for each group with the given values in the group by clause.

    Parameter Name Data Type Default Value Description
    groupbyFunctionArray function (streams:StreamEvent) returns (anydata)?[]?

    Function pointer array to the lambda function which returns each group by field.

    e streams:StreamEvent

    Stream Event object being grouped.

    Return Type Description
    string

    Returns a unique groupby key by which the event e is grouped.
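
One simple way to derive such a key is to evaluate every group-by function on the event and join the results into a string, as in the Python sketch below. The separator, the "DEFAULT" key used when there is no group by clause, and the function name are all assumptions made for illustration, not the module's documented behaviour.

```python
def get_group_by_key(groupby_funcs, event):
    """Hypothetical key builder: one string per distinct group."""
    if not groupby_funcs:
        # Assumption: without a group by clause all events share one group.
        return "DEFAULT"
    # Evaluate each group-by function and concatenate the values in order.
    return "".join(str(func(event)) + "," for func in groupby_funcs)
```

Events that produce the same key share the same aggregator state. Note that plain concatenation can collide when the values themselves contain the separator, so this sketch only suits simple values.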

public type Snapshotable object

Abstract Snapshotable to be referenced by all snapshotable objects.

  • <Snapshotable> saveState() returns (map<any>)

    Function to return the current state of a snapshotable object.

    Return Type Description
    map

    A map<any> that represents the current state of the snapshotable instance.

  • <Snapshotable> restoreState(map state)

    Function to restore a previous state into a snapshotable object.

    Parameter Name Data Type Default Value Description
    state map

    A map<any> state that can be used to restore snapshotable instance to a previous state.

public type SortWindow object

Field Name Data Type Default Value Description
lengthToKeep int
windowParameters any[]
sortedWindow streams:LinkedList
fields string[]
sortTypes string[]
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
fieldFuncs function (map) returns (anydata)?[]
mergeSort streams:MergeSort
  • <SortWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <SortWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <SortWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <SortWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events; if so, this should be true, which is the usual case. In the rare cases where the join is triggered when the rhs stream receives events, this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    An array of 2-element tuples of events. Each tuple contains a pair of matching events: one from the lhs stream and one from the rhs stream.

  • <SortWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <SortWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.
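
The matching performed by getCandidateEvents can be pictured with the Python sketch below, which pairs an incoming event with every event held by a window. It is a model under assumptions: events are plain dictionaries with a data key, a missing condition function is assumed to match everything, and the condition is assumed to receive the lhs data first and the rhs data second.

```python
def get_candidate_events(window_events, origin_event,
                         condition_func=None, is_lhs_trigger=True):
    """Hypothetical model: return (lhs, rhs) pairs that satisfy the condition."""
    pairs = []
    for held in window_events:
        # The triggering side decides which slot the origin event occupies.
        if is_lhs_trigger:
            lhs, rhs = origin_event, held
        else:
            lhs, rhs = held, origin_event
        if condition_func is None or condition_func(lhs["data"], rhs["data"]):
            pairs.append((lhs, rhs))
    return pairs
```

With is_lhs_trigger set to false the same events are matched, but the origin event is placed in the second position of each tuple, so the lhs/rhs order of the result stays consistent for the join processor.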

public type StdDev object

The aggregator object to calculate the standard deviation.

Field Name Data Type Default Value Description
mean float 0.0
stdDeviation float 0.0
sumValue float 0.0
count int 0
  • <StdDev> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current standard deviation as the new values come into the aggregation.

    Parameter Name Data Type Default Value Description
    value anydata

    Value being added or removed from aggregation in order to calculate the new standard deviation.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Updated standard deviation.

  • <StdDev> copy() returns (Aggregator)

    Returns a copy of the StdDev aggregator.

    Return Type Description
    Aggregator

    An Aggregator object that represents the StdDev aggregator.

  • <StdDev> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <StdDev> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.
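
A standard deviation that follows values entering and leaving a window can be kept incrementally, without storing the values, using Welford-style updates. The Python sketch below illustrates that technique for the population standard deviation; it is not taken from the module, and the class name, the m2 field, and the choice to return 0.0 for fewer than two values are assumptions.

```python
import math


class StdDevModel:
    """Hypothetical incremental population standard deviation."""

    def __init__(self):
        self._reset()

    def _reset(self):
        self.mean = 0.0
        self.m2 = 0.0   # running sum of squared deviations from the mean
        self.count = 0

    def process(self, value, event_type):
        if event_type == "CURRENT":
            # Welford update when a value enters the aggregation.
            self.count += 1
            delta = value - self.mean
            self.mean += delta / self.count
            self.m2 += delta * (value - self.mean)
        elif event_type == "EXPIRED":
            if self.count <= 1:
                self._reset()
            else:
                # Inverse update when a value leaves the aggregation.
                self.count -= 1
                delta = value - self.mean
                self.mean -= delta / self.count
                self.m2 -= delta * (value - self.mean)
        elif event_type == "RESET":
            self._reset()
        if self.count < 2:
            return 0.0
        # Clamp tiny negative values caused by floating-point rounding.
        return math.sqrt(max(self.m2, 0.0) / self.count)
```

Each event costs constant time and memory, which is why a window aggregator would prefer this form over recomputing the deviation from all retained values.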

public type StreamEvent object

The StreamEvent object is a wrapper around the actual data received by the input stream. When a record is received by an input stream, that record is converted to a map of anydata values, and that map is set to a field called data in a new StreamEvent object. StreamEvent is only used internally to transmit event data from one processor to another. The timestamp is set at the time the record is converted to a map. When the record is first received by the input stream, the eventType is set to streams:CURRENT. Besides streams:CURRENT, there are three other types of stream events: streams:EXPIRED, streams:RESET, and streams:TIMER. An expired event is used to remove the state of its respective current event. A reset event is used to completely wipe the state, and a timer event is used to trigger the process method of a particular processor in a timely manner.

Field Name Data Type Default Value Description
eventType CURRENT|EXPIRED|ALL|RESET|TIMER
timestamp int
data map {}
  • <StreamEvent> __init((string,map<anydata>)|map<anydata> eventData, CURRENT|EXPIRED|ALL|RESET|TIMER eventType, int timestamp)

    Parameter Name Data Type Default Value Description
    eventData (string,map)|map
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    timestamp int
  • <StreamEvent> copy() returns (StreamEvent)

    Returns a copy of the stream event instance.

    Return Type Description
    StreamEvent

    A copy of the StreamEvent object with its state.

  • <StreamEvent> addData(map<anydata> eventData)

    Adds the key-value pairs of a given map to the data field.

    Parameter Name Data Type Default Value Description
    eventData map

    Map of anydata values to be added to the data field.

  • <StreamEvent> addAttribute(string key, anydata val)

    Adds an attribute of an event to the map with its value.

    Parameter Name Data Type Default Value Description
    key string

    The key of the map entry.

    val anydata

    Respective value of the key.
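
The wrapping described for StreamEvent can be modelled in Python as follows. This is a sketch under assumptions, not the Ballerina object: StreamEventModel and wrap_record are hypothetical names, the timestamp is taken in milliseconds from the system clock, and the stream-name form of eventData accepted by the documented __init is not modelled.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StreamEventModel:
    """Hypothetical model of the documented fields."""
    event_type: str
    timestamp: int
    data: dict = field(default_factory=dict)

    def copy(self):
        # Copy the data map so the two events can diverge afterwards.
        return StreamEventModel(self.event_type, self.timestamp, dict(self.data))

    def add_attribute(self, key, val):
        self.data[key] = val


def wrap_record(record, now_millis=None):
    """Wrap an incoming record as a CURRENT event stamped at conversion time."""
    if now_millis is None:
        now_millis = int(time.time() * 1000)
    return StreamEventModel("CURRENT", now_millis, dict(record))
```

A window could then derive an expired event by copying a current one and changing only the copy's event_type, leaving the original event untouched.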

public type StreamJoinProcessor object

The StreamJoinProcessor object is responsible for performing SQL-like joins between two or more streams. The onConditionFunc is the lambda function that represents the where clause in the join clause. The join happens only if the condition is satisfied. nextProcessor is the process function of the next processor, which can be a Select processor, an Aggregator processor, a Having processor, etc. The lhsStream is the left hand side stream of the join, and its attached window is lhsWindow. The rhsStream is the right hand side stream of the join, and its attached window is rhsWindow. The unidirectionalStream defines the stream whose incoming events trigger the join. Usually it is lhsStream; in rare cases it can be rhsStream. The joinType is the type of the join, and it can be any value defined by streams:JoinType.

Field Name Data Type Default Value Description
lhsWindow streams:Window?
rhsWindow streams:Window?
lhsStream string?
rhsStream string?
unidirectionalStream string?
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
lockField int 0
  • <StreamJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? onConditionFunc)

    Parameter Name Data Type Default Value Description
    nextProcessor function (streams:StreamEvent?[]) returns ()
    joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
    onConditionFunc function (map,map) returns (boolean)?
  • <StreamJoinProcessor> process(streams:StreamEvent?[] streamEvents)

    Processes the events from both the lhsStream and the rhsStream and performs the join.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    Stream events being joined.

  • <StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)

    Sets the left hand side stream name and the respective window instance.

    Parameter Name Data Type Default Value Description
    streamName string

    The name of the left hand side stream.

    windowInstance streams:Window

    The window attached to the left hand side stream.

  • <StreamJoinProcessor> setRHS(string streamName, streams:Window windowInstance)

    Sets the right hand side stream name and the respective window instance.

    Parameter Name Data Type Default Value Description
    streamName string

    The name of the right hand side stream.

    windowInstance streams:Window

    The window attached to the right hand side stream.

  • <StreamJoinProcessor> setUnidirectionalStream(string streamName)

    Sets the stream by which the joining is triggered.

    Parameter Name Data Type Default Value Description
    streamName string

    The name of the stream. In most cases, the joining is triggered when the events are received by the left hand side stream even if the right hand side stream receives the events before the left hand side stream receives events.

public type Sum object

Aggregator to perform summation of values in a stream.

Field Name Data Type Default Value Description
iSum int 0
fSum float 0.0
  • <Sum> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Updates the current sum of numeric values based on the eventType. If the eventType is streams:CURRENT, the value is added to the current sum. If the eventType is streams:EXPIRED, the value is subtracted from the current sum. If the eventType is streams:RESET, the current sum is reset regardless of the value.

    Parameter Name Data Type Default Value Description
    value anydata

    The numeric value being aggregated to the current sum.

    eventType CURRENT|EXPIRED|ALL|RESET|TIMER

    Type of the incoming event: streams:CURRENT, streams:EXPIRED, or streams:RESET.

    Return Type Description
    anydata

    Aggregated sum after applying the given value.

  • <Sum> copy() returns (Aggregator)

    Returns a copy of the Sum aggregator without its current state.

    Return Type Description
    Aggregator

    Returns Sum aggregator.

  • <Sum> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <Sum> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.
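
The event-type rules stated for Sum.process translate directly into code. The Python sketch below models them; it is an illustration rather than the module's source, SumModel is a hypothetical name, and keeping separate int and float sums is an assumption inferred from the iSum and fSum fields.

```python
class SumModel:
    """Hypothetical model of the documented Sum.process contract."""

    def __init__(self):
        self.i_sum = 0      # running sum of int values
        self.f_sum = 0.0    # running sum of float values

    def process(self, value, event_type):
        is_float = isinstance(value, float)
        if event_type == "RESET":
            # A reset clears the state regardless of the value.
            self.i_sum, self.f_sum = 0, 0.0
        elif event_type in ("CURRENT", "EXPIRED"):
            # CURRENT adds the value; EXPIRED subtracts it again.
            signed = value if event_type == "CURRENT" else -value
            if is_float:
                self.f_sum += signed
            else:
                self.i_sum += signed
        return self.f_sum if is_float else self.i_sum

    def copy(self):
        # The documented copy() returns an aggregator without current state.
        return SumModel()
```

Adding 5 and 3 as CURRENT events gives 8, and expiring the 5 brings the sum back to 3, so the result always reflects the values still inside the window.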

public type TableJoinProcessor object

The TableJoinProcessor object handles joining streams with in-memory tables in Ballerina. nextProcessor is the process function of the next processor, which can be a Select processor, an Aggregator processor, a Having processor, etc. The streamName is the name of the stream in the join, and its attached window is windowInstance. The tableName is the name of the table with which the stream is joined. The joinType is the type of the join, and it can be any value defined by streams:JoinType.

Field Name Data Type Default Value Description
windowInstance streams:Window?
streamName string
tableName string
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
lockField int 0
  • <TableJoinProcessor> __init(function (streams:StreamEvent?[]) returns () nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)

    Parameter Name Data Type Default Value Description
    nextProcessor function (streams:StreamEvent?[]) returns ()
    joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
    tableQuery function (streams:StreamEvent) returns (map[])
  • <TableJoinProcessor> process(streams:StreamEvent?[] streamEvents)

    Joins the incoming events to the stream with the given table.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The stream events being joined with the table.

  • <TableJoinProcessor> setJoinProperties(string tn, string sn, streams:Window wi)

    Sets the properties required for joining.

    Parameter Name Data Type Default Value Description
    tn string

    The name of the table being joined with the stream.

    sn string

    The name of the stream being joined with the table.

    wi streams:Window

    The window instance which is attached to the stream.

public type TimeAccumulatingWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
currentEventQueue streams:LinkedList
resetEvent streams:StreamEvent?
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
lastTimestamp int -1
scheduler streams:Scheduler
  • <TimeAccumulatingWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <TimeAccumulatingWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeAccumulatingWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <TimeAccumulatingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events; if so, this should be true, which is the usual case. In the rare cases where the join is triggered when the rhs stream receives events, this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    An array of 2-element tuples of events. Each tuple contains a pair of matching events: one from the lhs stream and one from the rhs stream.

  • <TimeAccumulatingWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <TimeAccumulatingWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.

public type TimeBatchWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
windowParameters any[]
nextEmitTime int -1
currentEventQueue streams:LinkedList
resetEvent streams:StreamEvent?
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
scheduler streams:Scheduler
  • <TimeBatchWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <TimeBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeBatchWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <TimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map,map) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events; if so, this should be true, which is the usual case. In the rare cases where the join is triggered when the rhs stream receives events, this should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    An array of 2-element tuples of events. Each tuple contains a pair of matching events: one from the lhs stream and one from the rhs stream.

  • <TimeBatchWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any-typed values.

    Return Type Description
    map

    A map of any typed values.

  • <TimeBatchWindow> restoreState(map state)

    Restores the saved state which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any-typed values. This map contains the values to be restored from the persisted data.

public type TimeLengthWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
length int
windowParameters any[]
count int 0
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
scheduler streams:Scheduler
  • <TimeLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <TimeLengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeLengthWindow> process(streams:StreamEvent?[] streamEvents)

    Processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <TimeLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. This is the usual case. In the rare case where the join is triggered when the rhs stream receives events, it should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream.

  • <TimeLengthWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any typed values.

    Return Type Description
    map<any>

    A map of any typed values.

  • <TimeLengthWindow> restoreState(map state)

    Restores the saved state, which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any typed values. This map contains the values to be restored from the persisted data.
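
The TimeLengthWindow entry carries no description. Its fields timeInMilliSeconds and length suggest a sliding window bounded by both age and count; that reading is an assumption. Under it, the behaviour can be modelled in Python as below. The class TimeLengthModel, its method names, and the choice to expire an event once its age reaches the limit are all hypothetical. The model expires events lazily on arrival, whereas the real object also holds a scheduler.

```python
from collections import deque


class TimeLengthModel:
    """Assumed semantics: hold at most `length` events, none older than `time_in_ms`."""

    def __init__(self, time_in_ms, length):
        self.time_in_ms = time_in_ms
        self.length = length
        self.held = deque()  # (timestamp_ms, payload), oldest first

    def add(self, timestamp_ms, payload):
        """Add one event and return the payloads that expired as a result."""
        expired = []
        # Expire by age: drop events whose age has reached the time limit.
        while self.held and timestamp_ms - self.held[0][0] >= self.time_in_ms:
            expired.append(self.held.popleft()[1])
        self.held.append((timestamp_ms, payload))
        # Expire by count: drop the oldest events beyond the length limit.
        while len(self.held) > self.length:
            expired.append(self.held.popleft()[1])
        return expired

    def contents(self):
        return [payload for _, payload in self.held]
```

With time_in_ms=1000 and length=2, a third event arriving within one second pushes out the oldest event by count, while an event arriving after a long gap pushes out everything by age.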

public type TimeOrderWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
timestamp string
lastTimestamp int
dropOlderEvents boolean
mergeSort streams:MergeSort
scheduler streams:Scheduler
  • <TimeOrderWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <TimeOrderWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeOrderWindow> process(streams:StreamEvent?[] streamEvents)

    The process function processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <TimeOrderWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. This is the usual case. In the rare case where the join is triggered when the rhs stream receives events, it should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream.

  • <TimeOrderWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int
  • <TimeOrderWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any typed values.

    Return Type Description
    map<any>

    A map of any typed values.

  • <TimeOrderWindow> restoreState(map state)

    Restores the saved state, which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any typed values. This map contains the values to be restored from the persisted data.

public type TimeWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
lastTimestamp int -9223372036854775808
scheduler streams:Scheduler
  • <TimeWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <TimeWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeWindow> process(streams:StreamEvent?[] streamEvents)

    The process function processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <TimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. This is the usual case. In the rare case where the join is triggered when the rhs stream receives events, it should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream.

  • <TimeWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any typed values.

    Return Type Description
    map<any>

    A map of any typed values.

  • <TimeWindow> restoreState(map state)

    Restores the saved state, which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any typed values. This map contains the values to be restored from the persisted data.
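
The TimeWindow entry has no description. Its fields (timeInMillis, expiredEventQueue, scheduler) suggest a sliding time window in which each event expires timeInMillis after it arrives, with the scheduler firing timer events so that expiry also happens when no new events arrive. That reading is an assumption. The Python sketch below models it; TimeWindowModel, on_event, on_timer, and pending_timers are hypothetical names, not part of the module.

```python
from collections import deque


class TimeWindowModel:
    """Assumed semantics: each event is held for `time_in_ms` after arrival."""

    def __init__(self, time_in_ms):
        self.time_in_ms = time_in_ms
        self.held = deque()       # (arrival_ms, payload), oldest first
        self.pending_timers = []  # times at which a timer should fire

    def _expire(self, now_ms):
        # Drop every held event whose age has reached the window time.
        expired = []
        while self.held and now_ms - self.held[0][0] >= self.time_in_ms:
            expired.append(self.held.popleft()[1])
        return expired

    def on_event(self, now_ms, payload):
        """A current event arrives: expire old events, hold the new one,
        and schedule a timer for the moment the new event should expire."""
        expired = self._expire(now_ms)
        self.held.append((now_ms, payload))
        self.pending_timers.append(now_ms + self.time_in_ms)
        return expired

    def on_timer(self, now_ms):
        """A timer fires: discard timers that are due and expire old events."""
        self.pending_timers = [t for t in self.pending_timers if t > now_ms]
        return self._expire(now_ms)
```

The split between on_event and on_timer shows why a scheduler is needed: without timer callbacks, an event would stay in the window until the next event happened to arrive.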

public type UniqueLengthWindow object

This is a length window that keeps only unique events. E.g.

from inputStream window uniqueLength(inputStream.timestamp, 4000) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

The uniqueLength window should have exactly two parameters (stream field, windowLength).

Field Name Data Type Default Value Description
uniqueKey string
length int
windowParameters any[]
count int 0
uniqueMap map
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent?[]) returns ()?
  • <UniqueLengthWindow> __init(function (streams:StreamEvent?[]) returns ()? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent?[]) returns ()?
    windowParameters any[]
  • <UniqueLengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <UniqueLengthWindow> process(streams:StreamEvent?[] streamEvents)

    The process function processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <UniqueLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. This is the usual case. In the rare case where the join is triggered when the rhs stream receives events, it should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream.

  • <UniqueLengthWindow> saveState() returns (map<any>)

    Returns the current state to be saved as a map of any typed values.

    Return Type Description
    map<any>

    A map of any typed values.

  • <UniqueLengthWindow> restoreState(map state)

    Restores the saved state, which is passed as a map of any typed values.

    Parameter Name Data Type Default Value Description
    state map

    A map of any typed values. This map contains the values to be restored from the persisted data.
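
The UniqueLengthWindow description and its uniqueKey, length, and uniqueMap fields can be illustrated with a small Python model. It assumes that a new event with an already-seen key replaces (and expires) the older event with that key, and that the oldest key is evicted once more than length distinct keys are held. Both behaviours are assumptions. UniqueLengthModel and its methods are hypothetical names.

```python
from collections import OrderedDict


class UniqueLengthModel:
    """Assumed semantics: keep the latest event per key, at most `length` keys."""

    def __init__(self, unique_key, length):
        self.unique_key = unique_key
        self.length = length
        self.unique_map = OrderedDict()  # key value -> latest event

    def add(self, event):
        """Add one event and return the events that expired as a result."""
        expired = []
        key = event[self.unique_key]
        if key in self.unique_map:
            # Same key seen before: the older event is replaced.
            expired.append(self.unique_map.pop(key))
        self.unique_map[key] = event  # newest entry goes to the end
        if len(self.unique_map) > self.length:
            # Too many distinct keys: evict the oldest entry.
            _, oldest = self.unique_map.popitem(last=False)
            expired.append(oldest)
        return expired

    def contents(self):
        return list(self.unique_map.values())
```

With unique_key="name" and length=2, a second event for the same name replaces the first, and a third distinct name evicts whichever name was updated least recently.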

public type Window object

The Window abstract object is the base object for implementing windows in Ballerina streams. The process function contains the logic for processing events as they are received. The getCandidateEvents function is used inside the Select object to return the events in the window for joining. The window object names cannot be used in queries; a streaming query must always use a function that returns the specific window. E.g. to use LengthWindow in a streaming query, use the function streams:length, written without the module identifier streams. An example is shown below.

from inputStream window length(5) select inputStream.name, inputStream.age, sum(inputStream.age) as sumAge, count() as count group by inputStream.name => (TeacherOutput [] teachers) { foreach var t in teachers { outputStream.publish(t); } }

  • <Window> process(streams:StreamEvent?[] streamEvents)

    The process function processes the incoming events and updates the current state of the window.

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent?[]

    The array of stream events to be processed.

  • <Window> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Returns the events (state) that match the where condition of the join clause for a given event.

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent

    The event against which the state (the events held by the window) is matched.

    conditionFunc function (map<anydata>,map<anydata>) returns (boolean)?

    The function pointer to the lambda function that contains the condition logic of the where clause.

    isLHSTrigger boolean true

    Specifies whether the join is triggered when the lhs stream receives events, in which case it should be true. This is the usual case. In the rare case where the join is triggered when the rhs stream receives events, it should be false.

    Return Type Description
    (StreamEvent,StreamEvent)[]

    Returns an array of 2-element tuples of events. Each tuple contains a pair of matching events, one from the lhs stream and one from the rhs stream.
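
The getCandidateEvents contract can be sketched in Python as follows. This is a model of the documented parameters, not the Ballerina source: the function name get_candidate_events and the held_events argument (standing in for the window's internal state) are hypothetical, and treating a missing conditionFunc as "match every held event" is an assumption drawn from the parameter's optional type.

```python
def get_candidate_events(held_events, origin_event,
                         condition_func=None, is_lhs_trigger=True):
    """Pair origin_event with every held event that satisfies the condition.

    Each returned tuple is ordered (lhs event, rhs event). When the join is
    triggered from the lhs stream, the origin event is the lhs member;
    otherwise it is the rhs member.
    """
    pairs = []
    for held in held_events:
        if is_lhs_trigger:
            lhs, rhs = origin_event, held
        else:
            lhs, rhs = held, origin_event
        # Assumption: no condition function means every held event matches.
        if condition_func is None or condition_func(lhs, rhs):
            pairs.append((lhs, rhs))
    return pairs
```

The isLHSTrigger flag only decides on which side of each tuple the triggering event is placed, so the condition function always sees its arguments in (lhs, rhs) order regardless of which stream triggered the join.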