package io.cellery.observability.telemetry.deduplicator;

import io.cellery.observability.telemetry.deduplicator.internal.Constants;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

/**
 * Siddhi stream processor registered as {@code telemetry:deduplicate}.
 *
 * <p>Every incoming {@code CURRENT} event is cloned, the clone is marked {@code EXPIRED} and it is
 * compared against the events currently held by this processor:
 * <ul>
 *   <li>if a held event is a duplicate (see {@link #isDuplicate}), the held event is dropped and the
 *       clone, carrying the larger of each metric, is inserted into the chunk in front of the
 *       incoming event;</li>
 *   <li>otherwise the clone is held and the scheduler is asked to notify this processor once the
 *       window time has elapsed for it.</li>
 * </ul>
 * Any event that is not {@code CURRENT} (presumably the timer notification requested through the
 * scheduler) flushes the held events whose window time has elapsed, inserting them in front of that
 * event.
 *
 * <p>Three {@code long} attributes are appended to every emitted event: the maximum request size,
 * the maximum response duration and the maximum response size.
 *
 * <p>The held events are only touched while holding this instance's monitor inside
 * {@link #process}; {@link #currentState()} and {@link #restoreState(Map)} do not take that monitor.
 */
@Extension(
        name = "deduplicate",
        namespace = "telemetry",
        description = "This is a sliding time window that holds the latest unique events that arrived during the last window time period. The unique events are determined based on some attributes (spanId, parentSpanId, etc..). The window is updated with each event arrival and expiry. When a new duplicate event that arrives within a window time period, the previous event and the new event will be merged into one and processed forward",
        examples = {
                @Example(
                        syntax = "from TelemetryStream#telemetry:deduplicate(60 sec, runtime, traceId, spanId, parentSpanId, sourceNamespace, sourceInstance, sourceComponent, destinationNamespace, destinationInstance, destinationComponent, requestSizeBytes, responseDuration, responseSizeBytes)insert into ProcessedTelemetryStream;",
                        description = "This window will hold every event from Telemetry stream for 60 seconds and remove duplicate events received within the time interval.")
        })
public class DeduplicationStreamProcessor extends StreamProcessor implements SchedulingProcessor {

    /** Number of arguments the extension must be invoked with (window time + 13 attributes). */
    private static final int EXPECTED_ARGUMENT_COUNT = 14;

    /**
     * Key under which the held events are stored in the state snapshot. The spelling is kept as-is
     * because changing it would make previously persisted snapshots unreadable.
     */
    private static final String STATE_KEY_HELD_EVENTS = "expiredEventchunck";

    /** Prefix used in validation messages to identify this extension. */
    private static final String EXTENSION_LABEL = "telemetry:deduplicate";

    private long windowTimeMilliSeconds;
    /** Events seen within the window that have not yet been merged or flushed, in arrival order. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;
    private Scheduler scheduler;
    private SiddhiAppContext siddhiAppContext;
    /** Timestamp of the most recent held event for which a scheduler notification was requested. */
    private volatile long lastTimestamp = 0;

    // Attributes that identify a request; used by isDuplicate.
    private ExpressionExecutor runtimeExecutor;
    private ExpressionExecutor traceIdExecutor;
    private ExpressionExecutor spanIdExecutor;
    private ExpressionExecutor parentSpanIdExecutor;
    private ExpressionExecutor sourceNamespaceExecutor;
    private ExpressionExecutor sourceInstanceExecutor;
    private ExpressionExecutor sourceComponentExecutor;
    private ExpressionExecutor destinationNamespaceExecutor;
    private ExpressionExecutor destinationInstanceExecutor;
    private ExpressionExecutor destinationComponentExecutor;

    // Metrics that are combined by mergeEvents.
    private ExpressionExecutor requestSizeExecutor;
    private ExpressionExecutor responseDurationExecutor;
    private ExpressionExecutor responseSizeExecutor;

