/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WindowDefinition;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.impl.processor.SnapshotKey;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.util.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;

/**
 * Processor that groups incoming items into frames and, driven by watermarks,
 * emits aggregated window results.
 * <p>
 * Each item is accumulated into {@link #tsToKeyToAcc} under its frame
 * timestamp (as returned by {@code getFrameTsFn}) and its grouping key (as
 * returned by {@code getKeyFn}). When a watermark arrives, one
 * {@link TimestampedEntry} per key is emitted for every window timestamp
 * between {@link #nextWinToEmit} and the watermark (stepping by the frame
 * length), followed by the watermark itself. Frames that can no longer
 * contribute to a later window are evicted as each window completes.
 * <p>
 * When {@code isLastStage} is {@code false}, {@link #saveToSnapshot()} flushes
 * all buffered frames downstream instead of writing them to the snapshot.
 *
 * @param <T> type of the items received
 * @param <A> type of the accumulator
 * @param <R> type of the finished aggregation result
 */
public class SlidingWindowP<T, A, R> extends AbstractProcessor {
    /** Accumulators, indexed by frame timestamp and then by grouping key. Not private. */
    final Map<Long, Map<Object, A>> tsToKeyToAcc = new HashMap<>();
    /**
     * Incrementally maintained window contents; only used when the window is
     * not tumbling and the aggregate operation provides a deduct function.
     */
    Map<Object, A> slidingWindow;
    /** Timestamp of the next window to emit; {@code Long.MIN_VALUE} until first determined. */
    long nextWinToEmit = Long.MIN_VALUE;

    private final WindowDefinition wDef;
    private final DistributedToLongFunction<? super T> getFrameTsFn;
    private final Function<? super T, ?> getKeyFn;
    private final AggregateOperation1<? super T, A, R> aggrOp;
    private final boolean isLastStage;
    private final AbstractProcessor.FlatMapper<Watermark, ?> wmFlatMapper;
    /** A freshly created accumulator, used to detect accumulators that became empty after patching. */
    private final A emptyAcc;
    /** In-progress traverser of {@link #flushBuffers()}; {@code null} when no flush is pending. */
    private Traverser<Object> flushTraverser;
    /** In-progress traverser of {@link #saveToSnapshot()}; {@code null} when no save is pending. */
    private Traverser<Map.Entry<?, ?>> snapshotTraverser;
    /** Highest frame timestamp seen so far (from items or from the restored snapshot). */
    private long topTs = Long.MIN_VALUE;
    /** Minimum of the restored {@code nextWinToEmit} values; {@code Long.MAX_VALUE} while none was restored. */
    private long minRestoredNextWinToEmit = Long.MAX_VALUE;
    private ProcessingGuarantee processingGuarantee;

    /**
     * @param getKeyFn     extracts the grouping key from an item
     * @param getFrameTsFn extracts the frame timestamp from an item
     * @param winDef       the window definition
     * @param aggrOp       the aggregate operation; must provide a combine
     *                     function unless the window is tumbling
     * @param isLastStage  whether buffered state is saved to the snapshot
     *                     ({@code true}) or flushed downstream when a snapshot
     *                     is taken ({@code false})
     */
    public SlidingWindowP(Function<? super T, ?> getKeyFn, DistributedToLongFunction<? super T> getFrameTsFn, WindowDefinition winDef, AggregateOperation1<? super T, A, R> aggrOp, boolean isLastStage) {
        if (!winDef.isTumbling()) {
            Preconditions.checkNotNull(aggrOp.combineFn(), "AggregateOperation lacks the combine primitive");
        }
        this.wDef = winDef;
        this.getFrameTsFn = getFrameTsFn;
        this.getKeyFn = getKeyFn;
        this.aggrOp = aggrOp;
        this.isLastStage = isLastStage;
        // For each watermark: emit all due windows, then the watermark itself, and
        // once everything was emitted advance nextWinToEmit past the watermark.
        this.wmFlatMapper = this.flatMapper(wm -> this.windowTraverserAndEvictor(wm.timestamp())
                .append(wm)
                .onFirstNull(() -> {
                    this.nextWinToEmit = this.wDef.higherFrameTs(wm.timestamp());
                }));
        this.emptyAcc = aggrOp.createFn().get();
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        this.processingGuarantee = context.processingGuarantee();
    }

