/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window>
extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor("fire-time", (ReduceFunction)new Min(), (TypeSerializer)LongSerializer.INSTANCE);

    private ContinuousProcessingTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestamp.get() == null) {
            long start = timestamp - timestamp % this.interval;
            long nextFireTimestamp = start + this.interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add((Object)nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        if (((Long)fireTimestamp.get()).equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add((Object)(time + this.interval));
            ctx.registerProcessingTimeTimer(time + this.interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        long timestamp = (Long)fireTimestamp.get();
        ctx.deleteProcessingTimeTimer(timestamp);
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public TriggerResult onMerge(W window, Trigger.OnMergeContext ctx) {
        ctx.mergePartitionedState(this.stateDesc);
        return TriggerResult.CONTINUE;
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
    }

    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger<W>(interval.toMilliseconds());
    }

    private static class Min
    implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

