package org.wso2.carbon.analytics.apim.siddhi.stream.processor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

/* loaded from: input_file:org/wso2/carbon/analytics/apim/siddhi/stream/processor/StateTransitionsTimeBatchStreamProcessor.class */
/**
 * Stream processor that annotates events with the previously recorded state for the same
 * key pair and emits the annotated events in batches.
 *
 * <p>Parameters (as passed to {@link #init}): at least three variable attributes followed by a
 * constant {@code windowTime} of type INT or LONG. For every CURRENT event the first three
 * variables are read as strings; the value of the third one is remembered under the
 * (second, first) key pair. When a value was already remembered for that key pair, a copy of the
 * event is buffered with the remembered value populated as the extra output attribute
 * {@code startState}.
 *
 * <p>Buffered events are flushed to the next processor when a TIMER event arrives (a timer is
 * scheduled {@code windowTime} after the first chunk and re-armed on every TIMER event), or when
 * the buffer size reaches {@code windowTime} (see the review note in {@link #process}).
 *
 * <p>All mutable state is accessed while holding this instance's monitor inside
 * {@link #process}; flushed chunks are forwarded after the monitor has been released.
 */
public class StateTransitionsTimeBatchStreamProcessor extends StreamProcessor implements SchedulingProcessor {
    /** Number of leading variable parameters that {@link #process} reads (indices 0, 1 and 2). */
    private static final int REQUIRED_VARIABLE_COUNT = 3;

    /** Last recorded state, keyed by the second variable's value and then by the first variable's value. */
    private Map<String, Map<String, String>> userToLastStates = null;
    /** Annotated event copies waiting for the next flush. */
    private List<StreamEvent> currentEvents = null;
    /** Timestamp of the most recently requested timer notification; negative until the first chunk arrives. */
    private long lastScheduledTime = -1;
    /** Value of the {@code windowTime} parameter; added to the current time when scheduling the timer. */
    private long timeToKeep;
    private Scheduler scheduler;
    /** Every parameter except the last one, in declaration order. */
    private VariableExpressionExecutor[] variableExecutors;