    /**
     * Deduplicates the events of the given chunk and forwards the chunk to the next processor.
     *
     * @param complexEventChunk     events to process; merged and flushed events are inserted into it
     * @param processor             next processor, invoked once with the updated chunk
     * @param streamEventCloner     used to clone incoming {@code CURRENT} events
     * @param complexEventPopulater not used directly; {@link #mergeEvents} uses the populater field
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent incomingEvent = complexEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                boolean isCurrentEvent = incomingEvent.getType() == ComplexEvent.Type.CURRENT;

                // Only CURRENT events take part in deduplication; their clone is what gets held/emitted.
                StreamEvent candidate = null;
                if (isCurrentEvent) {
                    candidate = streamEventCloner.copyStreamEvent(incomingEvent);
                    candidate.setType(ComplexEvent.Type.EXPIRED);
                }

                this.expiredEventChunk.reset();
                boolean mergedWithHeldEvent = false;
                while (this.expiredEventChunk.hasNext()) {
                    StreamEvent heldEvent = this.expiredEventChunk.next();
                    if (!isCurrentEvent) {
                        long remainingTime = (heldEvent.getTimestamp() + this.windowTimeMilliSeconds) - currentTime;
                        if (remainingTime > 0) {
                            // Held events are in arrival order, so nothing further has elapsed yet.
                            break;
                        }
                        // Window elapsed without a duplicate: emit the held event on its own.
                        this.expiredEventChunk.remove();
                        heldEvent.setTimestamp(currentTime);
                        complexEventChunk.insertBeforeCurrent(mergeEvents(null, heldEvent));
                    } else if (isDuplicate(heldEvent, candidate)) {
                        // Duplicate found: drop the held event and emit the merged candidate.
                        mergedWithHeldEvent = true;
                        this.expiredEventChunk.remove();
                        candidate.setTimestamp(currentTime);
                        complexEventChunk.insertBeforeCurrent(mergeEvents(heldEvent, candidate));
                    }
                }

                if (!mergedWithHeldEvent && isCurrentEvent) {
                    // First sighting: hold the candidate until a duplicate arrives or the window elapses.
                    this.expiredEventChunk.add(candidate);
                    long candidateTimestamp = candidate.getTimestamp();
                    if (this.lastTimestamp < candidateTimestamp && this.scheduler != null) {
                        this.scheduler.notifyAt(candidateTimestamp + this.windowTimeMilliSeconds);
                        this.lastTimestamp = candidateTimestamp;
                    }
                }
                this.expiredEventChunk.reset();
            }
        }
        processor.process(complexEventChunk);
    }

    /**
     * Decides whether two events describe the same request.
     *
     * <p>They do when runtime, source/destination namespace, instance and component and the trace id
     * are all equal, and additionally one event's span id equals the other event's parent span id
     * (in either direction). Null attribute values are compared with {@link Objects#equals}.
     *
     * @param streamEvent  the event already held by this processor
     * @param streamEvent2 the newly arrived event
     * @return {@code true} if the two events are duplicates of each other
     */
    private boolean isDuplicate(StreamEvent streamEvent, StreamEvent streamEvent2) {
        String heldRuntime = (String) this.runtimeExecutor.execute(streamEvent);
        String heldTraceId = (String) this.traceIdExecutor.execute(streamEvent);
        String heldSpanId = (String) this.spanIdExecutor.execute(streamEvent);
        String heldParentSpanId = (String) this.parentSpanIdExecutor.execute(streamEvent);
        String heldSourceNamespace = (String) this.sourceNamespaceExecutor.execute(streamEvent);
        String heldSourceInstance = (String) this.sourceInstanceExecutor.execute(streamEvent);
        String heldSourceComponent = (String) this.sourceComponentExecutor.execute(streamEvent);
        String heldDestinationNamespace = (String) this.destinationNamespaceExecutor.execute(streamEvent);
        String heldDestinationInstance = (String) this.destinationInstanceExecutor.execute(streamEvent);
        String heldDestinationComponent = (String) this.destinationComponentExecutor.execute(streamEvent);

        String newRuntime = (String) this.runtimeExecutor.execute(streamEvent2);
        String newTraceId = (String) this.traceIdExecutor.execute(streamEvent2);
        String newSpanId = (String) this.spanIdExecutor.execute(streamEvent2);

        if (!Objects.equals(newRuntime, heldRuntime)) {
            return false;
        }
        if (!Objects.equals((String) this.sourceNamespaceExecutor.execute(streamEvent2), heldSourceNamespace)) {
            return false;
        }
        if (!Objects.equals((String) this.sourceInstanceExecutor.execute(streamEvent2), heldSourceInstance)) {
            return false;
        }
        if (!Objects.equals((String) this.sourceComponentExecutor.execute(streamEvent2), heldSourceComponent)) {
            return false;
        }
        if (!Objects.equals((String) this.destinationNamespaceExecutor.execute(streamEvent2), heldDestinationNamespace)) {
            return false;
        }
        if (!Objects.equals((String) this.destinationInstanceExecutor.execute(streamEvent2), heldDestinationInstance)) {
            return false;
        }
        if (!Objects.equals((String) this.destinationComponentExecutor.execute(streamEvent2), heldDestinationComponent)) {
            return false;
        }
        if (!Objects.equals(newTraceId, heldTraceId)) {
            return false;
        }
        // The two events must be linked as span/parent-span, in either direction.
        return Objects.equals((String) this.parentSpanIdExecutor.execute(streamEvent2), heldSpanId)
                || Objects.equals(newSpanId, heldParentSpanId);
    }

