package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.TimestampedItem;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.memory.AccumulationLimitExceededException;
import com.hazelcast.jet.impl.util.Util;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/processor/TransformStatefulP.class */
public class TransformStatefulP<T, K, S, R> extends AbstractProcessor {
    private static final int HASH_MAP_INITIAL_CAPACITY = 16;
    private static final float HASH_MAP_LOAD_FACTOR = 0.75f;
    private static final Watermark FLUSHING_WATERMARK;
    private final long ttl;
    private final Function<? super T, ? extends K> keyFn;
    private final ToLongFunction<? super T> timestampFn;
    private final Function<K, TimestampedItem<S>> createIfAbsentFn;
    private final TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn;

    @Nullable
    private final TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn;
    private Traverser<? extends Map.Entry<?, ?>> snapshotTraverser;
    private boolean inComplete;
    private long maxEntries;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Probe(name = "lateEventsDropped")
    private final Counter lateEventsDropped = SwCounter.newSwCounter();
    private final Map<K, TimestampedItem<S>> keyToState = new LinkedHashMap(16, 0.75f, true);
    private final AbstractProcessor.FlatMapper<T, R> flatMapper = flatMapper(this::flatMapEvent);
    private final AbstractProcessor.FlatMapper<Watermark, Object> wmFlatMapper = flatMapper(this::flatMapWm);
    private final TransformStatefulP<T, K, S, R>.EvictingTraverser evictingTraverser = new EvictingTraverser();
    private final Traverser<?> evictingTraverserFlattened = this.evictingTraverser.flatMap(traverser -> {
        return traverser;
    });
    private long currentWm = Long.MIN_VALUE;

    /* loaded from: input_file:com/hazelcast/jet/impl/processor/TransformStatefulP$EvictingTraverser.class */
    private class EvictingTraverser implements Traverser<Traverser<?>> {
        private Iterator<Map.Entry<K, TimestampedItem<S>>> keyToStateIterator;
        private final ResettableSingletonTraverser<Watermark> wmTraverser = new ResettableSingletonTraverser<>();

        private EvictingTraverser() {
        }

        void reset(Watermark watermark) {
            this.keyToStateIterator = TransformStatefulP.this.keyToState.entrySet().iterator();
            if (watermark == TransformStatefulP.FLUSHING_WATERMARK) {
                return;
            }
            this.wmTraverser.accept(watermark);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.hazelcast.jet.Traverser
        public Traverser<?> next() {
            if (this.keyToStateIterator == null) {
                return null;
            }
            while (this.keyToStateIterator.hasNext()) {
                Map.Entry<K, TimestampedItem<S>> next = this.keyToStateIterator.next();
                if (next.getValue().timestamp() >= Util.subtractClamped(TransformStatefulP.this.currentWm, TransformStatefulP.this.ttl)) {
                    break;
                }
                this.keyToStateIterator.remove();
                if (TransformStatefulP.this.onEvictFn != null) {
                    return TransformStatefulP.this.onEvictFn.apply(next.getValue().item(), next.getKey(), Long.valueOf(TransformStatefulP.this.currentWm));
                }
            }
            this.keyToStateIterator = null;
            return this.wmTraverser;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/processor/TransformStatefulP$SnapshotKeys.class */
    private enum SnapshotKeys {
        WATERMARK
    }

    public TransformStatefulP(long j, @Nonnull Function<? super T, ? extends K> function, @Nonnull ToLongFunction<? super T> toLongFunction, @Nonnull Supplier<? extends S> supplier, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> triFunction, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> triFunction2) {
        this.ttl = j > 0 ? j : Long.MAX_VALUE;
        this.keyFn = function;
        this.timestampFn = toLongFunction;
        this.createIfAbsentFn = obj -> {
            return new TimestampedItem(Long.MIN_VALUE, supplier.get());
        };
        this.statefulFlatMapFn = triFunction;
        this.onEvictFn = triFunction2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) throws Exception {
        this.maxEntries = context.maxProcessorAccumulatedRecords();
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected boolean tryProcess(int i, @Nonnull Object obj) {
        return this.flatMapper.tryProcess(obj);
    }

    @Nonnull
    private Traverser<R> flatMapEvent(T t) {
        long applyAsLong = this.timestampFn.applyAsLong(t);
        if (applyAsLong < this.currentWm && this.ttl != Long.MAX_VALUE) {
            Util.logLateEvent(getLogger(), (byte) 0, this.currentWm, t);
            this.lateEventsDropped.inc();
            return Traversers.empty();
        }
        K apply = this.keyFn.apply(t);
        TimestampedItem<S> computeIfAbsent = this.keyToState.computeIfAbsent(apply, obj -> {
            if (this.keyToState.size() == this.maxEntries) {
                throw new AccumulationLimitExceededException();
            }
            return this.createIfAbsentFn.apply(obj);
        });
        computeIfAbsent.setTimestamp(Math.max(computeIfAbsent.timestamp(), applyAsLong));
        return this.statefulFlatMapFn.apply(computeIfAbsent.item(), apply, t);
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor, com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        keyedWatermarkCheck(watermark);
        return this.wmFlatMapper.tryProcess(watermark);
    }

    private Traverser<?> flatMapWm(Watermark watermark) {
        this.currentWm = watermark.timestamp();
        this.evictingTraverser.reset(watermark);
        return this.evictingTraverserFlattened;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        this.inComplete = true;
        return tryProcessWatermark(FLUSHING_WATERMARK);
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean closeIsCooperative() {
        return true;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (this.inComplete) {
            return complete();
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.keyToState.entrySet()).append(com.hazelcast.jet.Util.entry(BroadcastKey.broadcastKey(SnapshotKeys.WATERMARK), Long.valueOf(this.currentWm))).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (!(obj instanceof BroadcastKey)) {
            TimestampedItem<S> put = this.keyToState.put(obj, (TimestampedItem) obj2);
            if (!$assertionsDisabled && put != null) {
                throw new AssertionError("Duplicate key '" + obj + "'");
            }
            return;
        }
        BroadcastKey broadcastKey = (BroadcastKey) obj;
        if (!$assertionsDisabled && broadcastKey.key() != SnapshotKeys.WATERMARK) {
            throw new AssertionError("Unexpected " + obj);
        }
        long longValue = ((Long) obj2).longValue();
        this.currentWm = this.currentWm == Long.MIN_VALUE ? longValue : Math.min(this.currentWm, longValue);
    }

    static {
        $assertionsDisabled = !TransformStatefulP.class.desiredAssertionStatus();
        FLUSHING_WATERMARK = new Watermark(Long.MAX_VALUE);
    }
}
