package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.SessionComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.SessionContainer;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-4.4.9.jar:org/wso2/siddhi/core/query/processor/stream/window/SessionWindowProcessor.class
 */
@Extension(name = "session", namespace = "", description = "This is a session window that holds events that belong to a specific session. The events that belong to a specific session are identified by a grouping attribute (i.e., a session key). A session gap period is specified to determine the time period after which the session is considered to be expired. A new event that arrives with a specific value for the session key is matched with the session window with the same session key.\n  When performing aggregations for a specific session, you can include events with the matching session key that arrive after the session is expired if required. This is done by specifying a latency time period that is less than the session gap period.\nTo have aggregate functions with session windows, the events need to be grouped by the session key via a 'group by' clause.", parameters = {@Parameter(name = "window.session", description = "The time period for which the session considered is valid. This is specified in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.", type = {DataType.INT, DataType.LONG, DataType.TIME}), @Parameter(name = "window.key", description = "The grouping attribute for events.", type = {DataType.STRING}, optional = true, defaultValue = SessionWindowProcessor.DEFAULT_KEY), @Parameter(name = "window.allowedlatency", description = "This specifies the time period for which the session window is valid after the expiration of the session. 
The time period specified here should be less than the session time gap (which is specified via the 'window.session' parameter).", type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "0")}, examples = {@Example(syntax = "define stream PurchaseEventStream (user string, item_number int, price float, quantity int);\n\n@info(name='query0) \nfrom PurchaseEventStream#window.session(5 sec, user, 2 sec) \nselect * \ninsert all events into OutputStream;", description = "This query processes events that arrive at the PurchaseEvent input stream. The 'user' attribute is the session key, and the session gap is 5 seconds. '2 sec' is specified as the allowed latency. Therefore, events with the matching user name that arrive 2 seconds after the expiration of the session are also considered when performing aggregations for the session identified by the given user name.")})
/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/SessionWindowProcessor.class */
public class SessionWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
    /** Class logger; used to report late events that are dropped. */
    private static final Logger log = Logger.getLogger(SessionWindowProcessor.class);
    /** Session gap read from the first window parameter; added to an event timestamp to get the session end. */
    private long sessionGap = 0;
    /** Optional allowed latency; added to the session end to get the session's "alive" timestamp. */
    private long allowedLatency = 0;
    /** Extracts the session key from an event; stays {@code null} when no key parameter was given. */
    private VariableExpressionExecutor sessionKeyExecutor;
    /** Scheduler asked (via {@code notifyAt}) to wake this processor at session end/alive timestamps. */
    private Scheduler scheduler;
    /** Session key -> container holding that key's current and previous sessions. */
    private Map<String, SessionContainer> sessionMap;
    /** Scratch map (session key -> end timestamp) that is cleared and refilled by the findAll* helpers. */
    private Map<String, Long> sessionKeyEndTimeMap;
    /** Container of the key belonging to the event currently being processed; reassigned per event. */
    private SessionContainer sessionContainer;
    /** Accumulates events of timed-out sessions; forwarded downstream and cleared at the end of process(). */
    private SessionComplexEventChunk<StreamEvent> expiredEventChunk;
    /** Session key used for every event when no session key attribute is configured. */
    private static final String DEFAULT_KEY = "default-key";

    /**
     * Returns the scheduler previously supplied through {@link #setScheduler(Scheduler)}.
     *
     * @return the scheduler held by this processor, or {@code null} if none has been set yet
     */
    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Stores the scheduler this window uses to request wake-ups at session end and alive timestamps.
     *
     * @param scheduler the scheduler to use for subsequent {@code notifyAt} calls
     */
    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Validates the window parameters and initialises the processor state.
     *
     * <p>Accepted parameter layouts:
     * <ul>
     *   <li>{@code (sessionGap)}</li>
     *   <li>{@code (sessionGap, sessionKey)} or {@code (sessionGap, allowedLatency)}</li>
     *   <li>{@code (sessionGap, sessionKey, allowedLatency)}</li>
     * </ul>
     * {@code sessionGap} and {@code allowedLatency} must be int or long constants; {@code sessionKey}
     * must be a string attribute of the stream.
     *
     * <p>Constant values are read through {@link Number#longValue()} so that both int and long
     * constants are accepted, matching the type validation performed here (a direct cast to
     * {@code Long} would fail for int constants).
     *
     * @param expressionExecutorArr executors for the window parameters, in declaration order
     * @param configReader          not used by this window
     * @param z                     not used by this window
     * @param siddhiAppContext      not used by this window
     * @throws SiddhiAppValidationException if the parameter count, kinds or types are invalid, or if
     *                                      the allowed latency is greater than the session gap
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.sessionMap = new ConcurrentHashMap<>();
        this.sessionKeyEndTimeMap = new HashMap<>();
        this.sessionContainer = new SessionContainer();
        this.expiredEventChunk = new SessionComplexEventChunk<>();
        if (expressionExecutorArr.length < 1 || expressionExecutorArr.length > 3) {
            throw new SiddhiAppValidationException("Session window should only have one to three parameters (<int|long|time> sessionGap, <String> sessionKey, <int|long|time> allowedLatency, but found " + expressionExecutorArr.length + " input attributes");
        }

        // 1st parameter: the session gap, always a numeric constant.
        if (!(expressionExecutorArr[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Session window's 1st parameter, session gap should be a constant parameter attribute but found a dynamic attribute " + expressionExecutorArr[0].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.INT && expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Session window's session gap parameter should be either int or long, but found " + expressionExecutorArr[0].getReturnType());
        }
        this.sessionGap = constantAsLong(expressionExecutorArr[0]);

        if (expressionExecutorArr.length == 3) {
            // (sessionGap, sessionKey, allowedLatency)
            if (!(expressionExecutorArr[1] instanceof VariableExpressionExecutor)) {
                throw new SiddhiAppValidationException("Session window's 2nd parameter, session key should be a dynamic parameter attribute but found a constant attribute " + expressionExecutorArr[1].getClass().getCanonicalName());
            }
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.STRING) {
                throw new SiddhiAppValidationException("Session window's session key parameter type should be string, but found " + expressionExecutorArr[1].getReturnType());
            }
            this.sessionKeyExecutor = (VariableExpressionExecutor) expressionExecutorArr[1];
            if (!(expressionExecutorArr[2] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppValidationException("Session window's 3rd parameter, allowedLatency should be a constant parameter attribute but found a dynamic attribute " + expressionExecutorArr[2].getClass().getCanonicalName());
            }
            if (expressionExecutorArr[2].getReturnType() != Attribute.Type.INT && expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                throw new SiddhiAppValidationException("Session window's allowedLatency parameter should be either int or long, but found " + expressionExecutorArr[2].getReturnType());
            }
            this.allowedLatency = constantAsLong(expressionExecutorArr[2]);
            validateAllowedLatency(this.allowedLatency, this.sessionGap);
        } else if (expressionExecutorArr.length == 2) {
            // The 2nd parameter is either the session key (stream attribute) or the allowed latency (constant).
            if (expressionExecutorArr[1] instanceof VariableExpressionExecutor) {
                if (expressionExecutorArr[1].getReturnType() != Attribute.Type.STRING) {
                    throw new SiddhiAppValidationException("Session window's session key parameter type should be string, but found " + expressionExecutorArr[1].getReturnType());
                }
                this.sessionKeyExecutor = (VariableExpressionExecutor) expressionExecutorArr[1];
            } else {
                if (expressionExecutorArr[1].getReturnType() != Attribute.Type.INT && expressionExecutorArr[1].getReturnType() != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("Session window's allowedLatency parameter should be either int or long, but found " + expressionExecutorArr[1].getReturnType());
                }
                // Reject non-constant numeric expressions with a validation error instead of a ClassCastException.
                if (!(expressionExecutorArr[1] instanceof ConstantExpressionExecutor)) {
                    throw new SiddhiAppValidationException("Session window's 2nd parameter, allowedLatency should be a constant parameter attribute but found a dynamic attribute " + expressionExecutorArr[1].getClass().getCanonicalName());
                }
                this.allowedLatency = constantAsLong(expressionExecutorArr[1]);
                validateAllowedLatency(this.allowedLatency, this.sessionGap);
            }
        }
    }

    /**
     * Reads the value of a numeric constant executor as a {@code long}.
     *
     * @param executor an executor already verified to be a {@link ConstantExpressionExecutor} of type int or long
     * @return the constant widened to {@code long}
     */
    private static long constantAsLong(ExpressionExecutor executor) {
        Object value = ((ConstantExpressionExecutor) executor).getValue();
        return ((Number) value).longValue();
    }

    /**
     * Ensures the allowed latency does not exceed the session gap.
     *
     * @param latency the configured allowed latency
     * @param gap     the configured session gap
     * @throws SiddhiAppValidationException if {@code latency} is greater than {@code gap}
     */
    private void validateAllowedLatency(long latency, long gap) {
        boolean withinGap = latency <= gap;
        if (withinGap) {
            return;
        }
        throw new SiddhiAppValidationException("Session window's allowedLatency parameter value should not be greater than the session gap parameter value");
    }

    /**
     * Assigns each incoming CURRENT event to the session of its key and expires sessions when a
     * non-CURRENT event arrives.
     *
     * <p>All session bookkeeping happens while holding this processor's monitor. After the loop the
     * incoming chunk is forwarded downstream, followed by the accumulated expired events (if any),
     * which are then cleared.
     *
     * @param complexEventChunk  the incoming events; late events that cannot be placed are removed from it
     * @param processor          the next processor, which receives the incoming chunk and then the expired chunk
     * @param streamEventCloner  used to copy each CURRENT event before storing it in a session
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        // Stays at the default key for every event when no session key executor is configured.
        String str = DEFAULT_KEY;
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                long timestamp = next.getTimestamp();
                // j: session end if this event (re)defines it; j2: time until which late events are still accepted.
                long j = timestamp + this.sessionGap;
                long j2 = j + this.allowedLatency;
                if (next.getType() == ComplexEvent.Type.CURRENT) {
                    if (this.sessionKeyExecutor != null) {
                        str = (String) this.sessionKeyExecutor.execute(next);
                    }
                    // Look up the container for this key, creating and registering it on first use.
                    SessionContainer sessionContainer = this.sessionMap.get(str);
                    this.sessionContainer = sessionContainer;
                    if (sessionContainer == null) {
                        this.sessionContainer = new SessionContainer(str);
                    }
                    this.sessionMap.put(str, this.sessionContainer);
                    // The session stores a copy marked EXPIRED, so it can be emitted as-is when the session times out.
                    StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                    copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                    SessionComplexEventChunk<StreamEvent> currentSession = this.sessionContainer.getCurrentSession();
                    if (this.sessionContainer.getCurrentSession().getFirst() == null) {
                        // First event of a new session: it defines start, end and alive timestamps.
                        currentSession.add(copyStreamEvent);
                        currentSession.setTimestamps(timestamp, j, j2);
                        this.scheduler.notifyAt(j);
                    } else if (timestamp < currentSession.getStartTimestamp()) {
                        // Event is older than the current session's start: handle as a late event.
                        addLateEvent(complexEventChunk, timestamp, copyStreamEvent);
                    } else if (timestamp <= currentSession.getEndTimestamp()) {
                        // Event falls inside the current session: extend the session end and re-arm the timer.
                        currentSession.setTimestamps(currentSession.getStartTimestamp(), j, j2);
                        currentSession.add(copyStreamEvent);
                        this.scheduler.notifyAt(j);
                    } else if (this.allowedLatency > 0) {
                        // Event is past the session end: keep the old session as "previous" and start a new one.
                        // NOTE(review): when allowedLatency is 0 no branch runs here, so the copy is not stored
                        // in any session (the original event is still forwarded below) - confirm this is intended.
                        moveCurrentSessionToPreviousSession();
                        currentSession.clear();
                        currentSession.setTimestamps(timestamp, j, j2);
                        currentSession.add(copyStreamEvent);
                        this.scheduler.notifyAt(j);
                    }
                } else {
                    // Non-CURRENT event (presumably a timer event raised by the scheduler - TODO confirm):
                    // expire whatever sessions have reached their end/alive timestamps.
                    currentSessionTimeout(timestamp);
                    if (this.allowedLatency > 0) {
                        previousSessionTimeout(timestamp);
                    }
                }
            }
        }
        // Downstream processing happens outside the synchronized block.
        processor.process(complexEventChunk);
        if (this.expiredEventChunk == null || this.expiredEventChunk.getFirst() == null) {
            return;
        }
        processor.process(this.expiredEventChunk);
        this.expiredEventChunk.clear();
    }

    /**
     * Folds the previous session into the current one when the two are no more than one session
     * gap apart.
     *
     * <p>Nothing happens if the previous session is empty or ends earlier than
     * {@code current.start - sessionGap}. Otherwise the previous session's events are inserted
     * before the current session's cursor, the current session adopts the previous session's
     * start timestamp, and the previous session is cleared.
     *
     * @param previous the older session, emptied on a successful merge
     * @param current  the newer session that absorbs the events
     */
    private void mergeWindows(SessionComplexEventChunk<StreamEvent> previous, SessionComplexEventChunk<StreamEvent> current) {
        if (previous.getFirst() == null) {
            return;
        }
        boolean tooFarApart = previous.getEndTimestamp() < current.getStartTimestamp() - this.sessionGap;
        if (tooFarApart) {
            return;
        }

        // Advance the cursor (when possible) so insertBeforeCurrent has a current element to insert before.
        if (current.hasNext()) {
            current.next();
        }
        current.insertBeforeCurrent(previous.getFirst());
        current.setStartTimestamp(previous.getStartTimestamp());
        previous.clear();
    }

    /**
     * Demotes the active container's current session to its previous session.
     *
     * <p>If a previous session already exists, it is first flushed into the expired chunk
     * (key, timestamps and events) and cleared. The previous session then takes over the current
     * session's events and timestamps, and a wake-up is scheduled for its alive timestamp.
     * Clearing the current session is left to the caller.
     */
    private void moveCurrentSessionToPreviousSession() {
        SessionComplexEventChunk<StreamEvent> current = this.sessionContainer.getCurrentSession();
        SessionComplexEventChunk<StreamEvent> previous = this.sessionContainer.getPreviousSession();

        boolean previousHasEvents = previous.getFirst() != null;
        if (previousHasEvents) {
            // Only one previous session is retained per key, so the old one expires now.
            this.expiredEventChunk.setKey(previous.getKey());
            this.expiredEventChunk.setTimestamps(
                    previous.getStartTimestamp(),
                    previous.getEndTimestamp(),
                    previous.getAliveTimestamp());
            this.expiredEventChunk.add(previous.getFirst());
            previous.clear();
        }

        previous.add(current.getFirst());
        previous.setTimestamps(
                current.getStartTimestamp(),
                current.getEndTimestamp(),
                current.getAliveTimestamp());
        this.scheduler.notifyAt(current.getAliveTimestamp());
    }

    /**
     * Places an event whose timestamp precedes the current session's start.
     *
     * <ul>
     *   <li>Within one session gap before the current session's start: the event is put at the
     *       front of the current session, which becomes its new start. When latency is enabled
     *       the previous session is merged in if it is now close enough.</li>
     *   <li>Otherwise, when latency is disabled, no previous session exists, or the event is more
     *       than one session gap before the previous session's start: the event is removed from
     *       the incoming chunk and logged.</li>
     *   <li>Otherwise the event joins the previous session, adjusting either its start or its
     *       end/alive timestamps, and a merge with the current session is attempted in the
     *       latter case.</li>
     * </ul>
     *
     * @param complexEventChunk the incoming chunk, positioned on the late event
     * @param eventTimestamp    timestamp of the late event
     * @param lateEvent         the cloned event to store in a session
     */
    private void addLateEvent(ComplexEventChunk<StreamEvent> complexEventChunk, long eventTimestamp, StreamEvent lateEvent) {
        SessionComplexEventChunk<StreamEvent> current = this.sessionContainer.getCurrentSession();
        SessionComplexEventChunk<StreamEvent> previous = this.sessionContainer.getPreviousSession();
        boolean latencyEnabled = this.allowedLatency > 0;

        if (eventTimestamp >= current.getStartTimestamp() - this.sessionGap) {
            prependToSession(current, lateEvent, eventTimestamp);
            if (latencyEnabled) {
                mergeWindows(previous, current);
            }
            return;
        }

        // From here on the event is too old for the current session.
        if (!latencyEnabled
                || previous.getFirst() == null
                || eventTimestamp < previous.getStartTimestamp() - this.sessionGap) {
            discardLateEvent(complexEventChunk, lateEvent);
            return;
        }

        previous.add(lateEvent);
        boolean onlyMovesStart = eventTimestamp <= previous.getEndTimestamp() - this.sessionGap
                && eventTimestamp < previous.getStartTimestamp();
        if (onlyMovesStart) {
            previous.setStartTimestamp(eventTimestamp);
            return;
        }
        long extendedEnd = eventTimestamp + this.sessionGap;
        previous.setEndTimestamp(extendedEnd);
        previous.setAliveTimestamp(extendedEnd + this.allowedLatency);
        mergeWindows(previous, current);
    }

    /** Inserts {@code event} before the session's cursor and makes {@code timestamp} the session start. */
    private void prependToSession(SessionComplexEventChunk<StreamEvent> session, StreamEvent event, long timestamp) {
        if (session.hasNext()) {
            session.next();
        }
        session.insertBeforeCurrent(event);
        session.setStartTimestamp(timestamp);
    }

    /** Removes the current event from the incoming chunk and logs that its session has already timed out. */
    private void discardLateEvent(ComplexEventChunk<StreamEvent> incomingChunk, StreamEvent event) {
        incomingChunk.remove();
        log.info("The event, " + event + " is late and it's session window has been timeout");
    }

    /**
     * Times out current sessions whose end timestamp is at or before {@code timestamp}.
     *
     * <p>Sessions are visited in ascending end-timestamp order and processing stops at the first
     * one that is empty or has not yet ended. A timed-out session is moved to the previous-session
     * slot (scheduling a wake-up at its alive timestamp) when latency is enabled, otherwise it is
     * appended to the expired chunk. In both cases the current session is cleared.
     *
     * @param timestamp the time to evaluate session ends against
     */
    private void currentSessionTimeout(long timestamp) {
        Map<String, Long> endTimes = findAllCurrentEndTimestamps(this.sessionMap);
        if (endTimes.size() > 1) {
            // Copy into an insertion-ordered map sorted by end timestamp, earliest first.
            endTimes = endTimes.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue())
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            (first, second) -> first,
                            LinkedHashMap::new));
        }

        for (Map.Entry<String, Long> sessionEnd : endTimes.entrySet()) {
            long endTimestamp = sessionEnd.getValue().longValue();
            SessionContainer container = this.sessionMap.get(sessionEnd.getKey());
            SessionComplexEventChunk<StreamEvent> current = container.getCurrentSession();
            SessionComplexEventChunk<StreamEvent> previous = container.getPreviousSession();

            if (current.getFirst() == null) {
                return;
            }
            if (timestamp < endTimestamp) {
                return;
            }

            if (this.allowedLatency <= 0) {
                this.expiredEventChunk.setKey(current.getKey());
                this.expiredEventChunk.setTimestamps(
                        current.getStartTimestamp(),
                        current.getEndTimestamp(),
                        current.getAliveTimestamp());
                this.expiredEventChunk.add(current.getFirst());
            } else {
                previous.add(current.getFirst());
                previous.setTimestamps(
                        current.getStartTimestamp(),
                        current.getEndTimestamp(),
                        current.getAliveTimestamp());
                this.scheduler.notifyAt(current.getAliveTimestamp());
            }
            current.clear();
        }
    }

    /**
     * Expires previous sessions whose alive timestamp is at or before {@code timestamp}.
     *
     * <p>Sessions are visited in ascending order of the previous session's end timestamp and
     * processing stops at the first one that is missing, empty, or still alive. Each expired
     * session is appended to the expired chunk (key, timestamps and events) and then cleared.
     *
     * @param timestamp the time to evaluate alive timestamps against
     */
    private void previousSessionTimeout(long timestamp) {
        Map<String, Long> endTimes = findAllPreviousEndTimestamps(this.sessionMap);
        if (endTimes.size() > 1) {
            // Copy into an insertion-ordered map sorted by end timestamp, earliest first.
            endTimes = endTimes.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue())
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            (first, second) -> first,
                            LinkedHashMap::new));
        }

        for (String sessionKey : endTimes.keySet()) {
            SessionComplexEventChunk<StreamEvent> previous = this.sessionMap.get(sessionKey).getPreviousSession();
            if (previous == null || previous.getFirst() == null) {
                return;
            }
            if (timestamp < previous.getAliveTimestamp()) {
                return;
            }
            this.expiredEventChunk.setKey(previous.getKey());
            this.expiredEventChunk.setTimestamps(
                    previous.getStartTimestamp(),
                    previous.getEndTimestamp(),
                    previous.getAliveTimestamp());
            this.expiredEventChunk.add(previous.getFirst());
            previous.clear();
        }
    }

    /**
     * Rebuilds the shared scratch map with the end timestamp of every current session.
     *
     * <p>Containers reporting {@code -1} as their current-session end timestamp are skipped.
     * The returned map is the reused {@code sessionKeyEndTimeMap} instance, so its contents are
     * overwritten by the next call to either findAll* helper.
     *
     * @param map session key to container mapping to scan
     * @return the scratch map of session key to current-session end timestamp
     */
    private Map<String, Long> findAllCurrentEndTimestamps(Map<String, SessionContainer> map) {
        this.sessionKeyEndTimeMap.clear();
        for (SessionContainer container : map.values()) {
            long endTimestamp = container.getCurrentSessionEndTimestamp();
            if (endTimestamp == -1) {
                continue;
            }
            this.sessionKeyEndTimeMap.put(container.getKey(), Long.valueOf(endTimestamp));
        }
        return this.sessionKeyEndTimeMap;
    }

    /**
     * Rebuilds the shared scratch map with the end timestamp of every previous session.
     *
     * <p>Containers reporting {@code -1} as their previous-session end timestamp are skipped.
     * The returned map is the reused {@code sessionKeyEndTimeMap} instance, so its contents are
     * overwritten by the next call to either findAll* helper.
     *
     * @param map session key to container mapping to scan
     * @return the scratch map of session key to previous-session end timestamp
     */
    private Map<String, Long> findAllPreviousEndTimestamps(Map<String, SessionContainer> map) {
        this.sessionKeyEndTimeMap.clear();
        for (SessionContainer container : map.values()) {
            long endTimestamp = container.getPreviousSessionEndTimestamp();
            if (endTimestamp == -1) {
                continue;
            }
            this.sessionKeyEndTimeMap.put(container.getKey(), Long.valueOf(endTimestamp));
        }
        return this.sessionKeyEndTimeMap;
    }

    /** Lifecycle hook; intentionally empty because this window performs no work on start. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    /** Lifecycle hook; intentionally empty because this window performs no work on stop. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    /**
     * Captures the state needed to restore this window.
     *
     * <p>The returned map holds references to the live objects (no copies are made) under the
     * keys {@code "sessionMap"}, {@code "sessionContainer"} and {@code "expiredEventChunk"}.
     *
     * @return a new map containing the three state entries
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public synchronized Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("sessionMap", this.sessionMap);
        state.put("sessionContainer", this.sessionContainer);
        state.put("expiredEventChunk", this.expiredEventChunk);
        return state;
    }

    /**
     * Replaces this window's state with the entries of a map produced by {@link #currentState()}.
     *
     * @param map state map containing the keys {@code "sessionMap"} (a {@link ConcurrentHashMap}),
     *            {@code "sessionContainer"} and {@code "expiredEventChunk"}
     * @throws ClassCastException if an entry is not of the expected type
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    @SuppressWarnings("unchecked") // entries were stored by currentState() with these exact types
    public synchronized void restoreState(Map<String, Object> map) {
        Object restoredSessions = map.get("sessionMap");
        this.sessionMap = (ConcurrentHashMap<String, SessionContainer>) restoredSessions;

        Object restoredContainer = map.get("sessionContainer");
        this.sessionContainer = (SessionContainer) restoredContainer;

        Object restoredExpired = map.get("expiredEventChunk");
        this.expiredEventChunk = (SessionComplexEventChunk<StreamEvent>) restoredExpired;
    }

    /**
     * Looks up events matching a compiled condition in the expired-event chunk.
     *
     * @param stateEvent        the event to match against
     * @param compiledCondition condition previously built by {@code compileCondition}; must be an {@link Operator}
     * @return the result of the operator's search over the expired-event chunk
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        Operator operator = (Operator) compiledCondition;
        return operator.find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds an operator that evaluates {@code expression} over the expired-event chunk.
     *
     * <p>The {@code str} argument is not used; the operator is constructed with this
     * processor's own {@code queryName} instead.
     *
     * @param expression             the condition to compile
     * @param matchingMetaInfoHolder meta information for the matching events
     * @param siddhiAppContext       the Siddhi app context
     * @param list                   variable expression executors passed through to the parser
     * @param map                    tables by name, passed through to the parser
     * @param str                    ignored
     * @return the compiled condition produced by {@link OperatorParser#constructOperator}
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        CompiledCondition compiled = OperatorParser.constructOperator(
                this.expiredEventChunk,
                expression,
                matchingMetaInfoHolder,
                siddhiAppContext,
                list,
                map,
                this.queryName);
        return compiled;
    }
}
