ballerina/streams module

Type Definitions

Type Values Description
EventType TIMER | RESET | EXPIRED | CURRENT | ALL
JoinType RIGHTOUTERJOIN | LEFTOUTERJOIN | JOIN | FULLOUTERJOIN

Objects Summary

Object Description
Aggregator
Average
Count
DelayWindow
DistinctCount
ExternalTimeBatchWindow
ExternalTimeWindow
Filter
GroupBy
LengthBatchWindow
LengthWindow
LinkedList
Max
MaxForever
Min
MinForever
Node
OrderBy
OutputProcess
Select
SimpleSelect
StdDev
StreamEvent
StreamJoinProcessor
Sum
TableJoinProcessor
TimeBatchWindow
TimeLengthWindow
TimeWindow
UniqueLengthWindow
Window

Functions Summary

Return Type Function and Description
Aggregator avg()
StreamEvent[] buildStreamEvent(map keyVals, string streamName)
Aggregator count()
Filter createFilter(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (map) returns (boolean) conditionFunc)
GroupBy createGroupBy(function (streams:StreamEvent[]) returns (()) nextProcPointer, string[] groupByFields)
OrderBy createOrderBy(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map) returns (any)[] fields, string[] sortFieldMetadata)
OutputProcess createOutputProcess(function (map[]) returns (()) outputFunc)
StreamEvent createResetStreamEvent(streams:StreamEvent event)
Select createSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (string)? groupbyFunc, function (streams:StreamEvent,streams:Aggregator[]) returns (map) selectFunc)
SimpleSelect createSimpleSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (streams:StreamEvent) returns (map) selectFunc)
StreamJoinProcessor createStreamJoinProcessor(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (map,map) returns (boolean)? conditionFunc)
TableJoinProcessor createTableJoinProcessor(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (streams:StreamEvent) returns (map[]) tableQuery)
Window delayWindow(int delayInMilliSeconds, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Aggregator distinctCount()
Window externalTimeBatchWindow(string timestamp, int time, int startTime, int timeOut, boolean replaceTimestampWithBatchEndTime, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window externalTimeWindow(string timeStamp, int timeLength, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window lengthBatchWindow(int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window lengthWindow(int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Aggregator max()
Aggregator maxForever()
Aggregator min()
Aggregator minForever()
Aggregator stdDev()
Aggregator sum()
Window timeBatchWindow(int time, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeLengthWindow(int timeLength, int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window timeWindow(int timeLength, function (streams:StreamEvent[]) returns (())? nextProcessPointer)
Window uniqueLengthWindow(string uniqueKey, int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer)

Global Variables

Name Data Type Description
ASCENDING string
CURRENT EventType
DEFAULT string
DELIMITER string
DELIMITER_REGEX string
DESCENDING string
EXPIRED EventType
OUTPUT string
RESET EventType
TIMER EventType

public function avg() returns (Aggregator)

Return Type Description
Aggregator

public function buildStreamEvent(map keyVals, string streamName) returns (StreamEvent[])

Parameter Name Data Type Default Value Description
keyVals map
streamName string
Return Type Description
StreamEvent[]

public function count() returns (Aggregator)

Return Type Description
Aggregator

public function createFilter(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (map) returns (boolean) conditionFunc) returns (Filter)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
conditionFunc function (map) returns (boolean)
Return Type Description
Filter

public function createGroupBy(function (streams:StreamEvent[]) returns (()) nextProcPointer, string[] groupByFields) returns (GroupBy)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
groupByFields string[]
Return Type Description
GroupBy

public function createOrderBy(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map) returns (any)[] fields, string[] sortFieldMetadata) returns (OrderBy)

Parameter Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent[]) returns (())
fields function (map) returns (any)[]
sortFieldMetadata string[]
Return Type Description
OrderBy

public function createOutputProcess(function (map[]) returns (()) outputFunc) returns (OutputProcess)

Parameter Name Data Type Default Value Description
outputFunc function (map[]) returns (())
Return Type Description
OutputProcess

public function createResetStreamEvent(streams:StreamEvent event) returns (StreamEvent)

Parameter Name Data Type Default Value Description
event streams:StreamEvent
Return Type Description
StreamEvent

public function createSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, streams:Aggregator[] aggregatorArr, function (streams:StreamEvent) returns (string)? groupbyFunc, function (streams:StreamEvent,streams:Aggregator[]) returns (map) selectFunc) returns (Select)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
aggregatorArr streams:Aggregator[]
groupbyFunc function (streams:StreamEvent) returns (string)?
selectFunc function (streams:StreamEvent,streams:Aggregator[]) returns (map)
Return Type Description
Select

public function createSimpleSelect(function (streams:StreamEvent[]) returns (()) nextProcPointer, function (streams:StreamEvent) returns (map) selectFunc) returns (SimpleSelect)

Parameter Name Data Type Default Value Description
nextProcPointer function (streams:StreamEvent[]) returns (())
selectFunc function (streams:StreamEvent) returns (map)
Return Type Description
SimpleSelect

public function createStreamJoinProcessor(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (map,map) returns (boolean)? conditionFunc) returns (StreamJoinProcessor)

Parameter Name Data Type Default Value Description
nextProcessor function (any) returns (())
joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
conditionFunc function (map,map) returns (boolean)? null
Return Type Description
StreamJoinProcessor

public function createTableJoinProcessor(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (streams:StreamEvent) returns (map[]) tableQuery) returns (TableJoinProcessor)

Parameter Name Data Type Default Value Description
nextProcessor function (any) returns (())
joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
tableQuery function (streams:StreamEvent) returns (map[])
Return Type Description
TableJoinProcessor

public function delayWindow(int delayInMilliSeconds, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
delayInMilliSeconds int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function distinctCount() returns (Aggregator)

Return Type Description
Aggregator

public function externalTimeBatchWindow(string timestamp, int time, int startTime, int timeOut, boolean replaceTimestampWithBatchEndTime, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
timestamp string
time int
startTime int -1
timeOut int -1
replaceTimestampWithBatchEndTime boolean false
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function externalTimeWindow(string timeStamp, int timeLength, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
timeStamp string
timeLength int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function lengthBatchWindow(int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
length int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function lengthWindow(int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
length int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function max() returns (Aggregator)

Return Type Description
Aggregator

public function maxForever() returns (Aggregator)

Return Type Description
Aggregator

public function min() returns (Aggregator)

Return Type Description
Aggregator

public function minForever() returns (Aggregator)

Return Type Description
Aggregator

public function stdDev() returns (Aggregator)

Return Type Description
Aggregator

public function sum() returns (Aggregator)

Return Type Description
Aggregator

public function timeBatchWindow(int time, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
time int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeLengthWindow(int timeLength, int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
timeLength int
length int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function timeWindow(int timeLength, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
timeLength int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public function uniqueLengthWindow(string uniqueKey, int length, function (streams:StreamEvent[]) returns (())? nextProcessPointer) returns (Window)

Parameter Name Data Type Default Value Description
uniqueKey string
length int
nextProcessPointer function (streams:StreamEvent[]) returns (())? null
Return Type Description
Window

public type Aggregator object

  • <Aggregator> clone() returns (Aggregator)

    Return Type Description
    Aggregator
  • <Aggregator> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any

public type Average object

Field Name Data Type Default Value Description
count int 0
sum float 0.0
  • <Average> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <Average> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type Count object

Field Name Data Type Default Value Description
count int 0
  • <Count> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <Count> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type DelayWindow object

Field Name Data Type Default Value Description
delayInMilliSeconds int
delayedEventQueue streams:LinkedList
lastTimestamp int 0
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <DelayWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int delayInMilliSeconds)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    delayInMilliSeconds int
  • <DelayWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <DelayWindow> invokeProcess() returns (error)

    Return Type Description
    error
  • <DelayWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error
  • <DelayWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type DistinctCount object

Field Name Data Type Default Value Description
distinctValues map
  • <DistinctCount> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <DistinctCount> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type ExternalTimeBatchWindow object

Field Name Data Type Default Value Description
timeToKeep int
currentEventChunk streams:LinkedList
expiredEventChunk streams:LinkedList
resetEvent streams:StreamEvent? null
startTime int 0
isStartTimeEnabled boolean false
replaceTimestampWithBatchEndTime boolean false
flushed boolean false
endTime int -1
schedulerTimeout int 0
lastScheduledTime int
lastCurrentEventTime int 0
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timeStamp string
storeExpiredEvents boolean false
outputExpectsExpiredEvents boolean false
  • <ExternalTimeBatchWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int timeToKeep, string timeStamp, int startTime, int schedulerTimeout, boolean replaceTimestampWithBatchEndTime)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    timeToKeep int
    timeStamp string
    startTime int
    schedulerTimeout int
    replaceTimestampWithBatchEndTime boolean
  • <ExternalTimeBatchWindow> invokeProcess() returns (error)

    Return Type Description
    error
  • <ExternalTimeBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <ExternalTimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <ExternalTimeBatchWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error
  • <ExternalTimeBatchWindow> cloneAppend(streams:StreamEvent currStreamEvent)

    Parameter Name Data Type Default Value Description
    currStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> flushToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> appendToOutputChunk(streams:LinkedList complexEventChunks, int currentTime, boolean preserveCurrentEvents)

    Parameter Name Data Type Default Value Description
    complexEventChunks streams:LinkedList
    currentTime int
    preserveCurrentEvents boolean
  • <ExternalTimeBatchWindow> findEndTime(int currentTime, int startTime_, int timeToKeep_) returns (int)

    Parameter Name Data Type Default Value Description
    currentTime int
    startTime_ int
    timeToKeep_ int
    Return Type Description
    int
  • <ExternalTimeBatchWindow> initTiming(streams:StreamEvent firstStreamEvent)

    Parameter Name Data Type Default Value Description
    firstStreamEvent streams:StreamEvent
  • <ExternalTimeBatchWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type ExternalTimeWindow object

Field Name Data Type Default Value Description
timeInMillis int
expiredEventQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timeStamp string
  • <ExternalTimeWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int timeInMillis, string timeStamp)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    timeInMillis int
    timeStamp string
  • <ExternalTimeWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <ExternalTimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <ExternalTimeWindow> getTimestamp(any val) returns (int)

    Parameter Name Data Type Default Value Description
    val any
    Return Type Description
    int

public type Filter object

  • <Filter> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type GroupBy object

Field Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent[]) returns (())
groupByFields string[]
groupedStreamEvents map
  • <GroupBy> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type LengthBatchWindow object

Field Name Data Type Default Value Description
length int
count int
resetEvent streams:StreamEvent?
currentEventQueue streams:LinkedList
expiredEventQueue streams:LinkedList?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <LengthBatchWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int length)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    length int
  • <LengthBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <LengthBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type LengthWindow object

Field Name Data Type Default Value Description
size int
linkedList streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <LengthWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int size)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    size int
  • <LengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <LengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type LinkedList object

Field Name Data Type Default Value Description
first streams:Node?
last streams:Node?
curr streams:Node?
size int 0
ascend boolean true
  • <LinkedList> isEmpty() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> resetToFront()

  • <LinkedList> resetToRear()

  • <LinkedList> hasNext() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> hasPrevious() returns (boolean)

    Return Type Description
    boolean
  • <LinkedList> next() returns (any)

    Return Type Description
    any
  • <LinkedList> previous() returns (any)

    Return Type Description
    any
  • <LinkedList> removeCurrent()

  • <LinkedList> getSize() returns (int)

    Return Type Description
    int
  • <LinkedList> clear()

  • <LinkedList> removeFirstOccurrence(any elem) returns (boolean)

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean
  • <LinkedList> remove(any elem) returns (boolean)

    Parameter Name Data Type Default Value Description
    elem any
    Return Type Description
    boolean
  • <LinkedList> getFirst() returns (any)

    Return Type Description
    any
  • <LinkedList> getLast() returns (any)

    Return Type Description
    any
  • <LinkedList> addFirst(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> addLast(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> removeFirst() returns (any)

    Return Type Description
    any
  • <LinkedList> removeLast() returns (any)

    Return Type Description
    any
  • <LinkedList> insertBeforeCurrent(any data)

    Parameter Name Data Type Default Value Description
    data any
  • <LinkedList> dequeue() returns (any)

    Return Type Description
    any
  • <LinkedList> asArray() returns (any[])

    Return Type Description
    any[]

public type Max object

Field Name Data Type Default Value Description
iMaxQueue streams:LinkedList
fMaxQueue streams:LinkedList
iMax int? null
fMax float? null
  • <Max> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <Max> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type MaxForever object

Field Name Data Type Default Value Description
iMax int? null
fMax float? null
  • <MaxForever> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <MaxForever> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type Min object

Field Name Data Type Default Value Description
iMinQueue streams:LinkedList
fMinQueue streams:LinkedList
iMin int? null
fMin float? null
  • <Min> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <Min> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type MinForever object

Field Name Data Type Default Value Description
iMin int? null
fMin float? null
  • <MinForever> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <MinForever> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type Node object

Field Name Data Type Default Value Description
data any
next streams:Node?
prev streams:Node?

public type OrderBy object

Field Name Data Type Default Value Description
nextProcessorPointer function (streams:StreamEvent[]) returns (())
fieldFuncs function (map) returns (any)[]
sortTypes string[]
  • <OrderBy> new(function (streams:StreamEvent[]) returns (()) nextProcessorPointer, function (map) returns (any)[] fieldFuncs, string[] sortTypes)

    Parameter Name Data Type Default Value Description
    nextProcessorPointer function (streams:StreamEvent[]) returns (())
    fieldFuncs function (map) returns (any)[]
    sortTypes string[]
  • <OrderBy> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type OutputProcess object

  • <OutputProcess> new(function (map[]) returns (()) outputFunc)

    Parameter Name Data Type Default Value Description
    outputFunc function (map[]) returns (())
  • <OutputProcess> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type Select object

  • <Select> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type SimpleSelect object

  • <SimpleSelect> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]

public type StdDev object

Field Name Data Type Default Value Description
mean float 0.0
stdDeviation float 0.0
sumValue float 0.0
count int 0
  • <StdDev> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <StdDev> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type StreamEvent object

Field Name Data Type Default Value Description
eventType TIMER|ALL|CURRENT|RESET|EXPIRED
timestamp int
data map
  • <StreamEvent> new((string,map)|map eventData, TIMER|ALL|CURRENT|RESET|EXPIRED eventType, int timestamp)

    Parameter Name Data Type Default Value Description
    eventData (string,map)|map
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    timestamp int
  • <StreamEvent> clone() returns (StreamEvent)

    Return Type Description
    StreamEvent
  • <StreamEvent> addData(map eventData)

    Parameter Name Data Type Default Value Description
    eventData map

public type StreamJoinProcessor object

Field Name Data Type Default Value Description
lhsWindow streams:Window?
rhsWindow streams:Window?
lhsStream string?
rhsStream string?
unidirectionalStream string?
joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
  • <StreamJoinProcessor> new(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (map,map) returns (boolean)? onConditionFunc)

    Parameter Name Data Type Default Value Description
    nextProcessor function (any) returns (())
    joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
    onConditionFunc function (map,map) returns (boolean)?
  • <StreamJoinProcessor> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <StreamJoinProcessor> setLHS(string streamName, streams:Window windowInstance)

    Parameter Name Data Type Default Value Description
    streamName string
    windowInstance streams:Window
  • <StreamJoinProcessor> setRHS(string streamName, streams:Window windowInstance)

    Parameter Name Data Type Default Value Description
    streamName string
    windowInstance streams:Window
  • <StreamJoinProcessor> setUnidirectionalStream(string streamName)

    Parameter Name Data Type Default Value Description
    streamName string

public type Sum object

Field Name Data Type Default Value Description
iSum int 0
fSum float 0.0
  • <Sum> process(any value, TIMER|ALL|CURRENT|RESET|EXPIRED eventType) returns (any)

    Parameter Name Data Type Default Value Description
    value any
    eventType TIMER|ALL|CURRENT|RESET|EXPIRED
    Return Type Description
    any
  • <Sum> clone() returns (Aggregator)

    Return Type Description
    Aggregator

public type TableJoinProcessor object

Field Name Data Type Default Value Description
windowInstance streams:Window?
streamName string
tableName string
joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
  • <TableJoinProcessor> new(function (any) returns (()) nextProcessor, RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN joinType, function (streams:StreamEvent) returns (map[]) tableQuery)

    Parameter Name Data Type Default Value Description
    nextProcessor function (any) returns (())
    joinType RIGHTOUTERJOIN|FULLOUTERJOIN|LEFTOUTERJOIN|JOIN
    tableQuery function (streams:StreamEvent) returns (map[])
  • <TableJoinProcessor> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TableJoinProcessor> setJoinProperties(string tn, string sn, streams:Window wi)

    Parameter Name Data Type Default Value Description
    tn string
    sn string
    wi streams:Window

public type TimeBatchWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
nextEmitTime int -1
currentEventQueue streams:LinkedList
expiredEventQueue streams:LinkedList?
resetEvent streams:StreamEvent?
timer task:Timer?
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <TimeBatchWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int timeInMilliSeconds)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    timeInMilliSeconds int
  • <TimeBatchWindow> invokeProcess() returns (error)

    Return Type Description
    error
  • <TimeBatchWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeBatchWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <TimeBatchWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error

public type TimeLengthWindow object

Field Name Data Type Default Value Description
timeInMilliSeconds int
length int
count int 0
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
timer task:Timer?
  • <TimeLengthWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int timeInMilliSeconds, int length)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    timeInMilliSeconds int
    length int
  • <TimeLengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeLengthWindow> invokeProcess() returns (error)

    Return Type Description
    error
  • <TimeLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]
  • <TimeLengthWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error

public type TimeWindow object

Field Name Data Type Default Value Description
timeInMillis int
expiredEventQueue streams:LinkedList
timerQueue streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
lastTimestamp int -9223372036854775808
  • <TimeWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, int timeInMillis)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    timeInMillis int
  • <TimeWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <TimeWindow> invokeProcess() returns (error)

    Return Type Description
    error
  • <TimeWindow> handleError(error e)

    Parameter Name Data Type Default Value Description
    e error
  • <TimeWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type UniqueLengthWindow object

Field Name Data Type Default Value Description
uniqueKey string
length int
count int 0
uniqueMap map
expiredEventChunk streams:LinkedList
nextProcessPointer function (streams:StreamEvent[]) returns (())?
  • <UniqueLengthWindow> new(function (streams:StreamEvent[]) returns (())? nextProcessPointer, string uniqueKey, int length)

    Parameter Name Data Type Default Value Description
    nextProcessPointer function (streams:StreamEvent[]) returns (())?
    uniqueKey string
    length int
  • <UniqueLengthWindow> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <UniqueLengthWindow> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]

public type Window object

  • <Window> process(streams:StreamEvent[] streamEvents)

    Parameter Name Data Type Default Value Description
    streamEvents streams:StreamEvent[]
  • <Window> getCandidateEvents(streams:StreamEvent originEvent, function (map,map) returns (boolean)? conditionFunc, boolean isLHSTrigger) returns ((StreamEvent,StreamEvent)[])

    Parameter Name Data Type Default Value Description
    originEvent streams:StreamEvent
    conditionFunc function (map,map) returns (boolean)?
    isLHSTrigger boolean true
    Return Type Description
    (StreamEvent,StreamEvent)[]