/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.api.windowing.triggers;

import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.windowing.DefaultEvictionContext;
import org.apache.heron.api.windowing.Event;
import org.apache.heron.api.windowing.triggers.AbstractBaseTriggerPolicy;

/**
 * Trigger policy that is driven by watermark events and a fixed sliding interval.
 *
 * <p>Each watermark received while the policy is started advances an internal
 * "next window end" timestamp in steps of {@code slidingIntervalMs}, invoking the
 * inherited trigger handler for every window end that is not later than the
 * watermark timestamp. The next window end timestamp is the policy's state and is
 * exposed through {@link #getState()} / {@link #restoreState(Long)}.
 *
 * <p>The {@code started}, {@code windowManager}, {@code evictionPolicy} and
 * {@code handler} members used below are not declared in this class; they are
 * inherited from {@link AbstractBaseTriggerPolicy}.
 *
 * @param <T> the type of the payload carried by tracked events
 */
public class WatermarkTimeTriggerPolicy<T extends Serializable>
extends AbstractBaseTriggerPolicy<T, Long> {
    private static final Logger LOG = Logger.getLogger(WatermarkTimeTriggerPolicy.class.getName());

    /** Distance, in milliseconds, between two consecutive window end timestamps. */
    private final long slidingIntervalMs;

    /** End timestamp of the next window to evaluate; updated after each watermark. */
    private volatile long nextWindowEndTs;

    /**
     * Creates a policy that advances window ends by the given interval.
     *
     * @param slidingIntervalMs the sliding interval in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code slidingIntervalMs} is zero or negative
     */
    public WatermarkTimeTriggerPolicy(long slidingIntervalMs) {
        // A zero interval would divide by zero when aligning timestamps and would never
        // advance the window end past the watermark; a negative one moves it backwards.
        // Reject both up front instead of failing (or spinning) on the first watermark.
        if (slidingIntervalMs <= 0L) {
            throw new IllegalArgumentException(
                "slidingIntervalMs must be positive, got: " + slidingIntervalMs);
        }
        this.slidingIntervalMs = slidingIntervalMs;
    }

    /**
     * Processes an event. Only watermark events are acted upon, and only while the
     * policy is started; every other event is ignored.
     *
     * @param event the event to inspect
     */
    @Override
    public void track(Event<T> event) {
        if (this.started && event.isWatermark()) {
            this.handleWaterMarkEvent(event);
        }
    }

    /** No-op: this policy keeps nothing that needs resetting between triggers. */
    @Override
    public void reset() {
    }

    /** No-op: this policy holds no resources of its own to release. */
    @Override
    public void shutdown() {
    }

    /**
     * Evaluates every window whose end timestamp is not later than the watermark.
     *
     * <p>For each candidate window end, the eviction policy is given a fresh context
     * and the handler is triggered. If the handler reports that it fired, the window
     * end moves forward by one sliding interval. Otherwise the window end jumps to the
     * aligned timestamp derived from the earliest event the window manager reports
     * between the current window end and the watermark, or the loop stops if there is
     * no such event.
     *
     * @param event the watermark event; its timestamp is the upper bound for window ends
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        // Work on a local copy; the volatile field is written once, after the loop.
        long windowEndTs = this.nextWindowEndTs;
        // Guard FINE logging so String.format (and argument boxing) is skipped when disabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(String.format("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs));
        }
        while (windowEndTs <= watermarkTs) {
            long currentCount = this.windowManager.getEventCount(windowEndTs);
            this.evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (this.handler.onTrigger()) {
                // The handler fired for this window end: move on to the next one.
                windowEndTs += this.slidingIntervalMs;
                continue;
            }
            // Nothing fired: skip ahead to the next aligned window end that has an event.
            long ts = this.getNextAlignedWindowTs(windowEndTs, watermarkTs);
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine(String.format("Next aligned window end ts %d", ts));
            }
            if (ts == Long.MAX_VALUE) {
                // Long.MAX_VALUE is the "no event found" sentinel; keep the current window end.
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine(String.format("No events to process between %d and watermark ts %d", windowEndTs, watermarkTs));
                }
                break;
            }
            windowEndTs = ts;
        }
        this.nextWindowEndTs = windowEndTs;
    }

    /**
     * Finds the earliest event timestamp the window manager reports for the given
     * range and rounds it up to the next multiple of the sliding interval.
     *
     * @param startTs lower bound passed to the window manager
     * @param endTs upper bound passed to the window manager
     * @return the aligned timestamp, or {@link Long#MAX_VALUE} unchanged when the
     *     window manager returns that sentinel
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = this.windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE) {
            return nextTs;
        }
        long remainder = nextTs % this.slidingIntervalMs;
        if (remainder == 0L) {
            // Already on an interval boundary.
            return nextTs;
        }
        return nextTs + (this.slidingIntervalMs - remainder);
    }

    /**
     * Returns the policy state.
     *
     * @return the end timestamp of the next window to evaluate
     */
    @Override
    public Long getState() {
        return this.nextWindowEndTs;
    }

    /**
     * Restores the policy state.
     *
     * @param state the end timestamp of the next window to evaluate; it is unboxed,
     *     so it must not be {@code null}
     */
    @Override
    public void restoreState(Long state) {
        this.nextWindowEndTs = state;
    }

    @Override
    public String toString() {
        return "WatermarkTimeTriggerPolicy{slidingIntervalMs=" + this.slidingIntervalMs + ", nextWindowEndTs=" + this.nextWindowEndTs + ", started=" + this.started + '}';
    }
}

