package com.hazelcast.jet.core;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.function.ObjLongBiFunction;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/core/WatermarkSourceUtil.class */
public class WatermarkSourceUtil<T> {
    private static final WatermarkPolicy[] EMPTY_WATERMARK_POLICIES;
    private static final long[] EMPTY_LONGS;
    private final long idleTimeoutNanos;
    private final ToLongFunction<? super T> timestampFn;
    private final Supplier<? extends WatermarkPolicy> newWmPolicyFn;
    private final ObjLongBiFunction<? super T, ?> wrapFn;
    private final WatermarkEmissionPolicy wmEmitPolicy;
    private final AppendableTraverser<Object> traverser = new AppendableTraverser<>(2);
    private WatermarkPolicy[] wmPolicies = EMPTY_WATERMARK_POLICIES;
    private long[] watermarks = EMPTY_LONGS;
    private long[] markIdleAt = EMPTY_LONGS;
    private long lastEmittedWm = Long.MIN_VALUE;
    private long topObservedWm = Long.MIN_VALUE;
    private boolean allAreIdle;
    static final /* synthetic */ boolean $assertionsDisabled;

    public WatermarkSourceUtil(WatermarkGenerationParams<? super T> watermarkGenerationParams) {
        this.idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(watermarkGenerationParams.idleTimeoutMillis());
        this.timestampFn = watermarkGenerationParams.timestampFn();
        this.wrapFn = watermarkGenerationParams.wrapFn();
        this.newWmPolicyFn = watermarkGenerationParams.newWmPolicyFn();
        this.wmEmitPolicy = watermarkGenerationParams.wmEmitPolicy();
    }

    @Nonnull
    public Traverser<Object> handleEvent(T t, int i) {
        return handleEvent(System.nanoTime(), t, i);
    }

    @Nonnull
    public Traverser<Object> handleNoEvent() {
        return handleEvent(System.nanoTime(), null, -1);
    }

    Traverser<Object> handleEvent(long j, @Nullable T t, int i) {
        if (!$assertionsDisabled && !this.traverser.isEmpty()) {
            throw new AssertionError("the traverser returned previously not yet drained: remove all items from the traverser before you call this method again.");
        }
        if (t != null) {
            long applyAsLong = this.timestampFn.applyAsLong(t);
            handleEventInt(j, i, applyAsLong);
            this.traverser.append(this.wrapFn.apply(t, applyAsLong));
        } else {
            handleNoEventInt(j);
        }
        return this.traverser;
    }

    private void handleEventInt(long j, int i, long j2) {
        this.wmPolicies[i].reportEvent(j2);
        this.markIdleAt[i] = j + this.idleTimeoutNanos;
        this.allAreIdle = false;
        handleNoEventInt(j);
    }

    private void handleNoEventInt(long j) {
        long j2 = Long.MAX_VALUE;
        for (int i = 0; i < this.watermarks.length; i++) {
            if (this.idleTimeoutNanos <= 0 || this.markIdleAt[i] > j) {
                this.watermarks[i] = this.wmPolicies[i].getCurrentWatermark();
                this.topObservedWm = Math.max(this.topObservedWm, this.watermarks[i]);
                j2 = Math.min(j2, this.watermarks[i]);
            }
        }
        if (j2 != Long.MAX_VALUE) {
            this.allAreIdle = false;
        } else {
            if (this.allAreIdle) {
                return;
            }
            j2 = this.topObservedWm;
            this.allAreIdle = true;
        }
        long throttleWm = this.wmEmitPolicy.throttleWm(j2, this.lastEmittedWm);
        if (throttleWm > this.lastEmittedWm) {
            this.traverser.append(new Watermark(throttleWm));
            this.lastEmittedWm = throttleWm;
        }
        if (this.allAreIdle) {
            this.traverser.append(WatermarkCoalescer.IDLE_MESSAGE);
        }
    }

    public void increasePartitionCount(int i) {
        increasePartitionCount(System.nanoTime(), i);
    }

    void increasePartitionCount(long j, int i) {
        int length = this.wmPolicies.length;
        if (i < length) {
            throw new IllegalArgumentException("partition count must increase. Old count=" + length + ", new count=" + i);
        }
        this.wmPolicies = (WatermarkPolicy[]) Arrays.copyOf(this.wmPolicies, i);
        this.watermarks = Arrays.copyOf(this.watermarks, i);
        this.markIdleAt = Arrays.copyOf(this.markIdleAt, i);
        for (int i2 = length; i2 < i; i2++) {
            this.wmPolicies[i2] = this.newWmPolicyFn.get();
            this.watermarks[i2] = Long.MIN_VALUE;
            this.markIdleAt[i2] = j + this.idleTimeoutNanos;
        }
    }

    public long getWatermark(int i) {
        return this.watermarks[i];
    }

    public void restoreWatermark(int i, long j) {
        this.watermarks[i] = j;
    }

    static {
        $assertionsDisabled = !WatermarkSourceUtil.class.desiredAssertionStatus();
        EMPTY_WATERMARK_POLICIES = new WatermarkPolicy[0];
        EMPTY_LONGS = new long[0];
    }
}