    /**
     * Populates the output attributes of {@code streamEvent2} and returns it.
     *
     * <p>When {@code streamEvent} is given, each output metric is the larger of the two events'
     * values; a metric that is null on one event takes the other event's value. When
     * {@code streamEvent} is {@code null}, the metrics of {@code streamEvent2} are used unchanged.
     *
     * @param streamEvent  previously held event to merge in, or {@code null} if there is none
     * @param streamEvent2 event that receives the output attributes
     * @return {@code streamEvent2}, after population
     */
    private StreamEvent mergeEvents(StreamEvent streamEvent, StreamEvent streamEvent2) {
        Long requestSize = (Long) this.requestSizeExecutor.execute(streamEvent2);
        Long responseDuration = (Long) this.responseDurationExecutor.execute(streamEvent2);
        Long responseSize = (Long) this.responseSizeExecutor.execute(streamEvent2);
        if (streamEvent != null) {
            requestSize = nullSafeMax((Long) this.requestSizeExecutor.execute(streamEvent), requestSize);
            responseDuration = nullSafeMax((Long) this.responseDurationExecutor.execute(streamEvent), responseDuration);
            responseSize = nullSafeMax((Long) this.responseSizeExecutor.execute(streamEvent), responseSize);
        }
        this.complexEventPopulater.populateComplexEvent(streamEvent2, new Object[]{requestSize, responseDuration, responseSize});
        return streamEvent2;
    }

    /** Returns the larger of two values, treating {@code null} as "no value". */
    private static Long nullSafeMax(Long first, Long second) {
        if (first == null) {
            return second;
        }
        if (second == null) {
            return first;
        }
        return Long.valueOf(Long.max(first.longValue(), second.longValue()));
    }

    /**
     * Validates the extension arguments and wires up the attribute executors.
     *
     * <p>Expected arguments, in order: a constant int/long window time, ten String attributes
     * (runtime, traceId, spanId, parentSpanId, source namespace/instance/component, destination
     * namespace/instance/component) and three long attributes (requestSizeBytes, responseDuration,
     * responseSizeBytes).
     *
     * @param abstractDefinition    definition of the input stream (not used)
     * @param expressionExecutorArr executors for the arguments passed to the extension
     * @param configReader          extension configuration (not used)
     * @param siddhiAppContext      context used later to obtain the current time
     * @return the three {@code long} attributes this processor appends to each event
     * @throws SiddhiAppValidationException if the number or the types of the arguments are wrong
     */
    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        if (expressionExecutorArr.length != EXPECTED_ARGUMENT_COUNT) {
            throw new SiddhiAppValidationException(EXPECTED_ARGUMENT_COUNT + " arguments are required, but " + expressionExecutorArr.length + " given");
        }
        this.windowTimeMilliSeconds = readWindowTime(expressionExecutorArr[0]);

