/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.Map;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.impl.operators.StreamletOperator;

/**
 * Streamlet operator that keeps a running reduction per key.
 *
 * <p>For every incoming tuple the operator takes the value at index 0, derives a key and a
 * value from it with the supplied extractors, folds the value into whatever has been stored
 * for that key so far using the reduce function, and emits the updated
 * {@code KeyValue<K, T>} pair downstream before acking the input tuple.
 *
 * <p>The per-key state lives in an in-memory {@link HashMap} that this class never prunes,
 * so it holds one entry for every distinct key seen.
 *
 * @param <R> type of the incoming element carried at index 0 of each tuple
 * @param <K> type of the key extracted from an element
 * @param <T> type of the value extracted from an element and of the reduced result
 */
public class ReduceByKeyOperator<R, K, T>
extends StreamletOperator<R, KeyValue<K, T>> {
    private SerializableFunction<R, K> keyExtractor;
    private SerializableFunction<R, T> valueExtractor;
    private SerializableBinaryOperator<T> reduceFn;
    // Latest reduced value for each key seen so far.
    private Map<K, T> reduceMap;

    /**
     * Creates the operator with an empty reduction state.
     *
     * @param keyExtractor maps an incoming element to its grouping key
     * @param valueExtractor maps an incoming element to the value to be reduced
     * @param reduceFn combines the previously stored value (first argument) with the newly
     *     extracted value (second argument)
     */
    public ReduceByKeyOperator(SerializableFunction<R, K> keyExtractor, SerializableFunction<R, T> valueExtractor, SerializableBinaryOperator<T> reduceFn) {
        this.keyExtractor = keyExtractor;
        this.valueExtractor = valueExtractor;
        this.reduceFn = reduceFn;
        this.reduceMap = new HashMap<>();
    }

    /**
     * Folds the element carried by {@code tuple} into the running result for its key, emits
     * the updated key/value pair and acks the tuple.
     *
     * @param tuple incoming tuple; its value at index 0 is treated as an element of type
     *     {@code R}
     */
    @Override
    public void execute(Tuple tuple) {
        // The tuple API hands back an untyped value; the cast cannot be checked at runtime
        // because of erasure, so the suppression is confined to this single declaration.
        @SuppressWarnings("unchecked")
        R obj = (R) tuple.getValue(0);
        K key = this.keyExtractor.apply(obj);
        T value = this.valueExtractor.apply(obj);

        // containsKey (rather than a null check on get) keeps a stored null value
        // distinguishable from an absent key.
        T newValue;
        if (this.reduceMap.containsKey(key)) {
            newValue = this.reduceFn.apply(this.reduceMap.get(key), value);
        } else {
            // First element for this key: it becomes the initial reduced value as-is.
            newValue = value;
        }

        this.reduceMap.put(key, newValue);
        // NOTE(review): collector is not declared in this class; presumably inherited from
        // StreamletOperator.
        this.collector.emit(new Values(new KeyValue<K, T>(key, newValue)));
        this.collector.ack(tuple);
    }
}

