/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.streamlet.impl.operators;

import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.streamlet.KeyValue;
import com.twitter.heron.streamlet.KeyedWindow;
import com.twitter.heron.streamlet.SerializableBiFunction;
import com.twitter.heron.streamlet.SerializableFunction;
import com.twitter.heron.streamlet.Window;
import com.twitter.heron.streamlet.impl.operators.StreamletWindowOperator;
import java.util.HashMap;
import java.util.Map;

public class GeneralReduceByKeyAndWindowOperator<K, V, VR>
extends StreamletWindowOperator {
    private static final long serialVersionUID = 2833576046687752396L;
    private SerializableFunction<V, K> keyExtractor;
    private VR identity;
    private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
    private OutputCollector collector;

    public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity, SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
        this.keyExtractor = keyExtractor;
        this.identity = identity;
        this.reduceFn = reduceFn;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        HashMap reduceMap = new HashMap();
        HashMap windowCountMap = new HashMap();
        for (Tuple tuple : inputWindow.get()) {
            Object tup = tuple.getValue(0);
            this.addMap(reduceMap, windowCountMap, tup);
        }
        long startWindow = inputWindow.getStartTimestamp() == null ? 0L : inputWindow.getStartTimestamp();
        long endWindow = inputWindow.getEndTimestamp() == null ? 0L : inputWindow.getEndTimestamp();
        for (Object key : reduceMap.keySet()) {
            Window window = new Window(startWindow, endWindow, ((Integer)windowCountMap.get(key)).intValue());
            KeyedWindow keyedWindow = new KeyedWindow(key, window);
            this.collector.emit(new Values(new KeyValue(keyedWindow, reduceMap.get(key))));
        }
    }

    private void addMap(Map<K, VR> reduceMap, Map<K, Integer> windowCountMap, V tup) {
        Object key = this.keyExtractor.apply(tup);
        if (!reduceMap.containsKey(key)) {
            reduceMap.put(key, this.identity);
            windowCountMap.put(key, 0);
        }
        reduceMap.put(key, this.reduceFn.apply(reduceMap.get(key), tup));
        windowCountMap.put(key, windowCountMap.get(key) + 1);
    }
}

