KernelMinMaxStreamProcessor.java
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.extension.siddhi.execution.timeseries.extrema;
import org.wso2.extension.siddhi.execution.timeseries.extrema.util.ExtremaCalculator;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
* Implementation of Kernel Min Max for siddhiQL.
*/
@Extension(
name = "kernelMinMax",
namespace = "timeseries",
description = "TBD",
parameters = {
@Parameter(name = "variable",
description = "The time series value to be considered for minima maxima detection.",
type = {DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG}),
@Parameter(name = "bandwidth",
description = "The bandwidth of the Gaussian Kernel calculation.",
type = {DataType.DOUBLE}),
@Parameter(name = "window.size",
description = "The number of values to be considered for smoothing and" +
" determining the extremes.",
type = {DataType.INT}),
@Parameter(name = "extrema.type",
description = "This can be min, max or minmax.",
type = {DataType.STRING}),
},
examples = {
@Example(
syntax = "from inputStream#timeseries:kernelMinMax(price, 3, 7, ‘min’)\n" +
"select *\n" +
"insert into outputStream;",
description = "This example returns the maximum values for a set of price values."
),
@Example(
syntax = "from inputStream#timeseries:kernelMinMax(price, 3, 7, 'max')\n" +
"select *\n" +
"insert into outputStream;",
description = "This example returns the minimum values for a set of price values."
),
@Example(
syntax = "from inputStream#timeseries:kernelMinMax(price, 3, 7, ‘minmax’)\n" +
"select *\n" +
"insert into outputStream;",
description = "This example returns both the minimum values and the maximum values for a " +
"set of price values."
)
}
)
public class KernelMinMaxStreamProcessor extends StreamProcessor {
ExtremaType extremaType;
int[] variablePosition;
double bw = 0;
int windowSize = 0;
LinkedList<StreamEvent> eventStack = null;
Queue<Double> valueStack = null;
Queue<StreamEvent> uniqueQueue = null;
ExtremaCalculator extremaCalculator = null;
private int minEventPos;
private int maxEventPos;
@Override
protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutors,
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
if (attributeExpressionExecutors.length != 4) {
throw new SiddhiAppValidationException("Invalid no of arguments passed to KernelMinMaxStreamProcessor," +
" required 4, " +
"but found " + attributeExpressionExecutors.length);
}
if (!(attributeExpressionExecutors[0].getReturnType() == Attribute.Type.DOUBLE ||
attributeExpressionExecutors[0].getReturnType() == Attribute.Type.INT
|| attributeExpressionExecutors[0].getReturnType() == Attribute.Type.FLOAT ||
attributeExpressionExecutors[0].getReturnType() == Attribute.Type.LONG)) {
throw new SiddhiAppValidationException("Invalid parameter type found for the 1st argument" +
" of KernelMinMaxStreamProcessor, " +
"required " + Attribute.Type.DOUBLE + " or " + Attribute.Type.FLOAT + " or "
+ Attribute.Type.INT + " or " +
Attribute.Type.LONG + " but found " + attributeExpressionExecutors[0]
.getReturnType().toString());
}
try {
bw = Double.parseDouble(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1])
.getValue()));
} catch (NumberFormatException e) {
throw new SiddhiAppValidationException("Invalid parameter type found for the 2nd argument" +
" of KernelMinMaxStreamProcessor " +
"required " + Attribute.Type.DOUBLE + " constant, but found " + attributeExpressionExecutors[1]
.getReturnType().toString());
}
if (!(attributeExpressionExecutors[2] instanceof ConstantExpressionExecutor) || attributeExpressionExecutors[2]
.getReturnType() != Attribute.Type.INT) {
throw new SiddhiAppValidationException("Invalid parameter type found for the 3rd argument " +
"of KernelMinMaxStreamProcessor, " +
"required " + Attribute.Type.INT + " constant, but found " + attributeExpressionExecutors[2]
.getReturnType().toString());
}
if (!(attributeExpressionExecutors[3] instanceof ConstantExpressionExecutor) || attributeExpressionExecutors[3]
.getReturnType() != Attribute.Type.STRING) {
throw new SiddhiAppValidationException("Invalid parameter type found for the 4th argument " +
"of KernelMinMaxStreamProcessor, " +
"required " + Attribute.Type.STRING + " constant, but found " + attributeExpressionExecutors[2]
.getReturnType().toString());
}
variablePosition = ((VariableExpressionExecutor) attributeExpressionExecutors[0]).getPosition();
windowSize = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[2])
.getValue()));
String extremeType = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[3]).getValue();
if ("min".equalsIgnoreCase(extremeType)) {
extremaType = ExtremaType.MIN;
} else if ("max".equalsIgnoreCase(extremeType)) {
extremaType = ExtremaType.MAX;
} else {
extremaType = ExtremaType.MINMAX;
}
extremaCalculator = new ExtremaCalculator();
eventStack = new LinkedList<StreamEvent>();
valueStack = new LinkedList<Double>();
uniqueQueue = new LinkedList<StreamEvent>();
List<Attribute> attributeList = new ArrayList<Attribute>();
attributeList.add(new Attribute("extremaType", Attribute.Type.STRING));
return attributeList;
}
@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
StreamEventCloner streamEventCloner,
ComplexEventPopulater complexEventPopulater) {
ComplexEventChunk<StreamEvent> returnEventChunk = new ComplexEventChunk<StreamEvent>(false);
synchronized (this) {
while (streamEventChunk.hasNext()) {
StreamEvent event = streamEventChunk.next();
streamEventChunk.remove();
Double eventKey = (Double) event.getAttribute(variablePosition);
eventStack.add(event);
valueStack.add(eventKey);
if (eventStack.size() > windowSize) {
Queue<Double> smoothedValues = extremaCalculator.smooth(valueStack, bw);
StreamEvent minimumEvent;
StreamEvent maximumEvent;
switch (extremaType) {
case MINMAX:
maximumEvent = getMaxEvent(smoothedValues);
minimumEvent = getMinEvent(smoothedValues);
if (maximumEvent != null && minimumEvent != null) {
if (maxEventPos > minEventPos) {
returnEventChunk.add(minimumEvent);
returnEventChunk.add(maximumEvent);
} else {
returnEventChunk.add(maximumEvent);
returnEventChunk.add(minimumEvent);
}
} else if (maximumEvent != null) {
returnEventChunk.add(maximumEvent);
} else if (minimumEvent != null) {
returnEventChunk.add(minimumEvent);
}
break;
case MIN:
minimumEvent = getMinEvent(smoothedValues);
if (minimumEvent != null) {
returnEventChunk.add(minimumEvent);
}
break;
case MAX:
maximumEvent = getMaxEvent(smoothedValues);
if (maximumEvent != null) {
returnEventChunk.add(maximumEvent);
}
break;
}
eventStack.remove();
valueStack.remove();
}
}
}
if (returnEventChunk.getFirst() != null) {
nextProcessor.process(returnEventChunk);
}
}
private StreamEvent getMaxEvent(Queue<Double> smoothedValues) {
//value 1 is an optimized value for stock market domain, this value may change for other domains
Integer maxPosition = extremaCalculator.findMax(smoothedValues, 1);
if (maxPosition != null) {
//values 5 and 3 are optimized values for stock market domain, these value may change for other domains
Integer maxEventPosition = extremaCalculator.findMax(valueStack, windowSize / 5, windowSize / 3);
StreamEvent returnMaximumEvent = getExtremaEvent(maxPosition, maxEventPosition);
if (returnMaximumEvent != null) {
maxEventPos = maxEventPosition;
complexEventPopulater.populateComplexEvent(returnMaximumEvent, new Object[]{"max"});
return returnMaximumEvent;
}
}
return null;
}
private StreamEvent getMinEvent(Queue<Double> smoothedValues) {
//value 1 is an optimized value for stock market domain, this value may change for other domains
Integer minPosition = extremaCalculator.findMin(smoothedValues, 1);
if (minPosition != null) {
//values 5 and 3 are optimized values for stock market domain, these value may change for other domains
Integer minEventPosition = extremaCalculator.findMin(valueStack, windowSize / 5, windowSize / 3);
StreamEvent returnMinimumEvent = getExtremaEvent(minPosition, minEventPosition);
if (returnMinimumEvent != null) {
minEventPos = minEventPosition;
complexEventPopulater.populateComplexEvent(returnMinimumEvent, new Object[]{"min"});
return returnMinimumEvent;
}
}
return null;
}
private StreamEvent getExtremaEvent(Integer smoothenedPosition, Integer eventPosition) {
//values 5 and 3 are optimized values for stock market domain, these value may change for other domains
if (eventPosition != null && eventPosition - smoothenedPosition <= windowSize / 5 &&
smoothenedPosition - eventPosition <= windowSize / 2) {
StreamEvent extremaEvent = eventStack.get(eventPosition);
if (!uniqueQueue.contains(extremaEvent)) {
//value 5 is an optimized value for stock market domain, this value may change for other domains
if (uniqueQueue.size() > 5) {
uniqueQueue.remove();
}
uniqueQueue.add(extremaEvent);
eventStack.remove();
valueStack.remove();
return streamEventCloner.copyStreamEvent(extremaEvent);
}
}
return null;
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public synchronized Map<String, Object> currentState() {
Map<String, Object> state = new HashMap<String, Object>();
state.put("eventStack", eventStack);
state.put("valueStack", valueStack);
state.put("uniqueQueue", uniqueQueue);
return state;
}
@Override
public synchronized void restoreState(Map<String, Object> state) {
eventStack = (LinkedList<StreamEvent>) state.get("eventStack");
valueStack = (Queue<Double>) state.get("valueStack");
uniqueQueue = (Queue<StreamEvent>) state.get("uniqueQueue");
}
/**
* Enumeration for extrema types.
*/
public enum ExtremaType {
MIN, MAX, MINMAX
}
}