    /**
     * Accumulates the item into the accumulator for its frame timestamp and key,
     * creating the accumulator if needed.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        // The processor is declared to receive items of type T; the cast cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        T t = (T) item;
        long frameTs = this.getFrameTsFn.applyAsLong(t);
        assert (frameTs == this.wDef.floorFrameTs(frameTs)) : "getFrameTsFn returned an invalid frame timestamp";
        assert (frameTs + this.wDef.windowLength() >= this.nextWinToEmit) : "late event received, it should have been filtered out by InsertWatermarksP: item=" + item + ", nextWinToEmit=" + this.nextWinToEmit;
        Object key = this.getKeyFn.apply(t);
        A acc = this.tsToKeyToAcc
                .computeIfAbsent(frameTs, x -> new HashMap<>())
                .computeIfAbsent(key, k -> this.aggrOp.createFn().get());
        this.aggrOp.accumulateFn().accept(acc, t);
        this.topTs = Math.max(this.topTs, frameTs);
        return true;
    }

    @Override
    protected boolean tryProcessWm0(@Nonnull Watermark wm) {
        return this.wmFlatMapper.tryProcess(wm);
    }

    /** Emits all remaining windows; returns {@code true} once everything was emitted. */
    @Override
    public boolean complete() {
        return this.flushBuffers();
    }

