ballerina/streams module

Type Definitions

Type Values Description
EventType TIMER | RESET | EXPIRED | CURRENT | ALL
JoinType RIGHTOUTERJOIN | LEFTOUTERJOIN | JOIN | FULLOUTERJOIN

Objects Summary

Object Description
Aggregator
Average
Count
DelayWindow
DistinctCount
ExternalTimeBatchWindow
ExternalTimeWindow
Filter
HoppingWindow
LengthBatchWindow
LengthWindow
LinkedList
Max
MaxForever
MergeSort
Min
MinForever
Node
OrderBy
OutputProcess
Scheduler
Select
SortWindow
StdDev
StreamEvent
StreamJoinProcessor
Sum
TableJoinProcessor
TimeAccumulatingWindow
TimeBatchWindow
TimeLengthWindow
TimeOrderWindow
TimeWindow
UniqueLengthWindow
Window

Functions Summary

Return Type Function and Description
Aggregator avg()
StreamEvent[] buildStreamEvent(any t, string streamName)
Aggregator count()
Filter createFilter(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc)
OrderBy createOrderBy(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map<anydata>) returns (anydata)[] fields, string[] sortFieldMetadata)
OutputProcess createOutputProcess(function (map<anydata>[]) returns (()) outputFunc)
StreamEvent createResetStreamEvent(streams:StreamEvent event)
Select createSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc)
StreamJoinProcessor createStreamJoinProcessor(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc)
TableJoinProcessor createTableJoinProcessor(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)
Window delay(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Aggregator distinctCount()
Window externalTime(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window externalTimeBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
StreamEvent getStreamEvent(any anyEvent)
Window hopping(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window length(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window lengthBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Aggregator max()
Aggregator maxForever()
Aggregator min()
Aggregator minForever()
Window sort(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Aggregator stdDev()
Aggregator sum()
Window time(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeAccum(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeLength(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeOrder(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window uniqueLength(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer)

Global Variables

Name Data Type Description
ASCENDING string
CURRENT EventType
DEFAULT string
DELIMITER string
DELIMITER_REGEX string
DESCENDING string
EXPIRED EventType
OUTPUT string
RESET EventType
TIMER EventType

public function avg() returns (Aggregator)

Return Type Description
Aggregator

public function buildStreamEvent(any t, string streamName) returns (StreamEvent[])

Parameter Name Data Type Default Value Description
t any
streamName string
Return Type Description
StreamEvent[]

public function count() returns (Aggregator)

Return Type Description
Aggregator

public function createFilter(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (map<anydata>) returns (boolean) conditionFunc) returns (Filter)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
conditionFunc function (map) returns (boolean)
Return Type Description
Filter

public function createOrderBy(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map<anydata>) returns (anydata)[] fields, string[] sortFieldMetadata) returns (OrderBy)

Parameter Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent[]) returns (())
fields function (map) returns (anydata)[]
sortFieldMetadata string[]
Return Type Description
OrderBy

public function createOutputProcess(function (map<anydata>[]) returns (()) outputFunc) returns (OutputProcess)

Parameter Name Data Type Default Value Description
outputFunc function (map[]) returns (())
Return Type Description
OutputProcess

public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)

Parameter Name Data Type Default Value Description
event streams:StreamEvent
Return Type Description
StreamEvent

public function createSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (anydata)[]? groupbyFuncArray, function (streams:StreamEvent,streams:Aggregator[]) returns (map<anydata>) selectFunc) returns (Select)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
aggregatorArr streams:Aggregator[]
groupbyFuncArray function (streams:StreamEvent) returns (anydata)[]?
selectFunc function (streams:StreamEvent,streams:Aggregator[]) returns (map)
Return Type Description
Select

public function createStreamJoinProcessor(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent[]) returns (())
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
conditionFunc function (map,map) returns (boolean)? null
Return Type Description
StreamJoinProcessor

public function createTableJoinProcessor(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery) returns (TableJoinProcessor)

Parameter Name Data Type Default Value Description
nextProcessor function (streams:StreamEvent[]) returns (())
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
tableQuery function (streams:StreamEvent) returns (map[])
Return Type Description
TableJoinProcessor

public function delay(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function distinctCount() returns (Aggregator)

Return Type Description
Aggregator

public function externalTime(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function externalTimeBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function getStreamEvent(any anyEvent) returns (StreamEvent)

Parameter Name Data Type Default Value Description
anyEvent any
Return Type Description
StreamEvent

public function hopping(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function length(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function lengthBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function max() returns (Aggregator)

Return Type Description
Aggregator

public function maxForever() returns (Aggregator)

Return Type Description
Aggregator

public function min() returns (Aggregator)

Return Type Description
Aggregator

public function minForever() returns (Aggregator)

Return Type Description
Aggregator

public function sort(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function stdDev() returns (Aggregator)

Return Type Description
Aggregator

public function sum() returns (Aggregator)

Return Type Description
Aggregator

public function time(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeAccum(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeBatch(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeLength(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeOrder(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function uniqueLength(any[] windowParameters, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public type Aggregator object

  • <Aggregator> copy() returns (Aggregator)

    Return Type Description
    Aggregator
  • <Aggregator> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata

public type Average object

Field Name Data Type Default Value Description
count int 0
sum float 0.0
  • <Average> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <Average> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type Count object

Field Name Data Type Default Value Description
count int 0
  • <Count> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <Count> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type DelayWindow object

Field Name Data Type Default Value Description
delayInMilliSeconds int
windowParameters any[]
delayedEventQueue streams:LinkedList
lastTimestamp int 0
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <DelayWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <DelayWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <DelayWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <DelayWindow> invokeProcess() returns (error?<>)

    Return Type Description
    error?<>
  • <DelayWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error
  • <DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type DistinctCount object

Field Name Data Type Default Value Description
distinctValues map {}
  • <DistinctCount> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <DistinctCount> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type ExternalTimeBatchWindow object

Field Name Data Type Default Value Description
timeToKeep int
currentEventChunk streams:LinkedList
expiredEventChunk streams:LinkedList
resetEvent streams:StreamEvent? null
startTime int -1
isStartTimeEnabled boolean false
replaceTimestampWithBatchEndTime boolean false
flushed boolean false
endTime int -1
schedulerTimeout int 0
lastScheduledTime int
lastCurrentEventTime int 0
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timeStamp string
storeExpiredEvents boolean false
outputExpectsExpiredEvents boolean false
windowParameters any[]
  • <ExternalTimeBatchWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <ExternalTimeBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeBatchWindow> invokeProcess() returns (error?<>)

    Return Type Description
    error?<>
  • <ExternalTimeBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <ExternalTimeBatchWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error
  • <ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)

    Parameter Name Data Type Default Value Description
    currStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)

    Parameter Name Data Type Default Value Description
    currentTime int
    startTime_ int
    timeToKeep_ int
    Return Type Description
    int
  • <ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)

    Parameter Name Data Type Default Value Description
    firstStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type ExternalTimeWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timeStamp string
  • <ExternalTimeWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <ExternalTimeWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <ExternalTimeWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <ExternalTimeWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type Filter object

  • <Filter> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type HoppingWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
hoppingTime int
windowParameters any[]
currentEventQueue streams:LinkedList
resetEvent streams:StreamEvent?
timer task:Timer?
isStart boolean
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <HoppingWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <HoppingWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <HoppingWindow> invokeProcess() returns (error?<>)

    Return Type Description
    error?<>
  • <HoppingWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <HoppingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <HoppingWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error

public type LengthBatchWindow object

Field Name Data Type Default Value Description
length int
windowParameters any[]
count int
resetEvent streams:StreamEvent?
currentEventQueue streams:LinkedList
expiredEventQueue streams:LinkedList?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <LengthBatchWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <LengthBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type LengthWindow object

Field Name Data Type Default Value Description
size int
linkedList streams:LinkedList
windowParameters any[]
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <LengthWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <LengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <LengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type LinkedList object

Field Name Data Type Default Value Description
first streams:Node?
last streams:Node?
curr streams:Node?
size int 0
ascend boolean true
  • <LinkedList> isEmpty() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> resetToFront()

  • <LinkedList> resetToRear()

  • <LinkedList> hasNext() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> hasPrevious() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> next() returns (any)

    Return Type Description
    any
  • <LinkedList> previous() returns (any)

    Return Type Description
    any
  • <LinkedList> removeCurrent()

  • <LinkedList> getSize() returns (int)

    Return Type Description
    int
  • <LinkedList> clear()

  • <LinkedList> removeFirstOccurrence(any elem) returns (boolean)

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean
  • <LinkedList> remove(any elem) returns (boolean)

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean
  • <LinkedList> getFirst() returns (any)

    Return Type Description
    any
  • <LinkedList> getLast() returns (any)

    Return Type Description
    any
  • <LinkedList> addFirst(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> addLast(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> removeFirst() returns (any)

    Return Type Description
    any
  • <LinkedList> removeLast() returns (any)

    Return Type Description
    any
  • <LinkedList> insertBeforeCurrent(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> dequeue() returns (any)

    Return Type Description
    any
  • <LinkedList> asArray() returns (any[])

    Return Type Description
    any[]

public type Max object

Field Name Data Type Default Value Description
iMaxQueue streams:LinkedList BLangTypeInit: new null ([])
fMaxQueue streams:LinkedList BLangTypeInit: new null ([])
iMax int? null
fMax float? null
  • <Max> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <Max> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type MaxForever object

Field Name Data Type Default Value Description
iMax int? null
fMax float? null
  • <MaxForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <MaxForever> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type MergeSort object

Field Name Data Type Default Value Description
fieldFuncs function (map) returns (anydata)[]
sortTypes string[]
  • <MergeSort> __init(function (map<anydata>) returns (anydata)[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    fieldFuncs function (map) returns (anydata)[]
    sortTypes string[]
  • <MergeSort> topDownMergeSort(streams:StreamEvent[] a)

    Parameter Name Data Type Default Value Description
    a streams:StreamEvent[]

public type Min object

Field Name Data Type Default Value Description
iMinQueue streams:LinkedList BLangTypeInit: new null ([])
fMinQueue streams:LinkedList BLangTypeInit: new null ([])
iMin int? null
fMin float? null
  • <Min> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <Min> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type MinForever object

Field Name Data Type Default Value Description
iMin int? null
fMin float? null
  • <MinForever> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <MinForever> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type Node object

Field Name Data Type Default Value Description
data any
next streams:Node?
prev streams:Node?

public type OrderBy object

Field Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent[]) returns (())
fieldFuncs function (map) returns (anydata)[]
sortTypes string[]
mergeSort streams:MergeSort
  • <OrderBy> __init(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map<anydata>) returns (anydata)[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    nextProcessorPointer function (streams:StreamEvent[]) returns (())
    fieldFuncs function (map) returns (anydata)[]
    sortTypes string[]
  • <OrderBy> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type OutputProcess object

  • <OutputProcess> __init(function (map<anydata>[]) returns (()) outputFunc)

    Parameter Name Data Type Default Value Description
    outputFunc function (map[]) returns (())
  • <OutputProcess> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type Scheduler object

  • <Scheduler> __init(function (streams:StreamEvent[]) returns (()) processFunc)

    Parameter Name Data Type Default Value Description
    processFunc function (streams:StreamEvent[]) returns (())
  • <Scheduler> notifyAt(int timestamp)

    Parameter Name Data Type Default Value Description
    timestamp int
  • <Scheduler> schedule(int timestamp)

    Parameter Name Data Type Default Value Description
    timestamp int
  • <Scheduler> wrapperFunc()

  • <Scheduler> sendTimerEvents() returns (error?<>)

    Return Type Description
    error?<>

public type Select object

  • <Select> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <Select> getGroupByKey(function (streams:StreamEvent) returns (anydata)[]? groupbyFunctionArray, streams:StreamEvent e) returns (string)

    Parameter Name Data Type Default Value Description
    groupbyFunctionArray function (streams:StreamEvent) returns (anydata)[]?
    e streams:StreamEvent
    Return Type Description
    string

public type SortWindow object

Field Name Data Type Default Value Description
lengthToKeep int
windowParameters any[]
sortedWindow streams:LinkedList
sortMetadata string[]
fields string[]
sortTypes string[]
nextProcessPointer function (streams:StreamEvent[]) returns (())?
fieldFuncs function (map) returns (anydata)[]
mergeSort streams:MergeSort
  • <SortWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <SortWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <SortWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <SortWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type StdDev object

Field Name Data Type Default Value Description
mean float 0.0
stdDeviation float 0.0
sumValue float 0.0
count int 0
  • <StdDev> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <StdDev> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type StreamEvent object

Field Name Data Type Default Value Description
eventType CURRENT|EXPIRED|ALL|RESET|TIMER
timestamp int
data map {}
  • <StreamEvent> __init((string,map<anydata>)|map<anydata> eventData, CURRENT|EXPIRED|ALL|RESET|TIMER eventType, int timestamp)

    Parameter Name Data Type Default Value Description
    eventData (string,map)|map
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    timestamp int
  • <StreamEvent> copy() returns (StreamEvent)

    Return Type Description
    StreamEvent
  • <StreamEvent> addData(map<anydata> eventData)

    Parameter Name Data Type Default Value Description
    eventData map
  • <StreamEvent> addAttribute(string key, anydata val)

    Parameter Name Data Type Default Value Description
    key string
    val anydata

public type StreamJoinProcessor object

Field Name Data Type Default Value Description
lhsWindow streams:Window?
rhsWindow streams:Window?
lhsStream string?
rhsStream string?
unidirectionalStream string?
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
lockField int 0
  • <StreamJoinProcessor> __init(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (map<anydata>,map<anydata>) returns (boolean)? onConditionFunc)

    Parameter Name Data Type Default Value Description
    nextProcessor function (streams:StreamEvent[]) returns (())
    joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
    onConditionFunc function (map,map) returns (boolean)?
  • <StreamJoinProcessor> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)

    Parameter Name Data Type Default Value Description
    streamName string
    windowInstance streams:Window
  • <StreamJoinProcessor> setRHS(string streamName, streams:Window windowInstance)

    Parameter Name Data Type Default Value Description
    streamName string
    windowInstance streams:Window
  • <StreamJoinProcessor> setUnidirectionalStream(string streamName)

    Parameter Name Data Type Default Value Description
    streamName string

public type Sum object

Field Name Data Type Default Value Description
iSum int 0
fSum float 0.0
  • <Sum> process(anydata value, CURRENT|EXPIRED|ALL|RESET|TIMER eventType) returns (anydata)

    Parameter Name Data Type Default Value Description
    value anydata
    eventType CURRENT|EXPIRED|ALL|RESET|TIMER
    Return Type Description
    anydata
  • <Sum> copy() returns (Aggregator)

    Return Type Description
    Aggregator

public type TableJoinProcessor object

Field Name Data Type Default Value Description
windowInstance streams:Window?
streamName string
tableName string
joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
lockField int 0
  • <TableJoinProcessor> __init(function (streams:StreamEvent[]) returns (()) nextProcessor, JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN joinType, function (streams:StreamEvent) returns (map<anydata>[]) tableQuery)

    Parameter Name Data Type Default Value Description
    nextProcessor function (streams:StreamEvent[]) returns (())
    joinType JOIN|LEFTOUTERJOIN|RIGHTOUTERJOIN|FULLOUTERJOIN
    tableQuery function (streams:StreamEvent) returns (map[])
  • <TableJoinProcessor> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TableJoinProcessor> setJoinProperties(string tn, string sn, streams:Window wi)

    Parameter Name Data Type Default Value Description
    tn string
    sn string
    wi streams:Window

public type TimeAccumulatingWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
lastTimestamp int -9223372036854775808
scheduler streams:Scheduler
  • <TimeAccumulatingWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <TimeAccumulatingWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeAccumulatingWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeAccumulatingWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type TimeBatchWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
windowParameters any[]
nextEmitTime int -1
currentEventQueue streams:LinkedList
expiredEventQueue streams:LinkedList?
resetEvent streams:StreamEvent?
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <TimeBatchWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <TimeBatchWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeBatchWindow> invokeProcess() returns (error?<>)

    Return Type Description
    error?<>
  • <TimeBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <TimeBatchWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error

public type TimeLengthWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
length int
windowParameters any[]
count int 0
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
scheduler streams:Scheduler
  • <TimeLengthWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <TimeLengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeLengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type TimeOrderWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timestamp string
dropOlderEvents boolean
mergeSort streams:MergeSort
  • <TimeOrderWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <TimeOrderWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeOrderWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeOrderWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <TimeOrderWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type TimeWindow object

Field Name Data Type Default Value Description
timeInMillis int
windowParameters any[]
expiredEventQueue streams:LinkedList
timerQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
lastTimestamp int -9223372036854775808
scheduler streams:Scheduler
  • <TimeWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <TimeWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <TimeWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type UniqueLengthWindow object

Field Name Data Type Default Value Description
uniqueKey string
length int
windowParameters any[]
count int 0
uniqueMap map
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <UniqueLengthWindow> __init(function (streams:StreamEvent[]) returns (())? nextProcessPointer, any[] windowParameters)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    windowParameters any[]
  • <UniqueLengthWindow> initParameters(any[] parameters)

    Parameter Name Data Type Default Value Description
    parameters any[]
  • <UniqueLengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <UniqueLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type Window object

  • <Window> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <Window> getCandidateEvents(streams:StreamEvent originEvent, function (map<anydata>,map<anydata>) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]