/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedTriFunction;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;

public final class RollingAggregateP<T, K, A, R, OUT>
extends AbstractProcessor {
    private final AbstractProcessor.FlatMapper<T, OUT> flatMapper;
    private final Map<K, A> keyToAcc = new HashMap<K, A>();
    private final ResettableSingletonTraverser<OUT> outputTraverser = new ResettableSingletonTraverser();
    private Traverser<Map.Entry<K, A>> snapshotTraverser;

    public RollingAggregateP(@Nonnull DistributedFunction<? super T, ? extends K> keyFn, @Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp, @Nonnull DistributedTriFunction<? super T, ? super K, ? super R, ? extends OUT> mapToOutputFn) {
        this.flatMapper = this.flatMapper(item -> {
            Object key = keyFn.apply(item);
            Object acc = this.keyToAcc.computeIfAbsent(key, k -> aggrOp.createFn().get());
            aggrOp.accumulateFn().accept(acc, item);
            Object aggResult = aggrOp.exportFn().apply(acc);
            Object output = mapToOutputFn.apply((T)item, (K)key, (R)aggResult);
            if (output != null) {
                this.outputTraverser.accept(output);
            }
            return this.outputTraverser;
        });
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        return this.flatMapper.tryProcess(item);
    }

    @Override
    public boolean saveToSnapshot() {
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.keyToAcc.entrySet()).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        Object old = this.keyToAcc.put(key, value);
        assert (old == null) : "Duplicate key '" + key + '\'';
    }
}

