package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WindowDefinition;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.util.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/processor/SlidingWindowP.class */
/**
 * Processor that groups incoming items into frames keyed by
 * {@code (frameTimestamp, groupingKey)}, accumulates them with the supplied
 * {@link AggregateOperation1} and, when a watermark arrives, emits one
 * {@link TimestampedEntry} per key for every window that ends at or before
 * the watermark.
 * <p>
 * For non-tumbling windows the aggregate operation must provide the
 * {@code combine} primitive. If it also provides {@code deduct}, the current
 * window is maintained incrementally in {@link #slidingWindow}; otherwise
 * each window is recomputed from its frames.
 *
 * @param <T> type of the input item
 * @param <A> type of the accumulator
 * @param <R> type of the finished result
 */
public class SlidingWindowP<T, A, R> extends AbstractProcessor {

    /** Keys under which processor-wide values are stored in the snapshot. */
    enum Keys {
        NEXT_WIN_TO_EMIT
    }

    // Package-private state, kept visible for same-package access.
    final Map<Long, Map<Object, A>> tsToKeyToAcc = new HashMap<>();
    Map<Object, A> slidingWindow;
    long nextWinToEmit = Long.MIN_VALUE;

    private final WindowDefinition wDef;
    private final DistributedToLongFunction<? super T> getFrameTsFn;
    private final Function<? super T, ?> getKeyFn;
    private final AggregateOperation1<? super T, A, R> aggrOp;
    private final boolean isLastStage;
    private final AbstractProcessor.FlatMapper<Watermark, ?> wmFlatMapper;
    // A freshly created accumulator, used to detect accumulators that
    // became "empty" again after patching so they can be dropped.
    private final A emptyAcc;

    private Traverser<Object> flushTraverser;
    private Traverser<Map.Entry> snapshotTraverser;
    private ProcessingGuarantee processingGuarantee;

    // Highest frame timestamp seen so far (from items or restored snapshot).
    private long topTs = Long.MIN_VALUE;
    // Smallest NEXT_WIN_TO_EMIT value seen during snapshot restore;
    // Long.MAX_VALUE means "none restored yet".
    private long minRestoredNextWinToEmit = Long.MAX_VALUE;

    /**
     * @param getKeyFn     extracts the grouping key from an item
     * @param getFrameTsFn extracts the frame timestamp from an item
     * @param winDef       definition of the window (frame and window length)
     * @param aggrOp       the aggregate operation to apply
     * @param isLastStage  when {@code true}, the processor saves its frames
     *                     and {@code nextWinToEmit} to the snapshot; when
     *                     {@code false}, it flushes its buffers instead
     */
    public SlidingWindowP(
            Function<? super T, ?> getKeyFn,
            DistributedToLongFunction<? super T> getFrameTsFn,
            WindowDefinition winDef,
            AggregateOperation1<? super T, A, R> aggrOp,
            boolean isLastStage
    ) {
        if (!winDef.isTumbling()) {
            Preconditions.checkNotNull(aggrOp.combineFn(), "AggregateOperation lacks the combine primitive");
        }
        this.wDef = winDef;
        this.getFrameTsFn = getFrameTsFn;
        this.getKeyFn = getKeyFn;
        this.aggrOp = aggrOp;
        this.isLastStage = isLastStage;
        // For each watermark: emit all completed windows, then the watermark
        // itself, and once everything is emitted advance nextWinToEmit.
        this.wmFlatMapper = flatMapper(wm -> windowTraverserAndEvictor(wm.timestamp())
                .append(wm)
                .onFirstNull(() -> {
                    nextWinToEmit = wDef.higherFrameTs(wm.timestamp());
                }));
        this.emptyAcc = aggrOp.createFn().get();
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        processingGuarantee = context.processingGuarantee();
    }

