public class PriorityStreamProcessor
extends io.siddhi.core.query.processor.stream.StreamProcessor<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState>
implements io.siddhi.core.query.processor.SchedulingProcessor
PriorityStreamProcessor injects two new attributes into the stream:
- priorityKey (see the constant ATTRIBUTE_PRIORITY_KEY)
- currentPriority (see the constant ATTRIBUTE_CURRENT_PRIORITY)
When an event with a new unique key arrives, PriorityStreamProcessor checks its priority. If the priority is 0, the event is sent out without being stored internally. If the priority is greater than 0, the event is stored in the stream processor and the current priority is injected into it.
When an event with an existing priority key arrives, it is stored as the most recent event for that key, the stored priority is increased by the priority of the received event, and the priorityKey and currentPriority are injected into the event.
After every timeout interval, the priority of every stored event is reduced by 1, and the updated priority is sent out together with the last known attributes of that event. This continues until the priority is reduced to 0.
When an event with an existing key arrives with a large negative priority, the output priority is 0 rather than a negative value.
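The rules above can be illustrated with a small stand-alone model. This is a minimal sketch, not the extension's actual implementation: the class `PriorityDecaySketch` and its methods `onEvent` and `onTimeout` are hypothetical names, timeouts are triggered by an explicit method call instead of a Siddhi `Scheduler`, and only the key and priority are tracked (the real processor also keeps the last known event attributes). It further assumes that a key is no longer tracked once its priority reaches 0.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone model of the documented priority rules; not the extension's code. */
class PriorityDecaySketch {
    // priorityKey -> current priority, kept only for keys whose priority is above 0
    private final Map<String, Long> stored = new LinkedHashMap<>();

    /** Handles one incoming event and returns the currentPriority that would be injected. */
    long onEvent(String priorityKey, long priority) {
        Long previous = stored.get(priorityKey);
        long base = (previous == null) ? 0L : previous;
        // Clamp at 0 so a large negative priority never yields a negative output.
        long current = Math.max(0L, base + priority);
        if (current > 0) {
            stored.put(priorityKey, current);
        } else {
            // Assumption: a key whose priority reaches 0 is no longer tracked.
            stored.remove(priorityKey);
        }
        return current;
    }

    /** Simulates one timeout: lowers every stored priority by 1 and returns the updated values. */
    Map<String, Long> onTimeout() {
        Map<String, Long> emitted = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Long>> it = stored.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            long reduced = entry.getValue() - 1;
            emitted.put(entry.getKey(), reduced);
            if (reduced == 0) {
                it.remove();            // decay stops once the priority reaches 0
            } else {
                entry.setValue(reduced);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        PriorityDecaySketch sketch = new PriorityDecaySketch();
        System.out.println(sketch.onEvent("order-1", 0));    // new key with priority 0: passed through, not stored
        System.out.println(sketch.onEvent("order-2", 2));    // new key with priority above 0: stored
        System.out.println(sketch.onEvent("order-2", 3));    // existing key: priorities are added
        System.out.println(sketch.onTimeout());              // one decay step for every stored key
        System.out.println(sketch.onEvent("order-2", -100)); // large negative priority: clamped at 0
    }
}
```

Keeping the clamp in `onEvent` rather than in `onTimeout` mirrors the description: decay only ever lowers a positive priority by 1, so a negative value can only arise from an incoming event.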
Modifier and Type | Field and Description |
---|---|
static String | ATTRIBUTE_CURRENT_PRIORITY: Second attribute name injected by PriorityStreamProcessor into the output stream event. |
static String | ATTRIBUTE_PRIORITY_KEY: First attribute name injected by PriorityStreamProcessor into the output stream event. |
Constructor and Description |
---|
PriorityStreamProcessor() |
Modifier and Type | Method and Description |
---|---|
io.siddhi.core.query.processor.ProcessingMode | getProcessingMode() |
List<io.siddhi.query.api.definition.Attribute> | getReturnAttributes() |
io.siddhi.core.util.Scheduler | getScheduler() |
protected io.siddhi.core.util.snapshot.state.StateFactory<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState> | init(io.siddhi.core.event.stream.MetaStreamEvent metaStreamEvent, io.siddhi.query.api.definition.AbstractDefinition inputDefinition, io.siddhi.core.executor.ExpressionExecutor[] attributeExpressionExecutors, io.siddhi.core.util.config.ConfigReader configReader, io.siddhi.core.event.stream.holder.StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, io.siddhi.core.config.SiddhiQueryContext siddhiQueryContext): Initialize the PriorityStreamProcessor. |
protected void | process(io.siddhi.core.event.ComplexEventChunk<io.siddhi.core.event.stream.StreamEvent> streamEventChunk, io.siddhi.core.query.processor.Processor nextProcessor, io.siddhi.core.event.stream.StreamEventCloner streamEventCloner, io.siddhi.core.event.stream.populater.ComplexEventPopulater complexEventPopulater, org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState state): Process events received by PriorityStreamProcessor. |
void | setScheduler(io.siddhi.core.util.Scheduler scheduler) |
void | start() |
void | stop() |
processEventChunk
constructStreamEventPopulater, getNextProcessor, initProcessor, process, setNextProcessor, setStreamEventCloner, setToLast
public static final String ATTRIBUTE_PRIORITY_KEY
public static final String ATTRIBUTE_CURRENT_PRIORITY
protected io.siddhi.core.util.snapshot.state.StateFactory<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState> init(io.siddhi.core.event.stream.MetaStreamEvent metaStreamEvent, io.siddhi.query.api.definition.AbstractDefinition inputDefinition, io.siddhi.core.executor.ExpressionExecutor[] attributeExpressionExecutors, io.siddhi.core.util.config.ConfigReader configReader, io.siddhi.core.event.stream.holder.StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, io.siddhi.core.config.SiddhiQueryContext siddhiQueryContext)
init
in class io.siddhi.core.query.processor.stream.AbstractStreamProcessor<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState>
inputDefinition - Input Definition
attributeExpressionExecutors - Array of AttributeExpressionExecutor
siddhiQueryContext - siddhiQueryContext of Siddhi
protected void process(io.siddhi.core.event.ComplexEventChunk<io.siddhi.core.event.stream.StreamEvent> streamEventChunk, io.siddhi.core.query.processor.Processor nextProcessor, io.siddhi.core.event.stream.StreamEventCloner streamEventCloner, io.siddhi.core.event.stream.populater.ComplexEventPopulater complexEventPopulater, org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState state)
process
in class io.siddhi.core.query.processor.stream.StreamProcessor<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState>
streamEventChunk - the event chunk that needs to be processed
nextProcessor - the next processor to which the successfully processed events need to be passed
streamEventCloner - helps to clone the incoming event for local storage or modification
complexEventPopulater - helps to populate the events with the resultant attributes
public void setScheduler(io.siddhi.core.util.Scheduler scheduler)
setScheduler
in interface io.siddhi.core.query.processor.SchedulingProcessor
public io.siddhi.core.util.Scheduler getScheduler()
getScheduler
in interface io.siddhi.core.query.processor.SchedulingProcessor
public void start()
start
in interface io.siddhi.core.util.extension.holder.ExternalReferencedHolder
public void stop()
stop
in interface io.siddhi.core.util.extension.holder.ExternalReferencedHolder
public List<io.siddhi.query.api.definition.Attribute> getReturnAttributes()
getReturnAttributes
in class io.siddhi.core.query.processor.stream.AbstractStreamProcessor<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState>
public io.siddhi.core.query.processor.ProcessingMode getProcessingMode()
getProcessingMode
in class io.siddhi.core.query.processor.stream.AbstractStreamProcessor<org.wso2.extension.siddhi.execution.priority.PriorityStreamProcessor.ExtensionState>
Copyright © 2019 WSO2. All rights reserved.