/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.Map;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.Window;
import org.apache.heron.streamlet.impl.operators.StreamletWindowOperator;

public class ReduceByKeyAndWindowOperator<K, V, R>
extends StreamletWindowOperator {
    private static final long serialVersionUID = 2833576046687750496L;
    private SerializableFunction<R, K> keyExtractor;
    private SerializableFunction<R, V> valueExtractor;
    private SerializableBinaryOperator<V> reduceFn;
    private OutputCollector collector;

    public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor, SerializableBinaryOperator<V> reduceFn) {
        this.keyExtractor = keyExtractor;
        this.valueExtractor = valueExtractor;
        this.reduceFn = reduceFn;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        HashMap reduceMap = new HashMap();
        HashMap windowCountMap = new HashMap();
        for (Tuple tuple : inputWindow.get()) {
            Object tup = tuple.getValue(0);
            this.addMap(reduceMap, windowCountMap, tup);
        }
        long startWindow = inputWindow.getStartTimestamp() == null ? 0L : inputWindow.getStartTimestamp();
        long endWindow = inputWindow.getEndTimestamp() == null ? 0L : inputWindow.getEndTimestamp();
        for (Object key : reduceMap.keySet()) {
            Window window = new Window(startWindow, endWindow, ((Integer)windowCountMap.get(key)).intValue());
            KeyedWindow keyedWindow = new KeyedWindow(key, window);
            this.collector.emit(new Values(new KeyValue(keyedWindow, reduceMap.get(key))));
        }
    }

    private void addMap(Map<K, V> reduceMap, Map<K, Integer> windowCountMap, R tup) {
        Object key = this.keyExtractor.apply(tup);
        if (reduceMap.containsKey(key)) {
            reduceMap.put(key, this.reduceFn.apply(reduceMap.get(key), this.valueExtractor.apply(tup)));
            windowCountMap.put(key, windowCountMap.get(key) + 1);
        } else {
            reduceMap.put(key, this.valueExtractor.apply(tup));
            windowCountMap.put(key, 1);
        }
    }
}