    /**
     * Accumulates the item into the accumulator for its frame timestamp and
     * grouping key, creating the frame map and accumulator on demand.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess0(@Nonnull Object item) {
        // The processor contract is untyped; items are expected to be of type T.
        final T typedItem = (T) item;
        final long frameTs = getFrameTsFn.applyAsLong(typedItem);
        assert frameTs == wDef.floorFrameTs(frameTs) : "getFrameTsFn returned an invalid frame timestamp";
        assert frameTs + wDef.windowLength() >= nextWinToEmit
                : "late event received, it should have been filtered out by InsertWatermarksP: item=" + item
                + ", nextWinToEmit=" + nextWinToEmit;

        final Map<Object, A> frame = tsToKeyToAcc.computeIfAbsent(frameTs, ts -> new HashMap<>());
        final A acc = frame.computeIfAbsent(getKeyFn.apply(typedItem), key -> aggrOp.createFn().get());
        aggrOp.accumulateFn().accept(acc, typedItem);
        topTs = Math.max(topTs, frameTs);
        return true;
    }

    @Override
    protected boolean tryProcessWm0(@Nonnull Watermark watermark) {
        return wmFlatMapper.tryProcess(watermark);
    }

    @Override
    public boolean complete() {
        return flushBuffers();
    }

    /**
     * If this is not the last stage, or a flush is already in progress,
     * flushes the buffers. Otherwise writes every {@code (frame, key)}
     * accumulator under a {@link SnapshotKey}, followed by
     * {@code nextWinToEmit} under a broadcast key.
     */
    @Override
    public boolean saveToSnapshot() {
        if (!isLastStage || flushTraverser != null) {
            return flushBuffers();
        }
        if (snapshotTraverser == null) {
            snapshotTraverser = Traversers.traverseIterable(tsToKeyToAcc.entrySet())
                    .<Map.Entry>flatMap(frameEntry -> Traversers.traverseIterable(frameEntry.getValue().entrySet())
                            // timestamp comes from the outer (frame) entry, key from the inner one
                            .map(accEntry -> Util.entry(
                                    new SnapshotKey(frameEntry.getKey(), accEntry.getKey()),
                                    accEntry.getValue())))
                    .append(Util.entry(BroadcastKey.broadcastKey(Keys.NEXT_WIN_TO_EMIT), nextWinToEmit))
                    .onFirstNull(() -> {
                        snapshotTraverser = null;
                    });
        }
        return emitFromTraverserToSnapshot(snapshotTraverser);
    }

    /**
     * Restores one snapshot entry: either an accumulator stored under a
     * {@link SnapshotKey}, or a {@code nextWinToEmit} value stored under a
     * {@link BroadcastKey}.
     *
     * @throws JetException if the same {@link SnapshotKey} is restored twice
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            final BroadcastKey<?> broadcastKey = (BroadcastKey<?>) key;
            assert Keys.NEXT_WIN_TO_EMIT.equals(broadcastKey.key()) : "key=" + broadcastKey.key();
            final long restoredNextWinToEmit = (Long) value;
            assert processingGuarantee != ProcessingGuarantee.EXACTLY_ONCE
                    || minRestoredNextWinToEmit == Long.MAX_VALUE
                    || minRestoredNextWinToEmit == restoredNextWinToEmit
                    : "different values for nextWinToEmit restored, before=" + minRestoredNextWinToEmit
                    + ", new=" + restoredNextWinToEmit;
            minRestoredNextWinToEmit = Math.min(restoredNextWinToEmit, minRestoredNextWinToEmit);
            return;
        }

        final SnapshotKey snapshotKey = (SnapshotKey) key;
        // Snapshot values under a SnapshotKey are accumulators written by saveToSnapshot().
        final A previous = tsToKeyToAcc
                .computeIfAbsent(snapshotKey.timestamp, ts -> new HashMap<>())
                .put(snapshotKey.key, (A) value);
        if (previous != null) {
            throw new JetException("Duplicate key in snapshot: " + snapshotKey);
        }
        topTs = Math.max(topTs, snapshotKey.timestamp);
    }

    /**
     * In the last stage, adopts the minimum restored {@code nextWinToEmit};
     * logs the resulting value in either case.
     */
    @Override
    public boolean finishSnapshotRestore() {
        if (isLastStage) {
            nextWinToEmit = minRestoredNextWinToEmit;
        }
        LoggingUtil.logFine(getLogger(), "Restored nextWinToEmit from snapshot to: %s", nextWinToEmit);
        return true;
    }

