/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.api.windowing;

import com.twitter.heron.api.windowing.Event;
import com.twitter.heron.api.windowing.EventImpl;
import com.twitter.heron.api.windowing.EvictionPolicy;
import com.twitter.heron.api.windowing.TriggerHandler;
import com.twitter.heron.api.windowing.TriggerPolicy;
import com.twitter.heron.api.windowing.WindowLifecycleListener;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Keeps a queue of windowed events together with the eviction and trigger
 * policies applied to it, and notifies a {@link WindowLifecycleListener} when
 * events expire or when a window is activated.
 *
 * <p>The eviction and trigger policies must be set via
 * {@link #setEvictionPolicy} and {@link #setTriggerPolicy} before events are
 * added; the methods below dereference them without a null check (only
 * {@link #shutdown()} tolerates a missing trigger policy).
 *
 * @param <T> the payload type carried by each event
 */
public class WindowManager<T extends Serializable>
implements TriggerHandler {
    private static final Logger LOG = Logger.getLogger(WindowManager.class.getName());

    // Keys used in the map produced by getState() and consumed by restoreState().
    private static final String EVICTION_STATE_KEY = "es";
    private static final String TRIGGER_STATE_KEY = "ts";
    private static final String QUEUE = "queue";
    private static final String EXPIRED_EVENTS = "expired.events";
    private static final String PRE_WINDOW_EVENTS = "pre.window.events";
    private static final String EVENTS_SINCE_LAST_EXPIRY = "events.since.last.expiry";

    /**
     * Number of {@link #compactWindow()} calls (one per added event) after
     * which a partial scan is run to drop expired events from the queue.
     */
    public static final int EXPIRE_EVENTS_THRESHOLD = 100;

    /** Events that have been added and not yet expired. */
    protected final Collection<Event<T>> queue;
    protected EvictionPolicy<T, ?> evictionPolicy;
    protected TriggerPolicy<T, ?> triggerPolicy;
    protected final WindowLifecycleListener<T> windowLifecycleListener;

    /** Payloads expired since the last trigger; handed to the listener on activation. */
    private final List<T> expiredEvents;
    /** Events that were part of the previously activated window. */
    private final Set<Event<T>> prevWindowEvents;
    /** Counts additions since the last scan; reset to zero by every scan. */
    private final AtomicInteger eventsSinceLastExpiry;

    /**
     * Creates a manager that stores events in the supplied collection.
     *
     * @param lifecycleListener listener notified about expiry and activation
     * @param queue backing collection for pending events; it is used as-is, not copied
     */
    public WindowManager(WindowLifecycleListener<T> lifecycleListener, Collection<Event<T>> queue) {
        this.windowLifecycleListener = lifecycleListener;
        this.queue = queue;
        this.expiredEvents = new ArrayList<T>();
        this.prevWindowEvents = new HashSet<Event<T>>();
        this.eventsSinceLastExpiry = new AtomicInteger();
    }

    /**
     * Creates a manager backed by a new {@link ConcurrentLinkedQueue}.
     *
     * @param lifecycleListener listener notified about expiry and activation
     */
    public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
        this(lifecycleListener, new ConcurrentLinkedQueue<Event<T>>());
    }

    /**
     * Sets the policy that decides which queued events expire.
     *
     * @param evictionPolicy the eviction policy to use
     */
    public void setEvictionPolicy(EvictionPolicy<T, ?> evictionPolicy) {
        this.evictionPolicy = evictionPolicy;
    }

    /**
     * Sets the policy that is informed about every added event and reset after each trigger.
     *
     * @param triggerPolicy the trigger policy to use
     */
    public void setTriggerPolicy(TriggerPolicy<T, ?> triggerPolicy) {
        this.triggerPolicy = triggerPolicy;
    }

    /**
     * Adds an event stamped with the current system time.
     *
     * @param event the payload to add
     */
    public void add(T event) {
        this.add(event, System.currentTimeMillis());
    }

    /**
     * Adds an event with an explicit timestamp.
     *
     * @param event the payload to add
     * @param ts the timestamp to associate with the payload
     */
    public void add(T event, long ts) {
        this.add(new EventImpl<T>(event, ts));
    }

    /**
     * Adds an already wrapped event. Watermark events are not queued, but they
     * are still reported to both policies and still count towards compaction.
     *
     * @param windowEvent the event to add
     */
    public void add(Event<T> windowEvent) {
        if (windowEvent.isWatermark()) {
            LOG.fine(String.format("Got watermark event with ts %d", windowEvent.getTimestamp()));
        } else {
            this.queue.add(windowEvent);
        }
        this.track(windowEvent);
        this.compactWindow();
    }

    /**
     * Performs a full scan of the queue and, if any events are selected for
     * processing, activates a window on the listener with the selected events,
     * the subset not seen in the previous window, and the payloads expired
     * since the last trigger. The trigger policy is reset in either case.
     *
     * @return {@code true} if the window contained at least one event
     */
    @Override
    public boolean onTrigger() {
        // The scan must run first: it appends newly expired payloads to expiredEvents.
        List<Event<T>> windowEvents = this.scanEvents(true);
        List<T> expired = new ArrayList<T>(this.expiredEvents);
        this.expiredEvents.clear();

        List<T> events = new ArrayList<T>();
        List<T> newEvents = new ArrayList<T>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!this.prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }

        this.prevWindowEvents.clear();
        boolean hasEvents = !events.isEmpty();
        if (hasEvents) {
            this.prevWindowEvents.addAll(windowEvents);
            LOG.fine(String.format("invoking windowLifecycleListener onActivation, [%d] events in window.", events.size()));
            this.windowLifecycleListener.onActivation(events, newEvents, expired, this.evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.fine("No events in the window, skipping onActivation");
        }
        this.triggerPolicy.reset();
        return hasEvents;
    }

    /** Shuts down the trigger policy, if one has been set. */
    public void shutdown() {
        LOG.fine("Shutting down WindowManager");
        if (this.triggerPolicy != null) {
            this.triggerPolicy.shutdown();
        }
    }

    /**
     * Bumps the addition counter and, once it reaches
     * {@link #EXPIRE_EVENTS_THRESHOLD}, runs a partial scan that removes
     * expired events from the head of the queue.
     */
    protected void compactWindow() {
        if (this.eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
            this.scanEvents(false);
        }
    }

    /** Reports the event to the eviction policy and then to the trigger policy. */
    private void track(Event<T> windowEvent) {
        this.evictionPolicy.track(windowEvent);
        this.triggerPolicy.track(windowEvent);
    }

    /**
     * Walks the queue asking the eviction policy what to do with each event.
     * Expired events are removed from the queue, recorded in
     * {@code expiredEvents} and reported through {@code onExpiry}. The addition
     * counter is reset to zero.
     *
     * @param fullScan when {@code false}, the walk ends at the first event that
     *     is not expired and nothing is collected for processing; when
     *     {@code true}, the walk ends only on a STOP action, PROCESS events are
     *     collected and any other action is skipped over
     * @return the events selected for processing (always empty for a partial scan)
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.fine(String.format("Scan events, eviction policy %s", this.evictionPolicy));
        List<T> eventsToExpire = new ArrayList<T>();
        List<Event<T>> eventsToProcess = new ArrayList<Event<T>>();
        Iterator<Event<T>> it = this.queue.iterator();
        while (it.hasNext()) {
            Event<T> windowEvent = it.next();
            EvictionPolicy.Action action = this.evictionPolicy.evict(windowEvent);
            if (action == EvictionPolicy.Action.EXPIRE) {
                eventsToExpire.add(windowEvent.get());
                it.remove();
            } else if (!fullScan || action == EvictionPolicy.Action.STOP) {
                break;
            } else if (action == EvictionPolicy.Action.PROCESS) {
                eventsToProcess.add(windowEvent);
            }
        }
        this.expiredEvents.addAll(eventsToExpire);
        this.eventsSinceLastExpiry.set(0);
        LOG.fine(String.format("[%d] events expired from window.", eventsToExpire.size()));
        if (!eventsToExpire.isEmpty()) {
            LOG.fine("invoking windowLifecycleListener.onExpiry");
            this.windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }

    /**
     * Finds the smallest timestamp among queued events in {@code (startTs, endTs]}.
     *
     * @param startTs exclusive lower bound
     * @param endTs inclusive upper bound
     * @return the earliest matching timestamp, or {@link Long#MAX_VALUE} if no
     *     queued event falls in the range
     */
    public long getEarliestEventTs(long startTs, long endTs) {
        long minTs = Long.MAX_VALUE;
        for (Event<T> event : this.queue) {
            long eventTs = event.getTimestamp();
            if (eventTs > startTs && eventTs <= endTs) {
                minTs = Math.min(minTs, eventTs);
            }
        }
        return minTs;
    }

    /**
     * Counts queued events whose timestamp is at or before the reference time.
     *
     * @param referenceTime inclusive upper bound on the event timestamp
     * @return the number of matching events
     */
    public int getEventCount(long referenceTime) {
        int count = 0;
        for (Event<T> event : this.queue) {
            if (event.getTimestamp() <= referenceTime) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Walks queued events in {@code (startTs, endTs]} in iteration order and,
     * after every {@code slidingCount}-th matching event, records the largest
     * timestamp seen so far.
     *
     * @param startTs exclusive lower bound
     * @param endTs inclusive upper bound
     * @param slidingCount number of matching events between recorded timestamps;
     *     expected to be positive (zero causes an {@link ArithmeticException}
     *     as soon as an event falls in the range)
     * @return the recorded timestamps; empty when {@code endTs <= startTs}
     */
    public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
        List<Long> timestamps = new ArrayList<Long>();
        if (endTs <= startTs) {
            return timestamps;
        }
        int count = 0;
        long ts = Long.MIN_VALUE;
        for (Event<T> event : this.queue) {
            long eventTs = event.getTimestamp();
            if (eventTs > startTs && eventTs <= endTs) {
                ts = Math.max(ts, eventTs);
                if (++count % slidingCount == 0) {
                    timestamps.add(ts);
                }
            }
        }
        return timestamps;
    }

    @Override
    public String toString() {
        return "WindowManager{evictionPolicy=" + this.evictionPolicy + ", triggerPolicy=" + this.triggerPolicy + '}';
    }

    /**
     * Restores this manager from a map previously produced by
     * {@link #getState()}. Restored collections are added to the current
     * contents rather than replacing them. Entries that are absent from the map
     * are skipped, so a partial map no longer fails half-way through with a
     * {@link NullPointerException}.
     *
     * @param state the saved state, keyed as in {@link #getState()}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void restoreState(Map<String, Serializable> state) {
        LOG.info("Restoring window manager state");

        // The policy fields are declared with a wildcard state type, so the saved
        // state can only be handed back through a raw-typed view of the policy.
        // NOTE(review): assumes the policies' restoreState accepts the object that
        // their getState returned - confirm against the policy interfaces.
        Serializable evictionState = state.get(EVICTION_STATE_KEY);
        if (evictionState != null) {
            ((EvictionPolicy) this.evictionPolicy).restoreState(evictionState);
        }
        Serializable triggerState = state.get(TRIGGER_STATE_KEY);
        if (triggerState != null) {
            ((TriggerPolicy) this.triggerPolicy).restoreState(triggerState);
        }

        Serializable savedQueue = state.get(QUEUE);
        if (savedQueue != null) {
            this.queue.addAll((Collection<Event<T>>) savedQueue);
        }
        Serializable savedExpired = state.get(EXPIRED_EVENTS);
        if (savedExpired != null) {
            this.expiredEvents.addAll((List<T>) savedExpired);
        }
        Serializable savedPrevWindow = state.get(PRE_WINDOW_EVENTS);
        if (savedPrevWindow != null) {
            this.prevWindowEvents.addAll((Set<Event<T>>) savedPrevWindow);
        }
        Serializable savedCounter = state.get(EVENTS_SINCE_LAST_EXPIRY);
        if (savedCounter != null) {
            this.eventsSinceLastExpiry.set((Integer) savedCounter);
        }
    }

    /**
     * Captures the state of this manager and of its policies.
     *
     * <p>The returned map holds the live internal collections, not copies, so
     * later changes to this manager are visible through it. The casts to
     * {@link Serializable} fail with a {@link ClassCastException} if the
     * backing queue supplied at construction is not serializable.
     *
     * @return a new map containing the policy states (when non-null), the
     *     queue, the expired payloads, the previous window's events and the
     *     addition counter
     */
    public Map<String, Serializable> getState() {
        Map<String, Serializable> ret = new HashMap<String, Serializable>();

        // Read each policy's state once so the null check and the stored value agree.
        Object evictionState = this.evictionPolicy.getState();
        if (evictionState != null) {
            ret.put(EVICTION_STATE_KEY, (Serializable) evictionState);
        }
        Object triggerState = this.triggerPolicy.getState();
        if (triggerState != null) {
            ret.put(TRIGGER_STATE_KEY, (Serializable) triggerState);
        }

        ret.put(QUEUE, (Serializable) this.queue);
        ret.put(EXPIRED_EVENTS, (Serializable) this.expiredEvents);
        ret.put(PRE_WINDOW_EVENTS, (Serializable) this.prevWindowEvents);
        ret.put(EVENTS_SINCE_LAST_EXPIRY, Integer.valueOf(this.eventsSinceLastExpiry.get()));
        return ret;
    }
}

