package io.cellery.observability.model.generator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name = "traceGroupWindow", namespace = "observe", description = "This window groups the spans of the specific trace key and sends as expired events after the window. The spans are collected until idle time configured in window.period, and if no spans are received within the idle time, the spans are grouped together and released from window as expired events.", parameters = {@Parameter(name = "window.period", description = "The time period for which the trace considered is valid. This is specified in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.", type = {DataType.INT, DataType.LONG, DataType.TIME}), @Parameter(name = "window.key", description = "The grouping attribute for events. ie, trace-key attribute", type = {DataType.STRING})}, examples = {@Example(syntax = "from ProcessedZipkinStream#observe:traceGroupWindow(60 sec,traceId,startTime) \nselect * \ninsert all events into OutputStream;", description = "This will send all events to output stream immediately, and also collect the events until the configured window.time, and group all events as a single event chunk as expired events.")})
/* loaded from: input_file:io/cellery/observability/model/generator/TraceGroupWindowProcessor.class */
public class TraceGroupWindowProcessor extends StreamProcessor implements SchedulingProcessor, FindableProcessor {
    private static final Logger log = Logger.getLogger(TraceGroupWindowProcessor.class);
    private long idleTimeGap = 0;
    private VariableExpressionExecutor tracekeyExecutor;
    private Scheduler scheduler;
    private Map<String, TraceGroup> traceGroupMap;
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.traceGroupMap = new ConcurrentHashMap();
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        if (this.attributeExpressionExecutors.length != 2) {
            throw new SiddhiAppValidationException("Tracegroup window should have two parameters (<int|long|time> idleTimeGap, <String> traceKeybut found " + this.attributeExpressionExecutors.length + " input attributes");
        }
        if (!(this.attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Tracegroup window's 1st parameter, idle time gap should be a constant parameter attribute but found a dynamic attribute " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        if (this.attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT && this.attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Tracegroup window's idle time gap parameter should be either int or long, but found " + this.attributeExpressionExecutors[0].getReturnType());
        }
        this.idleTimeGap = ((Long) this.attributeExpressionExecutors[0].getValue()).longValue();
        if (!(this.attributeExpressionExecutors[1] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("Tracegroup window's 2nd parameter, trace key should be a dynamic parameter attribute but found a constant attribute " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
        }
        if (this.attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppValidationException("Tracegroup window's trace key parameter type should be string, but found " + this.attributeExpressionExecutors[1].getReturnType());
        }
        this.tracekeyExecutor = this.attributeExpressionExecutors[1];
        return new ArrayList();
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        boolean z = false;
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                long timestamp = next.getTimestamp();
                long j = timestamp + this.idleTimeGap;
                if (next.getType() == ComplexEvent.Type.CURRENT) {
                    String str = (String) this.tracekeyExecutor.execute(next);
                    TraceGroup traceGroup = this.traceGroupMap.get(str);
                    TraceGroup traceGroup2 = traceGroup;
                    if (traceGroup == null) {
                        traceGroup2 = new TraceGroup(str, timestamp);
                    }
                    this.traceGroupMap.put(str, traceGroup2);
                    StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                    copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                    synchronized (str.intern()) {
                        if (traceGroup2.isEmpty() || timestamp >= traceGroup2.getStartTimestamp()) {
                            traceGroup2.add(copyStreamEvent);
                            traceGroup2.setEndTimestamp(j);
                            this.scheduler.notifyAt(j);
                        } else {
                            addLateEvent(complexEventChunk, timestamp, copyStreamEvent, traceGroup2);
                        }
                    }
                } else if (next.getType() == ComplexEvent.Type.TIMER) {
                    z = true;
                    currentTraceTimeout(timestamp);
                }
            }
        }
        if (!z) {
            this.nextProcessor.process(complexEventChunk);
        }
        if (this.expiredEventChunk == null || this.expiredEventChunk.getFirst() == null) {
            return;
        }
        this.nextProcessor.process(this.expiredEventChunk);
        this.expiredEventChunk.clear();
    }

    private void addLateEvent(ComplexEventChunk<StreamEvent> complexEventChunk, long j, StreamEvent streamEvent, TraceGroup traceGroup) {
        if (j >= traceGroup.getStartTimestamp() - this.idleTimeGap) {
            traceGroup.add(streamEvent);
            traceGroup.setStartTimestamp(j);
        } else {
            complexEventChunk.remove();
            log.info("The event, " + streamEvent + " is late and it's tracegroup window has been timeout");
        }
    }

    private void currentTraceTimeout(long j) {
        Iterator it = new TreeSet(this.traceGroupMap.values()).iterator();
        while (it.hasNext()) {
            TraceGroup traceGroup = (TraceGroup) it.next();
            if (j < traceGroup.getEndTimestamp()) {
                return;
            }
            synchronized (traceGroup.getKey().intern()) {
                TraceGroup traceGroup2 = this.traceGroupMap.get(traceGroup.getKey());
                ComplexEventChunk<StreamEvent> currentTraceGroup = traceGroup2.getCurrentTraceGroup();
                if (currentTraceGroup.getFirst() != null) {
                    this.expiredEventChunk.add(currentTraceGroup.getFirst());
                    traceGroup2.clear();
                }
                this.traceGroupMap.remove(traceGroup.getKey());
            }
        }
    }

    public void start() {
    }

    public void stop() {
    }

    public synchronized Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("traceGroupMap", this.traceGroupMap);
        hashMap.put("expiredEventChunk", this.expiredEventChunk);
        return hashMap;
    }

    public synchronized void restoreState(Map<String, Object> map) {
        this.traceGroupMap = (ConcurrentHashMap) map.get("traceGroupMap");
        this.expiredEventChunk = (ComplexEventChunk) map.get("expiredEventChunk");
    }

    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        return ((Operator) compiledCondition).find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.expiredEventChunk, expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }
}