    /**
     * Returns a traverser over the results of all windows from
     * {@code nextWinToEmit} (or, if that is not yet known, from the lowest
     * buffered frame) up to {@code endTsInclusive}, stepping by the frame
     * length. After each window is fully traversed, it is completed via
     * {@link #completeWindow(long)}.
     */
    private Traverser<Object> windowTraverserAndEvictor(long endTsInclusive) {
        final long rangeStart;
        if (nextWinToEmit != Long.MIN_VALUE) {
            rangeStart = nextWinToEmit;
        } else {
            if (tsToKeyToAcc.isEmpty()) {
                // nothing buffered and no start known: nothing to emit
                return Traversers.empty();
            }
            final long lowestFrameTs = tsToKeyToAcc.keySet().stream()
                    .min(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the min key in a non-empty map"));
            rangeStart = Math.min(lowestFrameTs, wDef.floorFrameTs(endTsInclusive));
        }
        return Traversers.traverseStream(range(rangeStart, endTsInclusive, wDef.frameLength()).boxed())
                .flatMap(winEnd -> Traversers.traverseIterable(computeWindow(winEnd).entrySet())
                        .map(e -> (Object) new TimestampedEntry<>(
                                winEnd, e.getKey(), aggrOp.finishFn().apply(e.getValue())))
                        .onFirstNull(() -> completeWindow(winEnd)));
    }

    /**
     * Returns the per-key accumulators for the window ending at
     * {@code frameTs}: the single frame for a tumbling window, a full
     * recomputation when there is no {@code deduct} primitive, or the
     * incrementally maintained {@link #slidingWindow} otherwise.
     */
    private Map<Object, A> computeWindow(long frameTs) {
        if (wDef.isTumbling()) {
            return tsToKeyToAcc.getOrDefault(frameTs, Collections.emptyMap());
        }
        if (aggrOp.deductFn() == null) {
            return recomputeWindow(frameTs);
        }
        if (slidingWindow == null) {
            slidingWindow = recomputeWindow(frameTs);
        } else {
            // add the leading-edge frame to the existing window
            patchSlidingWindow(aggrOp.combineFn(), tsToKeyToAcc.get(frameTs));
        }
        return slidingWindow;
    }

    /**
     * Builds the window ending at {@code frameTs} from scratch by combining
     * all frames in {@code (frameTs - windowLength, frameTs]}.
     */
    private Map<Object, A> recomputeWindow(long frameTs) {
        final Map<Object, A> window = new HashMap<>();
        for (long ts = frameTs - wDef.windowLength() + wDef.frameLength();
             ts <= frameTs;
             ts += wDef.frameLength()
        ) {
            tsToKeyToAcc.getOrDefault(ts, Collections.emptyMap())
                    .forEach((key, frameAcc) -> aggrOp.combineFn().accept(
                            window.computeIfAbsent(key, k -> aggrOp.createFn().get()),
                            frameAcc));
        }
        return window;
    }

    /**
     * Applies {@code patchOp} (combine or deduct) to {@link #slidingWindow}
     * for every key in {@code patchingFrame}. Keys whose accumulator equals
     * the empty accumulator afterwards are removed. A {@code null} frame is
     * a no-op.
     */
    private void patchSlidingWindow(BiConsumer<? super A, ? super A> patchOp, Map<Object, A> patchingFrame) {
        if (patchingFrame == null) {
            return;
        }
        for (Map.Entry<Object, A> frameEntry : patchingFrame.entrySet()) {
            slidingWindow.compute(frameEntry.getKey(), (key, currentAcc) -> {
                final A acc = currentAcc != null ? currentAcc : aggrOp.createFn().get();
                patchOp.accept(acc, frameEntry.getValue());
                // returning null removes the mapping from the window
                return acc.equals(emptyAcc) ? null : acc;
            });
        }
    }

    /**
     * Evicts the trailing frame of the window ending at {@code frameTs} and,
     * when the window is maintained incrementally, deducts that frame from
     * {@link #slidingWindow}.
     */
    private void completeWindow(long frameTs) {
        final long frameToEvict = frameTs - wDef.windowLength() + wDef.frameLength();
        final Map<Object, A> evictedFrame = tsToKeyToAcc.remove(frameToEvict);
        if (!wDef.isTumbling() && aggrOp.deductFn() != null) {
            patchSlidingWindow(aggrOp.deductFn(), evictedFrame);
        }
    }

    /**
     * Emits all windows that contain buffered data, i.e. up to
     * {@code topTs + windowLength - frameLength}.
     *
     * @return {@code true} when there is nothing (more) to emit
     */
    private boolean flushBuffers() {
        if (flushTraverser == null) {
            if (tsToKeyToAcc.isEmpty()) {
                return true;
            }
            flushTraverser = windowTraverserAndEvictor(topTs + wDef.windowLength() - wDef.frameLength())
                    .onFirstNull(() -> {
                        flushTraverser = null;
                    });
        }
        return emitFromTraverser(flushTraverser);
    }

    /**
     * Returns the stream {@code start, start + step, ...} of all values not
     * exceeding {@code end}; empty if {@code start > end}.
     */
    private static LongStream range(long start, long end, long step) {
        if (start > end) {
            return LongStream.empty();
        }
        return LongStream.iterate(start, n -> n + step)
                         .limit(1 + (end - start) / step);
    }
}