        this.runtimeExecutor = requireStringField(expressionExecutorArr[1], "runtime");
        this.traceIdExecutor = requireStringField(expressionExecutorArr[2], "traceId");
        this.spanIdExecutor = requireStringField(expressionExecutorArr[3], "spanId");
        this.parentSpanIdExecutor = requireStringField(expressionExecutorArr[4], "parentSpanId");
        this.sourceNamespaceExecutor = requireStringField(expressionExecutorArr[5], "sourceNamespace");
        this.sourceInstanceExecutor = requireStringField(expressionExecutorArr[6], "sourceInstance");
        this.sourceComponentExecutor = requireStringField(expressionExecutorArr[7], "sourceComponent");
        this.destinationNamespaceExecutor = requireStringField(expressionExecutorArr[8], "destinationNamespace");
        this.destinationInstanceExecutor = requireStringField(expressionExecutorArr[9], "destinationInstance");
        this.destinationComponentExecutor = requireStringField(expressionExecutorArr[10], "destinationComponent");

        this.requestSizeExecutor = requireLongField(expressionExecutorArr[11], "requestSizeBytes");
        this.responseDurationExecutor = requireLongField(expressionExecutorArr[12], "responseDuration");
        this.responseSizeExecutor = requireLongField(expressionExecutorArr[13], "responseSizeBytes");

        List<Attribute> outputAttributes = new ArrayList<>(3);
        outputAttributes.add(new Attribute(Constants.MAX_REQUEST_SIZE, Attribute.Type.LONG));
        outputAttributes.add(new Attribute(Constants.MAX_RESPONSE_DURATION, Attribute.Type.LONG));
        outputAttributes.add(new Attribute(Constants.MAX_RESPONSE_SIZE, Attribute.Type.LONG));
        return outputAttributes;
    }

    /** Reads the window time from the first argument, which must be a constant int or long. */
    private static long readWindowTime(ExpressionExecutor timeExecutor) {
        if (!(timeExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException(EXTENSION_LABEL + " should have constant for time parameter but found a dynamic attribute " + timeExecutor.getClass().getCanonicalName());
        }
        Object windowTime = ((ConstantExpressionExecutor) timeExecutor).getValue();
        if (timeExecutor.getReturnType() == Attribute.Type.INT) {
            return ((Integer) windowTime).intValue();
        }
        if (timeExecutor.getReturnType() == Attribute.Type.LONG) {
            return ((Long) windowTime).longValue();
        }
        throw new SiddhiAppValidationException(EXTENSION_LABEL + "'s parameter time should be either int or long, but found " + timeExecutor.getReturnType());
    }

    /** Returns the executor if it yields a String, otherwise rejects the named field. */
    private static ExpressionExecutor requireStringField(ExpressionExecutor executor, String fieldName) {
        return requireReturnType(executor, Attribute.Type.STRING, "String", fieldName);
    }

    /** Returns the executor if it yields a long, otherwise rejects the named field. */
    private static ExpressionExecutor requireLongField(ExpressionExecutor executor, String fieldName) {
        return requireReturnType(executor, Attribute.Type.LONG, "long", fieldName);
    }

    /** Returns the executor if its return type matches, otherwise throws a validation exception. */
    private static ExpressionExecutor requireReturnType(ExpressionExecutor executor, Attribute.Type expectedType, String typeLabel, String fieldName) {
        if (executor.getReturnType() != expectedType) {
            throw new SiddhiAppValidationException("Expected a field with " + typeLabel + " return type for the " + fieldName + " field, but found a field with return type - " + executor.getReturnType());
        }
        return executor;
    }

    /**
     * @return the scheduler used to request notifications, or {@code null} if none has been set
     */
    public synchronized Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * @param scheduler scheduler through which {@link #process} requests a notification for the
     *                  moment a held event's window time elapses
     */
    public synchronized void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /** No resources to acquire. */
    public void start() {
    }

    /** No resources to release. */
    public void stop() {
    }

    /**
     * Captures the held events for a state snapshot.
     *
     * @return a map holding the first held event (the head of the chain) under a single key; the
     *         value is whatever {@code getFirst()} returns when nothing is held
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put(STATE_KEY_HELD_EVENTS, this.expiredEventChunk.getFirst());
        return state;
    }

    /**
     * Replaces the held events with the ones stored in a snapshot produced by {@link #currentState()}.
     *
     * @param map snapshot to restore from
     */
    public void restoreState(Map<String, Object> map) {
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) map.get(STATE_KEY_HELD_EVENTS));
    }
}
