package org.wso2.extension.siddhi.execution.priority;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.log4j.Logger;

@Extension(name = "time", namespace = "priority", description = "The PriorityStreamProcessor keeps track of the priority of events in a stream. When an event with a new unique key arrives, PriorityStreamProcessor checks the priority and if the priority is 0 the event is sent out without being stored internally. If the event has a priority greater than 0, it is stored in the stream processor and the current priority is injected into that event.  When an event with an existing priority key arrives, it is stored as the recent event and the priority is increased by the priority of the received event, and the priorityKey and the  currentPriority is injected into the event. After every given timeout, priority of every event is reduced by 1 and the updated priority is sent out with the last known attributes of those events. It continues until their priorities reduce to 0.", parameters = {@Parameter(name = "unique.key", description = "The unique key variable to identify the event.", type = {DataType.STRING, DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG, DataType.OBJECT}), @Parameter(name = "priority", description = "The variable that contains the priority increment.", type = {DataType.INT, DataType.LONG}), @Parameter(name = "timeout.constant", description = "The constant value to decrease the priority by one, after the given timeout.", type = {DataType.INT, DataType.LONG})}, returnAttributes = {@ReturnAttribute(name = PriorityStreamProcessor.ATTRIBUTE_PRIORITY_KEY, description = "The key for which the priority is calculated.", type = {DataType.STRING}), @ReturnAttribute(name = PriorityStreamProcessor.ATTRIBUTE_CURRENT_PRIORITY, description = "The current priority associated to the given key", type = {DataType.STRING})}, examples = {@Example(description = "This keeps track of the priority of events in a stream and injects the priority key and the current priority to the output event.", syntax = "time(symbol, priority, 1 sec)")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/priority/PriorityStreamProcessor.class */
public class PriorityStreamProcessor extends StreamProcessor<ExtensionState> implements SchedulingProcessor {
    /** Logger used for queue diagnostics while events are processed. */
    private static final Logger log = Logger.getLogger(PriorityStreamProcessor.class);
    /** Name of the output attribute that carries the unique key of the event. */
    public static final String ATTRIBUTE_PRIORITY_KEY = "priorityKey";
    /** Name of the output attribute that carries the priority currently tracked for the key. */
    public static final String ATTRIBUTE_CURRENT_PRIORITY = "currentPriority";
    /** Timeout read from the third extension argument; a holder is due for decay once this much time has passed. */
    private long timeInMilliSeconds;
    /** Scheduler supplied through {@code setScheduler}; asked for a timer notification from {@code process}. */
    private Scheduler scheduler;
    /** Application context taken from the query context in {@code init}; source of the current time. */
    private SiddhiAppContext siddhiAppContext;
    /** Executor for the first extension argument (the unique key variable). */
    private VariableExpressionExecutor keyExpressionExecutor;
    /** Executor for the second extension argument (the priority variable, int or long). */
    private VariableExpressionExecutor priorityExpressionExecutor;
    /** Time at which the most recent timer notification was requested; avoids re-requesting for the same instant. */
    private volatile long lastTimestamp = Long.MIN_VALUE;
    /** Keys of the tracked holders, in the order in which they are next examined on a timer event. */
    private final Queue<Object> keyBuffer = new ArrayDeque<>();
    /** Attributes appended to every output event: the priority key followed by the current priority. */
    private final List<Attribute> attributes = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/extension/siddhi/execution/priority/PriorityStreamProcessor$EventHolder.class */
    /**
     * Mutable record of the latest event seen for one unique key, together with the
     * priority currently tracked for that key.
     *
     * <p>The priority is kept boxed as an {@code Integer} or a {@code Long}, matching
     * whatever the priority executor returned when the holder was created.
     */
    public class EventHolder {
        private final Object key;
        private Object priority;
        private StreamEvent event;

        /**
         * Creates a holder for {@code obj} and reads its initial priority from {@code streamEvent}.
         *
         * @param obj the unique key; must not be {@code null}
         * @param streamEvent the event to remember for the key
         * @throws IllegalArgumentException if the key or the event's priority value is {@code null}
         */
        public EventHolder(Object obj, StreamEvent streamEvent) {
            if (obj == null) {
                throw new IllegalArgumentException("Priority unique key '" + PriorityStreamProcessor.this.keyExpressionExecutor.getAttribute().getName() + "' cannot be null");
            }
            this.priority = readPriority(streamEvent, obj);
            this.key = obj;
            this.event = streamEvent;
        }

        /** Returns the boxed priority ({@code Integer} or {@code Long}) currently tracked for the key. */
        public Object getPriority() {
            return this.priority;
        }

        /** Returns the timestamp of the remembered event. */
        public long getTimestamp() {
            return this.event.getTimestamp();
        }

        /**
         * Lowers the priority by one and stamps the remembered event with {@code j}.
         *
         * @param j the timestamp to set on the remembered event
         */
        public void decreasePriority(long j) {
            if (this.priority instanceof Integer) {
                this.priority = Integer.valueOf((Integer) this.priority - 1);
            } else {
                this.priority = Long.valueOf((Long) this.priority - 1);
            }
            this.event.setTimestamp(j);
        }

        /** Returns {@code true} once the priority is zero or negative. */
        public boolean isExpired() {
            if (this.priority instanceof Integer) {
                return (Integer) this.priority <= 0;
            }
            return (Long) this.priority <= 0;
        }

        /**
         * Builds an output event from the remembered event: a clone populated with the key and
         * the current priority, stamped with the application's current time.
         *
         * @return the populated clone
         */
        public StreamEvent copyStreamEvent() {
            StreamEvent copyStreamEvent = PriorityStreamProcessor.this.streamEventClonerHolder.getStreamEventCloner().copyStreamEvent(this.event);
            PriorityStreamProcessor.this.complexEventPopulater.populateComplexEvent(copyStreamEvent, new Object[]{this.key, this.priority});
            copyStreamEvent.setTimestamp(PriorityStreamProcessor.this.siddhiAppContext.getTimestampGenerator().currentTime());
            return copyStreamEvent;
        }

        /**
         * Replaces the remembered event and adds that event's priority value to the tracked
         * priority. The result is clamped to the range {@code [0, MAX_VALUE]} of the priority
         * type, so a very large increment saturates instead of wrapping around to a negative
         * value (which would have been treated as expired).
         *
         * @param streamEvent the newer event for the same key
         * @throws IllegalArgumentException if the event's priority value is {@code null}
         */
        public void setEvent(StreamEvent streamEvent) {
            // Validate before mutating so a rejected event leaves the holder untouched.
            Object increment = readPriority(streamEvent, this.key);
            this.event = streamEvent;
            if (this.priority instanceof Integer) {
                this.priority = Integer.valueOf((int) addClamped((Integer) this.priority, (Integer) increment, Integer.MAX_VALUE));
            } else {
                this.priority = Long.valueOf(addClamped((Long) this.priority, (Long) increment, Long.MAX_VALUE));
            }
        }

        /** Evaluates the priority executor on the event and rejects a null result. */
        private Object readPriority(StreamEvent streamEvent, Object forKey) {
            Object value = PriorityStreamProcessor.this.priorityExpressionExecutor.execute(streamEvent);
            if (value == null) {
                throw new IllegalArgumentException("Priority value '" + PriorityStreamProcessor.this.priorityExpressionExecutor.getAttribute().getName() + "' cannot be null, but event with priority key " + forKey + " contains a null priority");
            }
            return value;
        }

        /** Adds {@code delta} to {@code current} and clamps the result to {@code [0, max]}, saturating on overflow. */
        private long addClamped(long current, long delta, long max) {
            long sum = current + delta;
            // Signed overflow occurred iff both operands differ in sign from the result.
            if (((current ^ sum) & (delta ^ sum)) < 0) {
                return delta > 0 ? max : 0L;
            }
            return Math.max(0L, Math.min(max, sum));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/extension/siddhi/execution/priority/PriorityStreamProcessor$ExtensionState.class */
    /**
     * Per-query state of the processor: the holders currently tracked, indexed by unique key.
     */
    public class ExtensionState extends State {
        private Map<Object, EventHolder> eventHolderMap = new HashMap<>();

        ExtensionState() {
        }

        /** Always reports that this state must be kept. */
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures the tracked holders under the key {@code "eventHolderMap"}.
         *
         * <p>The map is copied while the processor lock is held, so later additions or removals
         * in the live map do not change the returned snapshot. The copy is shallow: the holder
         * objects themselves are shared with the live state.
         *
         * @return a map with the single entry {@code "eventHolderMap"}
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            synchronized (PriorityStreamProcessor.this) {
                snapshot.put("eventHolderMap", new HashMap<>(this.eventHolderMap));
            }
            return snapshot;
        }

        /**
         * Replaces the tracked holders with the ones stored under {@code "eventHolderMap"}.
         * A snapshot without that entry restores to an empty map rather than leaving the
         * state with a null map.
         *
         * @param map a snapshot previously produced by {@link #snapshot()}
         */
        @SuppressWarnings("unchecked")
        public void restore(Map<String, Object> map) {
            // Unchecked: the snapshot only ever stores a Map<Object, EventHolder> under this key.
            Map<Object, EventHolder> restored = (Map<Object, EventHolder>) map.get("eventHolderMap");
            synchronized (PriorityStreamProcessor.this) {
                if (restored == null) {
                    this.eventHolderMap = new HashMap<>();
                } else {
                    this.eventHolderMap = restored;
                }
            }
        }
    }

    /**
     * Validates the three extension arguments and prepares the processor.
     *
     * <p>Expected arguments, in order: the unique key variable, the priority variable
     * (int or long), and a constant positive timeout (int or long). On success the key and
     * priority executors, the timeout and the two return attributes are recorded.
     *
     * @param metaStreamEvent not used by this method
     * @param abstractDefinition not used by this method
     * @param expressionExecutorArr the executors for the extension arguments; exactly three are required
     * @param configReader not used by this method
     * @param streamEventClonerHolder not used by this method
     * @param z not used by this method
     * @param z2 not used by this method
     * @param siddhiQueryContext supplies the application context used to read the current time
     * @return a factory producing a fresh, empty {@code ExtensionState}
     * @throws UnsupportedOperationException if the argument count is not three, or the first or
     *         second argument is not a variable
     * @throws SiddhiAppValidationException if the priority is not int/long, or the timeout is not
     *         a positive int/long constant
     */
    protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (expressionExecutorArr.length != 3) {
            throw new UnsupportedOperationException("Invalid number of arguments passed to priority stream processor. Required 3, but found " + expressionExecutorArr.length);
        }

        ExpressionExecutor keyExecutor = expressionExecutorArr[0];
        if (!(keyExecutor instanceof VariableExpressionExecutor)) {
            throw new UnsupportedOperationException("First parameter of priority stream processor must be a variable but found " + keyExecutor.getClass().getCanonicalName());
        }
        this.keyExpressionExecutor = (VariableExpressionExecutor) keyExecutor;

        ExpressionExecutor priorityExecutor = expressionExecutorArr[1];
        if (!(priorityExecutor instanceof VariableExpressionExecutor)) {
            throw new UnsupportedOperationException("Second parameter of priority stream processor must be a variable but found " + priorityExecutor.getClass().getCanonicalName());
        }
        Attribute.Type priorityType = priorityExecutor.getReturnType();
        if (priorityType != Attribute.Type.INT && priorityType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Second parameter of priority stream processor should be either int or long, but found " + priorityType);
        }
        this.priorityExpressionExecutor = (VariableExpressionExecutor) priorityExecutor;

        ExpressionExecutor timeoutExecutor = expressionExecutorArr[2];
        if (!(timeoutExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Third parameter of priority stream processor must be a constant but found " + timeoutExecutor.getClass().getCanonicalName());
        }
        Attribute.Type timeoutType = timeoutExecutor.getReturnType();
        long timeout;
        if (timeoutType == Attribute.Type.INT) {
            timeout = (Integer) ((ConstantExpressionExecutor) timeoutExecutor).getValue();
        } else if (timeoutType == Attribute.Type.LONG) {
            timeout = (Long) ((ConstantExpressionExecutor) timeoutExecutor).getValue();
        } else {
            throw new SiddhiAppValidationException("Third parameter of priority stream processor should be either int or long, but found " + timeoutType);
        }
        // With a zero or negative timeout every holder is always "due", so a single timer
        // event would keep decrementing each priority until it reaches zero.
        if (timeout <= 0) {
            throw new SiddhiAppValidationException("Third parameter of priority stream processor should be a positive timeout, but found " + timeout);
        }
        this.timeInMilliSeconds = timeout;

        this.attributes.add(new Attribute(ATTRIBUTE_PRIORITY_KEY, this.keyExpressionExecutor.getAttribute().getType()));
        this.attributes.add(new Attribute(ATTRIBUTE_CURRENT_PRIORITY, this.priorityExpressionExecutor.getAttribute().getType()));
        return () -> new ExtensionState();
    }

    /**
     * Processes one chunk of events while holding this processor's lock.
     *
     * <p>For each event in the chunk:
     * <ul>
     *   <li>a TIMER event decays every queued holder that is due and inserts one output event
     *       per decayed holder before the timer event;</li>
     *   <li>a CURRENT event creates or updates the holder for its key and is populated with the
     *       key and the resulting priority;</li>
     *   <li>afterwards, if holders are still tracked and no notification was requested for this
     *       instant yet, a timer notification is requested one timeout from now.</li>
     * </ul>
     * The chunk, including any inserted events, is then forwarded to {@code processor} outside the lock.
     *
     * @param complexEventChunk the incoming events; output events are inserted into it
     * @param processor the next processor in the chain
     * @param streamEventCloner used to copy CURRENT events before they are stored
     * @param complexEventPopulater used to write the key and priority into CURRENT events
     * @param extensionState the state holding the tracked holders
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, ExtensionState extensionState) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                ComplexEvent.Type type = next.getType();
                if (type == ComplexEvent.Type.TIMER) {
                    decayDueHolders(complexEventChunk, extensionState, currentTime);
                } else if (type == ComplexEvent.Type.CURRENT) {
                    trackCurrentEvent(next, streamEventCloner, complexEventPopulater, extensionState);
                }
                if (this.lastTimestamp < currentTime && !extensionState.eventHolderMap.isEmpty()) {
                    this.scheduler.notifyAt(currentTime + this.timeInMilliSeconds);
                    this.lastTimestamp = currentTime;
                }
            }
        }
        processor.process(complexEventChunk);
    }

    /** Decays queued holders from the head of the key queue until the head is not yet due, emitting one event per decay. */
    private void decayDueHolders(ComplexEventChunk<StreamEvent> complexEventChunk, ExtensionState extensionState, long currentTime) {
        while (!this.keyBuffer.isEmpty()) {
            Object key = this.keyBuffer.peek();
            EventHolder holder = extensionState.eventHolderMap.get(key);
            if (holder == null) {
                // The queue lives on the processor while the map lives in the state, and a
                // state restore replaces the map without touching the queue. A queued key may
                // therefore have no holder; drop it instead of dereferencing null.
                // NOTE(review): keys present only in a restored map are not re-queued here.
                this.keyBuffer.poll();
                if (log.isDebugEnabled()) {
                    log.debug("Dropping queued key '" + key + "' because no event holder is tracked for it.");
                }
                continue;
            }
            if ((holder.getTimestamp() + this.timeInMilliSeconds) - currentTime > 0) {
                // Head of the queue is not due yet; stop scanning.
                break;
            }
            if (null == this.keyBuffer.poll() && log.isDebugEnabled()) {
                log.debug("The Queue is empty. There are no events to remove.");
            }
            holder.decreasePriority(currentTime);
            if (holder.isExpired()) {
                extensionState.eventHolderMap.remove(key);
            } else if (!this.keyBuffer.offer(key)) {
                log.error("Task of adding '" + key + "' to thequeue couldn't be complete.");
            }
            // The updated priority is emitted even when the holder has just expired.
            complexEventChunk.insertBeforeCurrent(holder.copyStreamEvent());
        }
    }

    /** Creates or updates the holder for a CURRENT event's key and writes the key and priority into the event. */
    private void trackCurrentEvent(StreamEvent current, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, ExtensionState extensionState) {
        // Store a copy so the holder is independent of the event that travels downstream.
        StreamEvent stored = streamEventCloner.copyStreamEvent(current);
        Object key = this.keyExpressionExecutor.execute(current);
        EventHolder holder = extensionState.eventHolderMap.get(key);
        if (holder == null) {
            holder = new EventHolder(key, stored);
            // A new key whose priority is already zero or negative is passed through untracked.
            if (!holder.isExpired()) {
                extensionState.eventHolderMap.put(key, holder);
                if (!this.keyBuffer.offer(key)) {
                    log.error("Task of adding '" + key + "' to thequeue couldn't be complete.");
                }
            }
        } else {
            holder.setEvent(stored);
            if (holder.isExpired()) {
                extensionState.eventHolderMap.remove(key);
                this.keyBuffer.remove(key);
            }
        }
        complexEventPopulater.populateComplexEvent(current, new Object[]{key, holder.getPriority()});
    }

    /**
     * Stores the scheduler that {@code process} asks for timer notifications.
     *
     * @param scheduler the scheduler to use
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Returns the scheduler previously supplied through {@code setScheduler}.
     *
     * @return the stored scheduler, or {@code null} if none has been set
     */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /** Does nothing; this processor performs no work when started. */
    public void start() {
    }

    /** Does nothing; this processor performs no work when stopped. */
    public void stop() {
    }

    /**
     * Returns the attributes this processor appends to events: the priority key and the
     * current priority, as registered in {@code init}.
     *
     * @return the processor's own attribute list (not a copy)
     */
    public List<Attribute> getReturnAttributes() {
        return this.attributes;
    }

    /**
     * Reports the processing mode of this processor.
     *
     * @return always {@code ProcessingMode.BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /**
     * Untyped entry point that narrows the chunk and the state to the concrete types used by
     * this processor and delegates to the typed {@code process} overload.
     *
     * @throws ClassCastException if {@code state} is not an {@code ExtensionState}
     */
    protected /* bridge */ /* synthetic */ void process(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        ComplexEventChunk<StreamEvent> typedChunk = (ComplexEventChunk<StreamEvent>) complexEventChunk;
        ExtensionState typedState = (ExtensionState) state;
        process(typedChunk, processor, streamEventCloner, complexEventPopulater, typedState);
    }
}
