package org.wso2.extension.siddhi.execution.env;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "resourceBatch", namespace = "env", description = "A resource batch (tumbling) window that holds a number of events with specified attribute as grouping key and based on the resource count inferred from env:resourceIdentifier extension. The window is updated each time a batch of events with same key value that equals the number of resources count.", parameters = {@Parameter(name = "resource.group.id", description = "The resource group name.", type = {DataType.STRING}), @Parameter(name = "correlation.id", description = "The attribute that should be used for event correlation.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE}), @Parameter(name = "time.in.milliseconds", description = "Time to wait for arrival of new event, before flushing and giving output for events belonging to a specific batch.", type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "300000")}, examples = {@Example(syntax = "define stream SweetProductDefectsDetector(productId string, colorCode string, height long, width long);\ndefine stream SweetProductDefectAlert(productId string, isDefected bool);\n\n@info(name='product_color_code_rule') \nfrom SweetProductDefectsDetector#env:resourceIdentifier(\"rule-group-1\")\nselect productId, if(colorCode == '#FF0000', true, false) as isValid\ninsert into DefectDetectionResult;\n\n@info(name='product_dimensions_rule') \nfrom SweetProductDefectsDetector#env:resourceIdentifier(\"rule-group-1\")\nselect productId, if(height == 5 && width ==10, true, false) as isValid\ninsert into DefectDetectionResult;\n\n@info(name='defect_analyzer') \nfrom DefectDetectionResult#window.env:resourceBatch(\"rule-group-1\", productId, 60000)\nselect productId, and(not isValid) as isDefected\ninsert into SweetProductDefectAlert;", description = "This example demonstrate the usage of 'env:resourceBatch' widow extension with 'env:resourceIdentifier' stream processor and 'and' attribute aggregator extensions.\n Use Case: The SweetProductDefectsDetector gets the Sweet Production data as an input stream and each event will be sent to the 'rule' queries( 'product_color_code_rule' and 'product_dimensions_rule') . The query 'defect_analyzer' should wait for both the output results from the 'rule' queries output and based on the aggregated results(take the logical AND aggregation of the 'isValid' attribute both events from 'product_color_code_rule' and 'product_dimensions_rule'), generate events and insert into the output stream  'SweetProductDefectAlert'.\nIn the above example, a number of 'rule' queries can be changed and the 'defect_analyzer' query should wait for results from the all available rules.\n\nTo address this use case, we have defined the same resource.group.id: rule-group-1 in all the 'rule' queries, and its registering the resources using 'resourceIdentifier' extension.  In the 'defect_analyzer' query we defined the env:resourceBatch(\"rule-group-1\", productId, 2000) window as it will accumulating the events with correlation.id:productId, where it holds the events for same 'productId' until it matches the number of available \"rule-group-1\" resources or flushing the events if the timeout(time.in.milliseconds:2000) occurs.\nTo aggregate the results from 'rule' queries, we have used 'and(not isValid)' attribute aggregator where it logically computes AND operation of not isValid boolean attribute values and outputs the results as a boolean value.\n\nInput 1: [SweetProductDefectsDetector]\n{  \n   \"event\":{  \n      \"productId\":\"Cake\",\n      \"colorCode\":\"FF0000\",\n      \"height\": 5,\n      \"width\": 10\n\n   }\n}\n\nOutput 1:[SweetProductDefectAlert]\n{  \n   \"event\":{  \n      \"productId\":\"Cake\",\n      \"isDefected\":\"false\"\n   }\n}\n\nInput 2: [SweetProductDefectsDetector]\n{  \n   \"event\":{  \n      \"productId\":\"Cake\",\n      \"colorCode\":\"FF0000\",\n      \"height\": 10,\n      \"width\": 20\n\n   }\n}\n\nOutput 2:[SweetProductDefectAlert]\n{  \n   \"event\":{  \n      \"productId\":\"Cake\",\n      \"isDefected\":\"true\"\n   }\n}")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/env/ResourceBatchWindowProcessor.class */
public class ResourceBatchWindowProcessor extends WindowProcessor implements SchedulingProcessor {
    private static final int LATE_EVENT_FLUSHING_DURATION = 300000;
    private SiddhiAppContext siddhiAppContext;
    private ExpressionExecutor groupKeyExpressionExecutor;
    private boolean outputExpectsExpiredEvents;
    private Scheduler scheduler;
    private String resourceName;
    private SnapshotableStreamEventQueue eventsToBeExpiredEventChunk = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
    private Map<Object, ResourceStreamEventList> groupEventMap = new LinkedHashMap();
    private long timeInMilliSeconds = 300000;
    private long nextEmitTime = -1;

    /* loaded from: input_file:org/wso2/extension/siddhi/execution/env/ResourceBatchWindowProcessor$ResourceStreamEventList.class */
    public static class ResourceStreamEventList {
        private List<StreamEvent> streamEventList;
        private long expiryTimestamp;
        private boolean isExpired;

        public ResourceStreamEventList(List<StreamEvent> list, long j) {
            this.streamEventList = list;
            this.expiryTimestamp = j;
        }

        public void setExpired(boolean z) {
            this.isExpired = z;
        }
    }

    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.outputExpectsExpiredEvents = z;
        if (expressionExecutorArr.length < 2) {
            throw new SiddhiAppValidationException("Resource batch window should only have two or three parameters, but found " + expressionExecutorArr.length + " input attributes");
        }
        if (!(expressionExecutorArr[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Resource Batch window should have constant for 'resource.group.id' parameter but found a dynamic attribute " + expressionExecutorArr[1].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppValidationException("Resource Batch window's 'resource.group.id' parameter should be String, but found " + expressionExecutorArr[1].getReturnType());
        }
        this.resourceName = (String) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue();
        this.groupKeyExpressionExecutor = expressionExecutorArr[1];
        if (expressionExecutorArr.length == 3) {
            this.groupKeyExpressionExecutor = expressionExecutorArr[1];
            if (!(expressionExecutorArr[2] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppValidationException("ResourceBatch window's 3rd parameter 'time.in.milliseconds' should either be a constant (of type int or long)");
            }
            if (expressionExecutorArr[2].getReturnType() == Attribute.Type.INT) {
                this.timeInMilliSeconds = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[2]).getValue()));
            } else {
                if (expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ResourceBatch window's 3rd parameter 'time.in.milliseconds' should be either be a constant (of type int or long), but found " + expressionExecutorArr[2].getReturnType());
                }
                this.timeInMilliSeconds = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[2]).getValue()));
            }
        }
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            ComplexEventChunk complexEventChunk2 = new ComplexEventChunk(true);
            if (this.eventsToBeExpiredEventChunk.getFirst() != null) {
                while (this.eventsToBeExpiredEventChunk.hasNext()) {
                    this.eventsToBeExpiredEventChunk.next().setTimestamp(currentTime);
                }
                complexEventChunk2.add(this.eventsToBeExpiredEventChunk.getFirst());
            }
            this.eventsToBeExpiredEventChunk.clear();
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                if (!next.getType().equals(ComplexEvent.Type.TIMER)) {
                    Object execute = this.groupKeyExpressionExecutor.execute(copyStreamEvent);
                    ResourceStreamEventList resourceStreamEventList = this.groupEventMap.get(execute);
                    if (resourceStreamEventList != null) {
                        resourceStreamEventList.streamEventList.add(copyStreamEvent);
                    } else {
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(copyStreamEvent);
                        this.groupEventMap.put(execute, new ResourceStreamEventList(arrayList, copyStreamEvent.getTimestamp() + this.timeInMilliSeconds));
                        long j = currentTime + this.timeInMilliSeconds;
                        if (j > this.nextEmitTime) {
                            this.nextEmitTime = j;
                            if (this.scheduler != null) {
                                this.scheduler.notifyAt(this.nextEmitTime);
                                this.scheduler.notifyAt(this.nextEmitTime + 300000);
                            }
                        }
                    }
                }
                for (Map.Entry<Object, ResourceStreamEventList> entry : this.groupEventMap.entrySet()) {
                    int resourceCount = ResourceIdentifierStreamProcessor.getResourceCount(this.resourceName);
                    List list = null;
                    if (entry.getValue().isExpired) {
                        if (entry.getValue().expiryTimestamp + 300000 < currentTime) {
                            this.groupEventMap.remove(entry.getKey());
                        }
                    } else if (entry.getValue().streamEventList.size() >= resourceCount || entry.getValue().expiryTimestamp + 300000 < currentTime) {
                        list = entry.getValue().streamEventList;
                        this.groupEventMap.remove(entry.getKey());
                    } else if (entry.getValue().expiryTimestamp < currentTime) {
                        list = entry.getValue().streamEventList;
                        entry.getValue().setExpired(true);
                    }
                    if (list != null) {
                        if (this.outputExpectsExpiredEvents) {
                            Iterator it = entry.getValue().streamEventList.iterator();
                            while (it.hasNext()) {
                                StreamEvent copyStreamEvent2 = streamEventCloner.copyStreamEvent((StreamEvent) it.next());
                                copyStreamEvent2.setType(ComplexEvent.Type.EXPIRED);
                                this.eventsToBeExpiredEventChunk.add(copyStreamEvent2);
                            }
                        }
                        StreamEvent copyStreamEvent3 = streamEventCloner.copyStreamEvent((StreamEvent) list.get(0));
                        copyStreamEvent3.setType(ComplexEvent.Type.RESET);
                        this.eventsToBeExpiredEventChunk.add(copyStreamEvent3);
                        for (StreamEvent streamEvent : entry.getValue().streamEventList) {
                            streamEvent.setType(ComplexEvent.Type.CURRENT);
                            complexEventChunk2.add(streamEvent);
                        }
                    }
                }
            }
            complexEventChunk.clear();
            if (complexEventChunk2.getFirst() != null) {
                complexEventChunk2.setBatch(true);
                processor.process(complexEventChunk2);
                complexEventChunk2.setBatch(false);
            }
        }
    }

    public void start() {
    }

    public void stop() {
    }

    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        synchronized (this) {
            if (this.eventsToBeExpiredEventChunk != null) {
                hashMap.put("ExpiredEventQueue", this.eventsToBeExpiredEventChunk.getSnapshot());
            }
            hashMap.put("GroupEventMap", this.groupEventMap);
        }
        return hashMap;
    }

    public synchronized void restoreState(Map<String, Object> map) {
        if (this.eventsToBeExpiredEventChunk != null) {
            this.eventsToBeExpiredEventChunk.restore((SnapshotStateList) map.get("ExpiredEventQueue"));
        }
        this.groupEventMap = (Map) map.get("GroupEventMap");
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
}
