package org.apache.stratos.cep.extension;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.persistence.ThreadBarrier;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

/**
 * Siddhi window extension ({@code stratos:faultHandling}) that remembers the most recent
 * event seen for each member and forwards that event to the next processor once no newer
 * event has replaced it for more than {@link #TIME_OUT} seconds.
 *
 * <p>Events are keyed by the string value of the attribute named by the second window
 * parameter. Expiry is checked by {@link #run()}, which is submitted to the scheduler
 * supplied through {@link #setScheduledExecutorService(ScheduledExecutorService)}.
 */
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
    /** Divisor used to convert a millisecond difference into whole seconds. */
    private static final int MILLIS_PER_SECOND = 1000;
    /** Seconds without a newer event after which a member is reported. */
    private static final int TIME_OUT = 60;
    /** Pause between two expiry sweeps in {@link #run()}, in milliseconds. */
    private static final long SWEEP_INTERVAL_MILLIS = 1000L;
    static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
    private ScheduledExecutorService eventRemoverScheduler;
    private int subjectedAttrIndex;
    private ThreadBarrier threadBarrier;
    private long timeToKeep;
    private ISchedulerSiddhiQueue<StreamEvent> window;
    /** Latest event per member id; written by the event path and swept by {@link #run()}. */
    private final ConcurrentHashMap<String, InEvent> timeStampMap = new ConcurrentHashMap<>();
    private String memberID;

    /**
     * Records a single incoming event as the latest one for its member.
     *
     * @param inEvent the event to record
     */
    protected void processEvent(InEvent inEvent) {
        addDataToMap(inEvent);
    }

    /**
     * Records every active event of a batch as the latest one for its member.
     *
     * @param inListEvent the batch whose first {@code getActiveEvents()} entries are recorded
     */
    protected void processEvent(InListEvent inListEvent) {
        if (log.isDebugEnabled()) {
            log.debug("Processing event batch : " + inListEvent);
        }
        int activeEvents = inListEvent.getActiveEvents();
        for (int i = 0; i < activeEvents; i++) {
            addDataToMap((InEvent) inListEvent.getEvent(i));
        }
    }

    /**
     * Stores {@code inEvent} under the member id found at the configured attribute position,
     * replacing any earlier event for the same member.
     *
     * <p>Events are skipped (and an error is logged) when the processor has no member
     * attribute configured or when the event carries no value for that attribute.
     *
     * @param inEvent the event to store
     */
    protected void addDataToMap(InEvent inEvent) {
        if (this.memberID == null) {
            log.error("NULL Member ID");
            return;
        }
        Object rawId = inEvent.getData()[this.subjectedAttrIndex];
        if (rawId == null) {
            // ConcurrentHashMap rejects null keys; skip the event instead of failing the pipeline.
            log.error("Event has no value for attribute '" + this.memberID + "', ignoring it");
            return;
        }
        String id = (String) rawId;
        this.timeStampMap.put(id, inEvent);
        if (log.isDebugEnabled()) {
            log.debug("Add member : " + id);
        }
    }

    /**
     * @return an iterator over the backing window queue
     */
    public Iterator<StreamEvent> iterator() {
        return this.window.iterator();
    }

    /**
     * @param str predicate passed to the window queue when distributed processing is enabled
     * @return the filtered iterator in distributed mode, otherwise the plain window iterator
     */
    public Iterator<StreamEvent> iterator(String str) {
        return this.siddhiContext.isDistributedProcessingEnabled() ? this.window.iterator(str) : this.window.iterator();
    }

    /**
     * Sweeps the member map roughly once per second and forwards the last event of every
     * member whose event is older than {@link #TIME_OUT} seconds, removing it from the map.
     *
     * <p>The loop ends when the running thread is interrupted (the interrupt flag is
     * restored) or when any other {@link Throwable} is raised, which is logged.
     */
    public void run() {
        while (true) {
            try {
                this.threadBarrier.pass();
                for (Map.Entry<String, InEvent> entry : this.timeStampMap.entrySet()) {
                    String member = entry.getKey();
                    InEvent lastEvent = entry.getValue();
                    long idleSeconds = (System.currentTimeMillis() - lastEvent.getTimeStamp()) / MILLIS_PER_SECOND;
                    // Conditional removal: if a newer event replaced lastEvent after the check
                    // above, the entry is kept and the member is not reported.
                    if (idleSeconds > TIME_OUT && this.timeStampMap.remove(member, lastEvent)) {
                        log.info("Member Inactive : " + member + " : for " + TIME_OUT + " seconds");
                        log.debug("Inactive member : " + member + " : for " + TIME_OUT + " seconds");
                        this.nextProcessor.process(lastEvent);
                    }
                }
                Thread.sleep(SWEEP_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the owning executor and stop sweeping.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
                return;
            }
        }
    }

    /**
     * @return a one-element array wrapping the window queue's own state
     */
    protected Object[] currentState() {
        return new Object[]{this.window.currentState()};
    }

    /**
     * Restores the window queue from a snapshot and re-schedules it.
     *
     * @param objArr snapshot to restore from
     */
    protected void restoreState(Object[] objArr) {
        // NOTE(review): currentState() wraps the queue state in a one-element array, so only
        // the second call receives the shape the queue produced. The first call is kept to
        // preserve existing behaviour - confirm against the queue implementation.
        this.window.restoreState(objArr);
        this.window.restoreState((Object[]) objArr[0]);
        this.window.reSchedule();
    }

    /**
     * Reads the window parameters and creates the backing queue.
     *
     * @param expressionArr              {@code [0]} int or long constant stored as the schedule
     *                                   delay; {@code [1]} variable naming the member-id attribute
     * @param queryPostProcessingElement not used by this processor
     * @param abstractDefinition         stream definition used to resolve the attribute position
     * @param str                        element id handed to the distributed queue
     * @param z                          not used by this processor
     * @param siddhiContext              not used directly; the inherited context field is read
     */
    protected void init(Expression[] expressionArr, QueryPostProcessingElement queryPostProcessingElement, AbstractDefinition abstractDefinition, String str, boolean z, SiddhiContext siddhiContext) {
        if (expressionArr[0] instanceof IntConstant) {
            this.timeToKeep = ((IntConstant) expressionArr[0]).getValue().intValue();
        } else {
            this.timeToKeep = ((LongConstant) expressionArr[0]).getValue().longValue();
        }
        String attributeName = ((Variable) expressionArr[1]).getAttributeName();
        this.memberID = attributeName;
        this.subjectedAttrIndex = abstractDefinition.getAttributePosition(attributeName);
        if (this.siddhiContext.isDistributedProcessingEnabled()) {
            this.window = new SchedulerSiddhiQueueGrid<StreamEvent>(str, this, this.siddhiContext, this.async);
        } else {
            this.window = new SchedulerSiddhiQueue<StreamEvent>(this);
        }
        this.window.schedule();
    }

    /** Submits this processor to the scheduler with a delay of {@code timeToKeep} milliseconds. */
    public void schedule() {
        this.eventRemoverScheduler.schedule((Runnable) this, this.timeToKeep, TimeUnit.MILLISECONDS);
    }

    /** Submits this processor to the scheduler with no delay. */
    public void scheduleNow() {
        this.eventRemoverScheduler.schedule((Runnable) this, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * @param scheduledExecutorService scheduler used by {@link #schedule()} and {@link #scheduleNow()}
     */
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.eventRemoverScheduler = scheduledExecutorService;
    }

    /**
     * @param threadBarrier barrier passed at the start of every sweep in {@link #run()}
     */
    public void setThreadBarrier(ThreadBarrier threadBarrier) {
        this.threadBarrier = threadBarrier;
    }

    /** Drops the reference to the backing window queue. */
    public void destroy() {
        this.window = null;
    }
}