    /**
     * Validates the parameters, initialises the internal state and declares the output attribute.
     *
     * @param abstractDefinition    definition of the input stream (not used here)
     * @param expressionExecutorArr parameters: at least three variables followed by a constant
     *                              INT or LONG {@code windowTime}
     * @param executionPlanContext  execution plan context (not used here)
     * @return a single-element list declaring the STRING attribute {@code startState}
     * @throws ExecutionPlanValidationException if too few parameters are given, if any parameter
     *                                          other than the last is not a variable, or if the
     *                                          last parameter is not a constant of type INT or LONG
     */
    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext) {
        // Fail at validation time instead of with an index error on the first processed event.
        if (expressionExecutorArr == null || expressionExecutorArr.length < REQUIRED_VARIABLE_COUNT + 1) {
            int found = expressionExecutorArr == null ? 0 : expressionExecutorArr.length;
            throw new ExecutionPlanValidationException("StateTransitionsTimeBatchStreamProcessor requires at least " + (REQUIRED_VARIABLE_COUNT + 1) + " parameters (" + REQUIRED_VARIABLE_COUNT + " variables followed by windowTime), but found " + found);
        }

        int variableCount = expressionExecutorArr.length - 1;
        VariableExpressionExecutor[] variables = new VariableExpressionExecutor[variableCount];
        for (int i = 0; i < variableCount; i++) {
            if (!(expressionExecutorArr[i] instanceof VariableExpressionExecutor)) {
                throw new ExecutionPlanValidationException("StateTransitionsTimeBatchStreamProcessor's parameters other than last should be a variable.");
            }
            variables[i] = (VariableExpressionExecutor) expressionExecutorArr[i];
        }

        ExpressionExecutor windowTimeExecutor = expressionExecutorArr[variableCount];
        Attribute.Type windowTimeType = windowTimeExecutor.getReturnType();
        if (windowTimeType != Attribute.Type.INT && windowTimeType != Attribute.Type.LONG) {
            throw new ExecutionPlanValidationException("StateTransitionsTimeBatchStreamProcessor's last parameter windowTime should be either int or long, but found " + windowTimeType);
        }
        if (!(windowTimeExecutor instanceof ConstantExpressionExecutor)) {
            throw new ExecutionPlanValidationException("StateTransitionsTimeBatchStreamProcessor's last parameter windowTime should be a constant.");
        }
        // The constant holds an Integer for INT and a Long for LONG; Number covers both.
        Number windowTime = (Number) ((ConstantExpressionExecutor) windowTimeExecutor).getValue();

        this.variableExecutors = variables;
        this.timeToKeep = windowTime.longValue();
        this.userToLastStates = new HashMap<>();
        this.currentEvents = new ArrayList<>();

        List<Attribute> outputAttributes = new ArrayList<>(1);
        outputAttributes.add(new Attribute("startState", Attribute.Type.STRING));
        return outputAttributes;
    }

    /**
     * Consumes a chunk of events, updates the remembered states and forwards any flushed batches.
     *
     * @param complexEventChunk     incoming events; a chunk without a first event is ignored
     * @param processor             next processor, which receives every flushed batch
     * @param streamEventCloner     used to copy events before they are buffered
     * @param complexEventPopulater used to set the {@code startState} attribute on the copies
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        if (complexEventChunk.getFirst() == null) {
            return;
        }
        List<ComplexEventChunk<StreamEvent>> outputChunks = new ArrayList<>();
        synchronized (this) {
            if (this.lastScheduledTime < 0) {
                // First chunk ever seen: arm the initial timer.
                this.lastScheduledTime = this.executionPlanContext.getTimestampGenerator().currentTime() + this.timeToKeep;
                this.scheduler.notifyAt(this.lastScheduledTime);
            }
            StreamEvent next = complexEventChunk.getFirst();
            while (next != null) {
                StreamEvent event = next;
                // Advance before handling the event so the iteration does not depend on it afterwards.
                next = next.getNext();

                // NOTE(review): this compares the number of buffered events with timeToKeep, which
                // is otherwise used as a duration added to the current time. It acts as a cap on
                // the buffer size; kept as is - confirm whether a separate limit was intended.
                if (this.currentEvents.size() >= this.timeToKeep) {
                    flushToOutputChunk(outputChunks);
                }

                if (event.getType() == ComplexEvent.Type.TIMER) {
                    flushToOutputChunk(outputChunks);
                    // Re-arm the timer for the next batch.
                    this.lastScheduledTime = this.executionPlanContext.getTimestampGenerator().currentTime() + this.timeToKeep;
                    this.scheduler.notifyAt(this.lastScheduledTime);
                } else if (event.getType() == ComplexEvent.Type.CURRENT) {
                    recordTransition(event, streamEventCloner, complexEventPopulater);
                }
                // Events of any other type are ignored.
            }
        }
        // Forward outside the synchronized block so downstream processing does not run under the lock.
        for (ComplexEventChunk<StreamEvent> outputChunk : outputChunks) {
            processor.process(outputChunk);
        }
    }

    /**
     * Buffers an annotated copy of the event when a previous state is known for its key pair,
     * then stores the event's own state as the new last state.
     */
    private void recordTransition(StreamEvent event, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        String innerKey = (String) this.variableExecutors[0].execute(event);
        // Presumably the user identifier, judging by the name of userToLastStates - verify against the query.
        String outerKey = (String) this.variableExecutors[1].execute(event);
        String newState = (String) this.variableExecutors[2].execute(event);

        Map<String, String> lastStates = this.userToLastStates.get(outerKey);
        if (lastStates == null) {
            lastStates = new HashMap<>();
            this.userToLastStates.put(outerKey, lastStates);
        }
        String previousState = lastStates.get(innerKey);
        if (previousState != null) {
            cloneAppend(streamEventCloner, event, complexEventPopulater, previousState);
        }
        lastStates.put(innerKey, newState);
    }

    /** Copies the event, sets {@code str} as its populated output attribute and buffers the copy. */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent streamEvent, ComplexEventPopulater complexEventPopulater, String str) {
        StreamEvent annotatedCopy = streamEventCloner.copyStreamEvent(streamEvent);
        complexEventPopulater.populateComplexEvent(annotatedCopy, new Object[]{str});
        this.currentEvents.add(annotatedCopy);
    }

    /** Moves all buffered events into a new chunk appended to {@code outputChunks}; does nothing when the buffer is empty. */
    private void flushToOutputChunk(List<ComplexEventChunk<StreamEvent>> outputChunks) {
        if (this.currentEvents.isEmpty()) {
            return;
        }
        ComplexEventChunk<StreamEvent> batch = new ComplexEventChunk<>(true);
        for (StreamEvent buffered : this.currentEvents) {
            batch.add(buffered);
        }
        this.currentEvents.clear();
        outputChunks.add(batch);
    }

    /** No start-up work is required. */
    public void start() {
    }

    /** No shutdown work is required. */
    public void stop() {
    }

    /**
     * Returns the processor state as {@code {userToLastStates, currentEvents, lastScheduledTime}}.
     * The collections are the live internal instances, not copies.
     */
    public Object[] currentState() {
        return new Object[]{this.userToLastStates, this.currentEvents, Long.valueOf(this.lastScheduledTime)};
    }

    /**
     * Restores state from an array laid out as produced by {@link #currentState()}.
     *
     * @param objArr state array: index 0 the state map, index 1 the buffered events, index 2 the
     *               last scheduled time as a {@code Long}
     */
    public void restoreState(Object[] objArr) {
        @SuppressWarnings("unchecked") // layout is defined by currentState()
        Map<String, Map<String, String>> restoredStates = (Map<String, Map<String, String>>) objArr[0];
        @SuppressWarnings("unchecked") // layout is defined by currentState()
        List<StreamEvent> restoredEvents = (List<StreamEvent>) objArr[1];
        this.userToLastStates = restoredStates;
        this.currentEvents = restoredEvents;
        this.lastScheduledTime = ((Long) objArr[2]).longValue();
    }

    /** Sets the scheduler used by {@link #process} to request timer notifications. */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /** Returns the scheduler previously supplied through {@link #setScheduler}. */
    public Scheduler getScheduler() {
        return this.scheduler;
    }
}
