package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.AggregateOperation;
import com.hazelcast.jet.TimestampedEntry;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.WindowDefinition;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.function.DistributedToLongFunction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/processor/SlidingWindowP.class */
/**
 * Processor that groups already frame-aligned items by frame timestamp and
 * key, and emits aggregated window results when a watermark arrives or when
 * the input is exhausted.
 *
 * <p>Items are accumulated into {@link #tsToKeyToAcc}, a two-level map of
 * frame timestamp to key to accumulator. For each frame timestamp covered by
 * an incoming watermark, one {@link TimestampedEntry} per key is emitted.
 * Depending on the window definition and the aggregate operation, the window
 * for a frame is obtained in one of three ways (see {@link #computeWindow}):
 * directly from the frame (tumbling), by incrementally patching
 * {@link #slidingWindow} (when a deduct function is available), or by
 * recombining all frames of the window from scratch.
 *
 * @param <T> type of the input item
 * @param <A> type of the accumulator
 * @param <R> type of the finished aggregation result
 */
public class SlidingWindowP<T, A, R> extends AbstractProcessor {
    /** Sentinel for {@link #nextFrameTsToEmit}: no watermark has been acted upon yet. */
    private static final long NOT_INITIALIZED = Long.MIN_VALUE;

    // package-visible, not private (presumably for tests -- confirm before narrowing)
    final Map<Long, Map<Object, A>> tsToKeyToAcc = new HashMap<>();
    final Map<Object, A> slidingWindow = new HashMap<>();

    private final WindowDefinition wDef;
    private final DistributedToLongFunction<? super T> getFrameTimestampF;
    private final Function<? super T, ?> getKeyF;
    private final AggregateOperation<? super T, A, R> aggrOp;

    /** A freshly created accumulator; entries equal to it are dropped from {@link #slidingWindow}. */
    private final A emptyAcc;

    /** Lazily created in {@link #complete()}; emits whatever is left when the input ends. */
    private Traverser<Object> finalTraverser;

    /** Lowest frame timestamp that has not been emitted yet. */
    private long nextFrameTsToEmit = NOT_INITIALIZED;

    /** Turns each watermark into the window results it releases, followed by the watermark itself. */
    private final AbstractProcessor.FlatMapper<Watermark, Object> flatMapper =
            flatMapper(wm -> windowTraverserAndEvictor(wm.timestamp()).append(wm));

    /**
     * Creates the processor.
     *
     * @param function                 extracts the grouping key from an input item
     * @param distributedToLongFunction extracts the frame timestamp from an input item;
     *                                 the result is asserted to equal
     *                                 {@code windowDefinition.floorFrameTs(result)}
     * @param windowDefinition         definition of the window (frame length, window length)
     * @param aggregateOperation       supplies the create / accumulate / combine / deduct /
     *                                 finish primitives used to aggregate items
     */
    public SlidingWindowP(Function<? super T, ?> function, DistributedToLongFunction<? super T> distributedToLongFunction, WindowDefinition windowDefinition, AggregateOperation<? super T, A, R> aggregateOperation) {
        this.wDef = windowDefinition;
        this.getFrameTimestampF = distributedToLongFunction;
        this.getKeyF = function;
        this.aggrOp = aggregateOperation;
        this.emptyAcc = aggregateOperation.createAccumulatorF().get();
    }

    /**
     * Accumulates {@code obj} into the accumulator stored under its frame
     * timestamp and key, creating the frame map and the accumulator on demand.
     *
     * @return always {@code true}: the item is consumed unconditionally
     */
    @Override // com.hazelcast.jet.AbstractProcessor
    protected boolean tryProcess0(@Nonnull Object obj) {
        // The processor API hands items over as Object; they are T by contract of this class.
        @SuppressWarnings("unchecked")
        T item = (T) obj;
        long frameTs = getFrameTimestampF.applyAsLong(item);
        assert frameTs == wDef.floorFrameTs(frameTs) : "timestamp not on the verge of a frame";
        A acc = tsToKeyToAcc
                .computeIfAbsent(frameTs, ts -> new HashMap<>())
                .computeIfAbsent(getKeyF.apply(item), key -> aggrOp.createAccumulatorF().get());
        aggrOp.accumulateItemF().accept(acc, item);
        return true;
    }

    /** Delegates to {@link #flatMapper}, which emits the released windows and then the watermark. */
    @Override // com.hazelcast.jet.AbstractProcessor
    protected boolean tryProcessWm0(@Nonnull Watermark watermark) {
        return flatMapper.tryProcess(watermark);
    }

