/*
 * Decompiled with CFR 0.152.
 */
package io.cellery.observability.model.generator;

import io.cellery.observability.model.generator.TraceGroup;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/**
 * Siddhi window extension ({@code observe:traceGroupWindow}) that collects the events sharing a
 * trace key into a {@link TraceGroup} and releases the whole group as expired events once a timer
 * event arrives whose timestamp is at or past the group's end timestamp.
 *
 * <p>Current events are also forwarded to the next processor unchanged, except for events that are
 * too late to join their group, which are removed from the incoming chunk.
 */
@Extension(
    name="traceGroupWindow",
    namespace="observe",
    description="This window groups the spans of the specific trace key and sends as expired events after the window. The spans are collected until idle time configured in window.period, and if no spans are received within the idle time, the spans are grouped together and released from window as expired events.",
    parameters={
        @Parameter(name="window.period", description="The time period for which the trace considered is valid. This is specified in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.", type={DataType.INT, DataType.LONG, DataType.TIME}),
        @Parameter(name="window.key", description="The grouping attribute for events. ie, trace-key attribute", type={DataType.STRING})
    },
    examples={
        @Example(syntax="from ProcessedZipkinStream#observe:traceGroupWindow(60 sec,traceId) \nselect * \ninsert all events into OutputStream;", description="This will send all events to output stream immediately, and also collect the events until the configured window.period, and group all events as a single event chunk as expired events.")
    }
)
public class TraceGroupWindowProcessor
extends StreamProcessor
implements SchedulingProcessor,
FindableProcessor {
    private static final Logger log = Logger.getLogger(TraceGroupWindowProcessor.class);

    /** Idle gap added to an event's timestamp to obtain its group's end timestamp. */
    private long idleTimeGap = 0L;
    /** Executor that extracts the trace key from an incoming event. */
    private VariableExpressionExecutor tracekeyExecutor;
    /** Scheduler asked to deliver a timer event at each group's end timestamp. */
    private Scheduler scheduler;
    /** Open trace groups, indexed by trace key. */
    private Map<String, TraceGroup> traceGroupMap;
    /** Events of timed-out groups waiting to be handed to the next processor. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    /**
     * @return the scheduler used to request timer events
     */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * @param scheduler the scheduler used to request timer events
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Validates the two window parameters and initialises the processor state.
     *
     * <p>The first parameter must be a constant of type int or long (the idle time gap); the
     * second must be a variable of type string (the trace key).
     *
     * @return an empty list; this window adds no attributes to the stream
     * @throws SiddhiAppValidationException if the parameter count, kind or type is wrong
     */
    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.traceGroupMap = new ConcurrentHashMap<String, TraceGroup>();
        this.expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);

        if (this.attributeExpressionExecutors.length != 2) {
            throw new SiddhiAppValidationException("Tracegroup window should have two parameters (<int|long|time> idleTimeGap, <String> traceKey), but found " + this.attributeExpressionExecutors.length + " input attributes");
        }

        ExpressionExecutor gapExecutor = this.attributeExpressionExecutors[0];
        if (!(gapExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Tracegroup window's 1st parameter, idle time gap should be a constant parameter attribute but found a dynamic attribute " + gapExecutor.getClass().getCanonicalName());
        }
        if (gapExecutor.getReturnType() != Attribute.Type.INT && gapExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Tracegroup window's idle time gap parameter should be either int or long, but found " + gapExecutor.getReturnType());
        }
        // INT is accepted above, so the constant may be an Integer as well as a Long;
        // going through Number avoids a ClassCastException for the int case.
        Object gapValue = ((ConstantExpressionExecutor)gapExecutor).getValue();
        this.idleTimeGap = ((Number)gapValue).longValue();

        ExpressionExecutor keyExecutor = this.attributeExpressionExecutors[1];
        if (!(keyExecutor instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("Tracegroup window's 2nd parameter, trace key should be a dynamic parameter attribute but found a constant attribute " + keyExecutor.getClass().getCanonicalName());
        }
        if (keyExecutor.getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppValidationException("Tracegroup window's trace key parameter type should be string, but found " + keyExecutor.getReturnType());
        }
        this.tracekeyExecutor = (VariableExpressionExecutor)keyExecutor;
        return new ArrayList<Attribute>();
    }

    /**
     * Routes each event of the chunk: CURRENT events are added to their trace group, TIMER events
     * trigger expiry of the groups that are due. Afterwards the incoming chunk is forwarded
     * (unless it carried a timer event), followed by any events expired during this call.
     *
     * <p>The loop runs under this processor's monitor, and each group is updated under the
     * monitor of its interned key, which is the same lock {@code currentTraceTimeout} takes.
     */
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        boolean isTimerEvent = false;
        synchronized (this) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                long eventTimestamp = streamEvent.getTimestamp();
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    this.collectCurrentEvent(streamEventChunk, streamEvent, eventTimestamp, streamEventCloner);
                } else if (streamEvent.getType() == ComplexEvent.Type.TIMER) {
                    isTimerEvent = true;
                    this.currentTraceTimeout(eventTimestamp);
                }
            }
        }
        if (!isTimerEvent) {
            this.nextProcessor.process(streamEventChunk);
        }
        if (this.expiredEventChunk != null && this.expiredEventChunk.getFirst() != null) {
            this.nextProcessor.process(this.expiredEventChunk);
            this.expiredEventChunk.clear();
        }
    }

    /**
     * Adds an EXPIRED-typed clone of a CURRENT event to the trace group of its key, creating the
     * group on first sight, and schedules a timer at the group's new end timestamp.
     */
    private void collectCurrentEvent(ComplexEventChunk<StreamEvent> streamEventChunk, StreamEvent streamEvent, long eventTimestamp, StreamEventCloner streamEventCloner) {
        long maxTimestamp = eventTimestamp + this.idleTimeGap;
        String key = (String)this.tracekeyExecutor.execute((ComplexEvent)streamEvent);
        TraceGroup traceGroup = this.traceGroupMap.get(key);
        if (traceGroup == null) {
            traceGroup = new TraceGroup(key, eventTimestamp);
            this.traceGroupMap.put(key, traceGroup);
        }
        StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
        clonedStreamEvent.setType(ComplexEvent.Type.EXPIRED);
        synchronized (key.intern()) {
            if (traceGroup.isEmpty() || eventTimestamp >= traceGroup.getStartTimestamp()) {
                traceGroup.add(clonedStreamEvent);
                traceGroup.setEndTimestamp(maxTimestamp);
                this.scheduler.notifyAt(maxTimestamp);
            } else {
                this.addLateEvent(streamEventChunk, eventTimestamp, clonedStreamEvent, traceGroup);
            }
        }
    }

    /**
     * Handles an event older than its group's start timestamp. If it is no more than one idle gap
     * older, it joins the group and becomes the new start; otherwise the current event is removed
     * from the incoming chunk and the drop is logged.
     */
    private void addLateEvent(ComplexEventChunk<StreamEvent> streamEventChunk, long eventTimestamp, StreamEvent streamEvent, TraceGroup traceGroup) {
        if (eventTimestamp >= traceGroup.getStartTimestamp() - this.idleTimeGap) {
            traceGroup.add(streamEvent);
            traceGroup.setStartTimestamp(eventTimestamp);
        } else {
            streamEventChunk.remove();
            log.info("The event, " + streamEvent + " is late and it's tracegroup window has been timeout");
        }
    }

    /**
     * Moves the events of every trace group whose end timestamp is at or before
     * {@code eventTimestamp} into the expired chunk and removes those groups from the map.
     *
     * @param eventTimestamp timestamp of the timer event that triggered the check
     */
    private void currentTraceTimeout(long eventTimestamp) {
        // Sorted list rather than a TreeSet: a TreeSet discards elements that compare equal, so
        // groups that tie under TraceGroup's natural ordering would be skipped.
        // NOTE(review): the early exit below assumes that ordering is by end timestamp - confirm.
        List<TraceGroup> ordered = new ArrayList<TraceGroup>(this.traceGroupMap.values());
        Collections.sort(ordered, null);
        for (TraceGroup candidate : ordered) {
            if (eventTimestamp < candidate.getEndTimestamp()) {
                break;
            }
            String key = candidate.getKey();
            synchronized (key.intern()) {
                TraceGroup currentTraceGroup = this.traceGroupMap.get(key);
                if (currentTraceGroup == null) {
                    // Already removed since the snapshot was taken; nothing left to expire.
                    continue;
                }
                ComplexEventChunk<StreamEvent> events = currentTraceGroup.getCurrentTraceGroup();
                if (events.getFirst() != null) {
                    this.expiredEventChunk.add(events.getFirst());
                    currentTraceGroup.clear();
                }
                this.traceGroupMap.remove(key);
            }
        }
    }

    /** No start-up work is needed. */
    public void start() {
    }

    /** No shutdown work is needed. */
    public void stop() {
    }

    /**
     * @return a map holding the live trace-group map and expired chunk under the keys
     *         {@code "traceGroupMap"} and {@code "expiredEventChunk"}
     */
    public synchronized Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put("traceGroupMap", this.traceGroupMap);
        state.put("expiredEventChunk", this.expiredEventChunk);
        return state;
    }

    /**
     * Replaces the trace-group map and expired chunk with the values stored in {@code state}
     * under the keys written by {@link #currentState()}.
     *
     * @param state a state map as produced by {@link #currentState()}
     */
    @SuppressWarnings("unchecked")
    public synchronized void restoreState(Map<String, Object> state) {
        this.traceGroupMap = (ConcurrentHashMap<String, TraceGroup>)state.get("traceGroupMap");
        this.expiredEventChunk = (ComplexEventChunk<StreamEvent>)state.get("expiredEventChunk");
    }

    /**
     * Looks up events in the expired chunk using the given compiled condition.
     *
     * @param matchingEvent the event to match against
     * @param compiledCondition a condition produced by {@link #compileCondition}; it is cast to
     *        {@link Operator}
     * @return the result of the operator's find
     */
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        Operator operator = (Operator)compiledCondition;
        return operator.find(matchingEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds an operator over the expired chunk for the given condition. The processor's own
     * {@code queryName} field is passed on, not the {@code queryName} argument.
     */
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, String queryName) {
        return OperatorParser.constructOperator(this.expiredEventChunk, condition, matchingMetaInfoHolder, siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
    }
}