    /**
     * If this is not the last stage, or a flush is already in progress, flushes
     * the buffers downstream. Otherwise writes every accumulator (under a
     * {@link SnapshotKey}) and {@link #nextWinToEmit} (under a broadcast key) to
     * the snapshot.
     *
     * @return {@code true} when done, {@code false} if the method needs to be called again
     */
    @Override
    public boolean saveToSnapshot() {
        if (!this.isLastStage || this.flushTraverser != null) {
            return this.flushBuffers();
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.tsToKeyToAcc.entrySet())
                    .<Map.Entry<?, ?>>flatMap(frame -> Traversers.traverseIterable(frame.getValue().entrySet())
                            .map(keyAndAcc -> Util.entry(
                                    new SnapshotKey(frame.getKey(), keyAndAcc.getKey()), keyAndAcc.getValue())))
                    .append(Util.entry(BroadcastKey.broadcastKey(Keys.NEXT_WIN_TO_EMIT), this.nextWinToEmit))
                    .onFirstNull(() -> {
                        this.snapshotTraverser = null;
                    });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /**
     * Restores one snapshot entry: either a {@code nextWinToEmit} value stored
     * under a broadcast key, or an accumulator stored under a {@link SnapshotKey}.
     *
     * @throws JetException if an accumulator for the same {@link SnapshotKey} was already restored
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            Object broadcastKey = ((BroadcastKey<?>) key).key();
            assert (Keys.NEXT_WIN_TO_EMIT.equals(broadcastKey)) : "key=" + broadcastKey;
            long newNextWinToEmit = (Long) value;
            assert (this.processingGuarantee != ProcessingGuarantee.EXACTLY_ONCE || this.minRestoredNextWinToEmit == Long.MAX_VALUE || this.minRestoredNextWinToEmit == newNextWinToEmit) : "different values for nextWinToEmit restored, before=" + this.minRestoredNextWinToEmit + ", new=" + newNextWinToEmit;
            // Several values may be restored; keep the lowest one.
            this.minRestoredNextWinToEmit = Math.min(newNextWinToEmit, this.minRestoredNextWinToEmit);
            return;
        }
        SnapshotKey k = (SnapshotKey) key;
        // Values stored under a SnapshotKey are the accumulators written by saveToSnapshot().
        @SuppressWarnings("unchecked")
        A acc = (A) value;
        if (this.tsToKeyToAcc.computeIfAbsent(k.timestamp, x -> new HashMap<>()).put(k.key, acc) != null) {
            throw new JetException("Duplicate key in snapshot: " + k);
        }
        this.topTs = Math.max(this.topTs, k.timestamp);
    }

    /**
     * Adopts the restored {@code nextWinToEmit}, but only in the last stage and
     * only if a value was actually restored. Otherwise {@link #nextWinToEmit}
     * keeps its current value, so that the {@code Long.MAX_VALUE} "nothing
     * restored" sentinel never becomes the effective window position.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        if (this.isLastStage && this.minRestoredNextWinToEmit != Long.MAX_VALUE) {
            this.nextWinToEmit = this.minRestoredNextWinToEmit;
            LoggingUtil.logFine(this.getLogger(), "Restored nextWinToEmit from snapshot to: %s", this.nextWinToEmit);
        }
        return true;
    }

    /**
     * Returns a traverser over the results of all windows from the first window
     * to emit up to {@code wm}, one {@link TimestampedEntry} per key per window.
     * After the last entry of a window is traversed, {@link #completeWindow}
     * is called for that window.
     *
     * @param wm timestamp up to which (inclusive) windows are emitted
     */
    private Traverser<Object> windowTraverserAndEvictor(long wm) {
        long rangeStart;
        if (this.nextWinToEmit != Long.MIN_VALUE) {
            rangeStart = this.nextWinToEmit;
        } else {
            if (this.tsToKeyToAcc.isEmpty()) {
                return Traversers.empty();
            }
            // nextWinToEmit was not determined yet: start from the lowest frame on
            // record, or from the watermark's frame if that is even lower.
            long bottomTs = this.tsToKeyToAcc.keySet().stream()
                    .min(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError((Object) "Failed to find the min key in a non-empty map"));
            rangeStart = Math.min(bottomTs, this.wDef.floorFrameTs(wm));
        }
        return Traversers.traverseStream(SlidingWindowP.range(rangeStart, wm, this.wDef.frameLength()).boxed())
                .flatMap(window -> Traversers.traverseIterable(this.computeWindow(window).entrySet())
                        .map(e -> (Object) new TimestampedEntry<>(
                                window, e.getKey(), this.aggrOp.finishFn().apply(e.getValue())))
                        .onFirstNull(() -> this.completeWindow(window)));
    }

    /**
     * Returns the key-to-accumulator map of the window ending at {@code frameTs}.
     * A tumbling window is the frame itself. Without a deduct function the window
     * is recomputed from its frames. With a deduct function, {@link #slidingWindow}
     * is initialised on first use and afterwards patched by combining in the
     * frame at {@code frameTs}.
     */
    private Map<Object, A> computeWindow(long frameTs) {
        if (this.wDef.isTumbling()) {
            return this.tsToKeyToAcc.getOrDefault(frameTs, Collections.emptyMap());
        }
        if (this.aggrOp.deductFn() == null) {
            return this.recomputeWindow(frameTs);
        }
        if (this.slidingWindow == null) {
            this.slidingWindow = this.recomputeWindow(frameTs);
        } else {
            this.patchSlidingWindow(this.aggrOp.combineFn(), this.tsToKeyToAcc.get(frameTs));
        }
        return this.slidingWindow;
    }

    /**
     * Builds the window ending at {@code frameTs} from scratch by combining the
     * accumulators of all frames in
     * {@code (frameTs - windowLength, frameTs]} into fresh per-key accumulators.
     */
    private Map<Object, A> recomputeWindow(long frameTs) {
        Map<Object, A> window = new HashMap<>();
        for (long ts = frameTs - this.wDef.windowLength() + this.wDef.frameLength(); ts <= frameTs; ts += this.wDef.frameLength()) {
            this.tsToKeyToAcc.getOrDefault(ts, Collections.emptyMap())
                    .forEach((key, currAcc) -> this.aggrOp.combineFn().accept(
                            window.computeIfAbsent(key, k -> this.aggrOp.createFn().get()), currAcc));
        }
        return window;
    }

    /**
     * Applies {@code patchOp} to {@link #slidingWindow} for every entry of
     * {@code patchingFrame}. A key whose accumulator equals {@link #emptyAcc}
     * after patching is removed from the sliding window.
     *
     * @param patchOp       operation applied as {@code patchOp(windowAcc, frameAcc)}
     * @param patchingFrame the frame to apply; {@code null} is a no-op
     */
    private void patchSlidingWindow(BiConsumer<? super A, ? super A> patchOp, Map<Object, A> patchingFrame) {
        if (patchingFrame == null) {
            return;
        }
        for (Map.Entry<Object, A> e : patchingFrame.entrySet()) {
            this.slidingWindow.compute(e.getKey(), (k, acc) -> {
                A result = acc != null ? acc : this.aggrOp.createFn().get();
                patchOp.accept(result, e.getValue());
                // Returning null from compute() removes the mapping.
                return result.equals(this.emptyAcc) ? null : result;
            });
        }
    }

    /**
     * Evicts the oldest frame of the window ending at {@code frameTs} and, when
     * the sliding window is maintained incrementally, deducts that frame from it.
     */
    private void completeWindow(long frameTs) {
        long frameToEvict = frameTs - this.wDef.windowLength() + this.wDef.frameLength();
        Map<Object, A> evictedFrame = this.tsToKeyToAcc.remove(frameToEvict);
        if (!this.wDef.isTumbling() && this.aggrOp.deductFn() != null) {
            this.patchSlidingWindow(this.aggrOp.deductFn(), evictedFrame);
        }
    }

    /**
     * Emits every window that still contains data, i.e. up to the window that
     * starts with the top frame.
     *
     * @return {@code true} when there is nothing (more) to emit, {@code false}
     *         if the method needs to be called again
     */
    private boolean flushBuffers() {
        if (this.flushTraverser == null) {
            if (this.tsToKeyToAcc.isEmpty()) {
                return true;
            }
            this.flushTraverser = this.windowTraverserAndEvictor(this.topTs + this.wDef.windowLength() - this.wDef.frameLength())
                    .onFirstNull(() -> {
                        this.flushTraverser = null;
                    });
        }
        return this.emitFromTraverser(this.flushTraverser);
    }

    /**
     * Returns the stream {@code start, start + step, ...} of all values not
     * exceeding {@code end}; empty if {@code start > end}.
     */
    private static LongStream range(long start, long end, long step) {
        return start > end
                ? LongStream.empty()
                : LongStream.iterate(start, n -> n + step).limit(1L + (end - start) / step);
    }

    /** Keys of the broadcast entries written to the snapshot. */
    enum Keys {
        NEXT_WIN_TO_EMIT;

    }
}