    /**
     * Emits the remaining windows once the input is exhausted. On the first
     * call the final traverser is built to cover frames up to (and including)
     * the highest frame timestamp on record; subsequent calls keep draining it.
     *
     * @return {@code true} if there was nothing to emit, otherwise the result
     *         of {@code emitFromTraverser} on the final traverser
     */
    @Override // com.hazelcast.jet.Processor
    public boolean complete() {
        if (finalTraverser == null) {
            if (tsToKeyToAcc.isEmpty()) {
                return true;
            }
            long topTs = tsToKeyToAcc.keySet().stream()
                    .max(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the max key in a non-empty map"));
            finalTraverser = windowTraverserAndEvictor(topTs + wDef.frameLength());
        }
        return emitFromTraverser(finalTraverser);
    }

    /**
     * Builds a traverser over the results of all frames from
     * {@link #nextFrameTsToEmit} up to (excluding) {@code wDef.higherFrameTs(j)}
     * and advances {@link #nextFrameTsToEmit} to that bound. After the last
     * entry of each frame has been traversed, {@link #completeWindow} is run
     * for that frame.
     *
     * @param j the timestamp (watermark, or end bound computed in {@link #complete()})
     *          that releases the frames
     */
    private Traverser<Object> windowTraverserAndEvictor(long j) {
        if (nextFrameTsToEmit == NOT_INITIALIZED) {
            if (tsToKeyToAcc.isEmpty()) {
                return Traversers.empty();
            }
            // First timestamp we act upon: start from the lowest frame on record,
            // or from the floor of `j` if that is even lower.
            long bottomTs = tsToKeyToAcc.keySet().stream()
                    .min(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the min key in a non-empty map"));
            nextFrameTsToEmit = Math.min(bottomTs, wDef.floorFrameTs(j));
        }
        long rangeStart = nextFrameTsToEmit;
        nextFrameTsToEmit = wDef.higherFrameTs(j);
        return Traversers.traverseStream(range(rangeStart, nextFrameTsToEmit, wDef.frameLength()).boxed())
                .flatMap(frameTs -> Traversers.traverseIterable(computeWindow(frameTs).entrySet())
                        .map(e -> new TimestampedEntry<>(
                                frameTs, e.getKey(), aggrOp.finishAccumulationF().apply(e.getValue())))
                        .onFirstNull(() -> completeWindow(frameTs)));
    }

    /**
     * Returns the key-to-accumulator map of the window whose leading frame is {@code j}.
     *
     * @param j timestamp of the window's leading (most recent) frame
     */
    private Map<Object, A> computeWindow(long j) {
        if (wDef.isTumbling()) {
            // The window is exactly one frame.
            return tsToKeyToAcc.getOrDefault(j, Collections.emptyMap());
        }
        if (aggrOp.deductAccumulatorF() != null) {
            // Incremental mode: add the leading frame; the trailing frame is
            // deducted later in completeWindow().
            patchSlidingWindow(aggrOp.combineAccumulatorsF(), tsToKeyToAcc.get(j));
            return slidingWindow;
        }
        // No deduct function: recombine every frame of the window from scratch.
        Map<Object, A> window = new HashMap<>();
        for (long ts = j - wDef.windowLength() + wDef.frameLength(); ts <= j; ts += wDef.frameLength()) {
            tsToKeyToAcc.getOrDefault(ts, Collections.<Object, A>emptyMap())
                    .forEach((key, frameAcc) -> aggrOp.combineAccumulatorsF().accept(
                            window.computeIfAbsent(key, k -> aggrOp.createAccumulatorF().get()),
                            frameAcc));
        }
        return window;
    }

    /**
     * Evicts the trailing frame of the window led by frame {@code j} and, in
     * incremental mode, deducts it from {@link #slidingWindow}.
     */
    private void completeWindow(long j) {
        long trailingFrameTs = j - wDef.windowLength() + wDef.frameLength();
        Map<Object, A> evicted = tsToKeyToAcc.remove(trailingFrameTs);
        if (aggrOp.deductAccumulatorF() != null) {
            patchSlidingWindow(aggrOp.deductAccumulatorF(), evicted);
        }
    }

    /**
     * Applies {@code biConsumer} (combine or deduct) to {@link #slidingWindow}
     * for every entry of {@code map}. A key whose accumulator ends up equal to
     * {@link #emptyAcc} is removed from the sliding window.
     *
     * @param biConsumer operation invoked as {@code accept(windowAcc, frameAcc)}
     * @param map        the frame to apply; {@code null} means "no such frame" and is a no-op
     */
    private void patchSlidingWindow(BiConsumer<? super A, ? super A> biConsumer, Map<Object, A> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<Object, A> frameEntry : map.entrySet()) {
            slidingWindow.compute(frameEntry.getKey(), (key, currentAcc) -> {
                A acc = currentAcc != null ? currentAcc : aggrOp.createAccumulatorF().get();
                biConsumer.accept(acc, frameEntry.getValue());
                // Returning null from compute() removes the mapping.
                return acc.equals(emptyAcc) ? null : acc;
            });
        }
    }

    /**
     * Returns the stream {@code j, j + j3, j + 2 * j3, ...} of all values
     * strictly less than {@code j2}; empty when {@code j >= j2}.
     *
     * @param j  first value (inclusive)
     * @param j2 end bound (exclusive)
     * @param j3 step between consecutive values
     */
    private static LongStream range(long j, long j2, long j3) {
        if (j >= j2) {
            return LongStream.empty();
        }
        // Number of steps that fit into [j, j2): ceil((j2 - j) / j3).
        long count = 1 + (j2 - j - 1) / j3;
        return LongStream.iterate(j, ts -> ts + j3).limit(count);
    }
}
