package org.wso2.siddhi.extension.reorder;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

/* loaded from: input_file:org/wso2/siddhi/extension/reorder/KSlackExtension.class */
public class KSlackExtension extends StreamProcessor implements SchedulingProcessor {
    private TreeMap<Long, ArrayList<StreamEvent>> eventTreeMap;
    private TreeMap<Long, ArrayList<StreamEvent>> expiredEventTreeMap;
    private ExpressionExecutor timestampExecutor;
    private Scheduler scheduler;
    private long k = 0;
    private long greatestTimestamp = 0;
    private long MAX_K = Long.MAX_VALUE;
    private long TIMER_DURATION = -1;
    private boolean expireFlag = false;
    private long lastSentTimeStamp = -1;
    private long lastScheduledTimestamp = -1;
    private ReentrantLock lock = new ReentrantLock();

    public void start() {
    }

    public void stop() {
    }

    public Object[] currentState() {
        return new Object[0];
    }

    public void restoreState(Object[] objArr) {
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        ComplexEventChunk complexEventChunk2 = new ComplexEventChunk(false);
        this.lock.lock();
        while (complexEventChunk.hasNext()) {
            try {
                StreamEvent next = complexEventChunk.next();
                if (next.getType() != ComplexEvent.Type.TIMER) {
                    complexEventChunk.remove();
                    long longValue = ((Long) this.timestampExecutor.execute(next)).longValue();
                    if (!this.expireFlag || longValue >= this.lastSentTimeStamp) {
                        ArrayList<StreamEvent> arrayList = this.eventTreeMap.get(Long.valueOf(longValue));
                        if (arrayList == null) {
                            arrayList = new ArrayList<>();
                        }
                        arrayList.add(next);
                        this.eventTreeMap.put(Long.valueOf(longValue), arrayList);
                        if (longValue > this.greatestTimestamp) {
                            this.greatestTimestamp = longValue;
                            long longValue2 = this.greatestTimestamp - this.eventTreeMap.firstKey().longValue();
                            if (longValue2 > this.k) {
                                if (longValue2 < this.MAX_K) {
                                    this.k = longValue2;
                                } else {
                                    this.k = this.MAX_K;
                                }
                            }
                            for (Map.Entry<Long, ArrayList<StreamEvent>> entry : this.eventTreeMap.entrySet()) {
                                ArrayList<StreamEvent> arrayList2 = this.expiredEventTreeMap.get(entry.getKey());
                                if (arrayList2 != null) {
                                    arrayList2.addAll(entry.getValue());
                                } else {
                                    this.expiredEventTreeMap.put(entry.getKey(), entry.getValue());
                                }
                            }
                            this.eventTreeMap = new TreeMap<>();
                            Iterator<Map.Entry<Long, ArrayList<StreamEvent>>> it = this.expiredEventTreeMap.entrySet().iterator();
                            while (it.hasNext()) {
                                Map.Entry<Long, ArrayList<StreamEvent>> next2 = it.next();
                                if (next2.getKey().longValue() + this.k <= this.greatestTimestamp) {
                                    it.remove();
                                    ArrayList<StreamEvent> value = next2.getValue();
                                    this.lastSentTimeStamp = next2.getKey().longValue();
                                    Iterator<StreamEvent> it2 = value.iterator();
                                    while (it2.hasNext()) {
                                        complexEventChunk2.add(it2.next());
                                    }
                                }
                            }
                        }
                    }
                } else if (this.expiredEventTreeMap.size() > 0) {
                    TreeMap<Long, ArrayList<StreamEvent>> treeMap = this.expiredEventTreeMap;
                    this.expiredEventTreeMap = new TreeMap<>();
                    onTimerEvent(treeMap, processor);
                    this.lastScheduledTimestamp += this.TIMER_DURATION;
                    this.scheduler.notifyAt(this.lastScheduledTimestamp);
                }
            } catch (ArrayIndexOutOfBoundsException e) {
                throw new ExecutionPlanCreationException("The very first parameter must be an Integer with a valid  field index (0 to (fieldsLength-1)).");
            }
        }
        this.lock.unlock();
        processor.process(complexEventChunk2);
    }

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext, boolean z) {
        ArrayList arrayList = new ArrayList();
        if (this.attributeExpressionLength > 4) {
            throw new ExecutionPlanCreationException("Maximum four input parameters can be specified for KSlack.  Timestamp field (long), k-slack buffer expiration time-out window (long), Max_K size (long), and boolean  flag to indicate whether the late events should get discarded. But found " + this.attributeExpressionLength + " attributes.");
        }
        if (expressionExecutorArr.length == 1) {
            if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the first argument of reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[0].getReturnType());
            }
            this.timestampExecutor = expressionExecutorArr[0];
            arrayList.add(new Attribute("beta0", Attribute.Type.LONG));
        } else if (expressionExecutorArr.length == 2) {
            if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the first argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[0].getReturnType());
            }
            this.timestampExecutor = expressionExecutorArr[0];
            arrayList.add(new Attribute("beta0", Attribute.Type.LONG));
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the second argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[1].getReturnType());
            }
            this.TIMER_DURATION = ((Long) expressionExecutorArr[1].execute((ComplexEvent) null)).longValue();
            arrayList.add(new Attribute("beta1", Attribute.Type.LONG));
        } else if (expressionExecutorArr.length == 3) {
            if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the first argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[0].getReturnType());
            }
            this.timestampExecutor = expressionExecutorArr[0];
            arrayList.add(new Attribute("beta0", Attribute.Type.LONG));
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the second argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[1].getReturnType());
            }
            this.TIMER_DURATION = ((Long) expressionExecutorArr[1].execute((ComplexEvent) null)).longValue();
            arrayList.add(new Attribute("beta1", Attribute.Type.LONG));
            if (expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the third argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[2].getReturnType());
            }
            this.MAX_K = ((Long) expressionExecutorArr[2].execute((ComplexEvent) null)).longValue();
            arrayList.add(new Attribute("beta2", Attribute.Type.LONG));
        } else if (expressionExecutorArr.length == 4) {
            if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the first argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[0].getReturnType());
            }
            this.timestampExecutor = expressionExecutorArr[0];
            arrayList.add(new Attribute("beta0", Attribute.Type.LONG));
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the second argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[1].getReturnType());
            }
            this.TIMER_DURATION = ((Long) expressionExecutorArr[1].execute((ComplexEvent) null)).longValue();
            arrayList.add(new Attribute("beta1", Attribute.Type.LONG));
            if (expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the third argument of  reorder:kslack() function. Required LONG, but found " + expressionExecutorArr[2].getReturnType());
            }
            this.MAX_K = ((Long) expressionExecutorArr[2].execute((ComplexEvent) null)).longValue();
            arrayList.add(new Attribute("beta2", Attribute.Type.LONG));
            if (expressionExecutorArr[3].getReturnType() != Attribute.Type.BOOL) {
                throw new ExecutionPlanCreationException("Invalid parameter type found for the fourth argument of  reorder:kslack() function. Required BOOL, but found " + expressionExecutorArr[3].getReturnType());
            }
            this.expireFlag = ((Boolean) expressionExecutorArr[3].execute((ComplexEvent) null)).booleanValue();
            arrayList.add(new Attribute("beta3", Attribute.Type.BOOL));
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
            throw new ExecutionPlanCreationException("Return type expected by KSlack is LONG but found " + expressionExecutorArr[0].getReturnType());
        }
        this.timestampExecutor = expressionExecutorArr[0];
        this.eventTreeMap = new TreeMap<>();
        this.expiredEventTreeMap = new TreeMap<>();
        if (this.TIMER_DURATION != -1 && this.scheduler != null) {
            this.lastScheduledTimestamp = executionPlanContext.getTimestampGenerator().currentTime() + this.TIMER_DURATION;
            this.scheduler.notifyAt(this.lastScheduledTimestamp);
        }
        return arrayList;
    }

    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        if (this.lastScheduledTimestamp < 0) {
            this.lastScheduledTimestamp = this.executionPlanContext.getTimestampGenerator().currentTime() + this.TIMER_DURATION;
            scheduler.notifyAt(this.lastScheduledTimestamp);
        }
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    private void onTimerEvent(TreeMap<Long, ArrayList<StreamEvent>> treeMap, Processor processor) {
        Iterator<Map.Entry<Long, ArrayList<StreamEvent>>> it = treeMap.entrySet().iterator();
        ComplexEventChunk complexEventChunk = new ComplexEventChunk(false);
        while (it.hasNext()) {
            Iterator<StreamEvent> it2 = it.next().getValue().iterator();
            while (it2.hasNext()) {
                complexEventChunk.add(it2.next());
            }
        }
        processor.process(complexEventChunk);
    }
}
