/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.streams.processors;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.processors.BaseProcessor;
import org.apache.storm.streams.processors.BatchProcessor;

/**
 * Processor that maintains one accumulator per key and folds every incoming
 * {@code Pair<K, V>} into the accumulator of its key using the supplied
 * {@link CombinerAggregator}.
 *
 * <p>Depending on {@code emitAggregate}, the value forwarded downstream is either
 * the raw accumulator ({@code A}) or the value produced by
 * {@link CombinerAggregator#result} for that accumulator.
 *
 * <p>The per-key state lives in a plain {@link HashMap}; this class performs no
 * synchronization of its own.
 *
 * @param <K> key type of the incoming pairs
 * @param <V> value type of the incoming pairs
 * @param <A> accumulator type kept per key
 * @param <R> type produced by the aggregator's {@code result} function
 */
public class AggregateByKeyProcessor<K, V, A, R>
extends BaseProcessor<Pair<K, V>>
implements BatchProcessor {
    private final CombinerAggregator<V, A, R> aggregator;
    /** When true the raw accumulator is emitted; otherwise {@code aggregator.result(accumulator)}. */
    private final boolean emitAggregate;
    /** Current accumulator per key; emptied at the end of {@link #finish()}. */
    private final Map<K, A> state = new HashMap<>();

    /**
     * Creates a processor that emits the aggregator's result values
     * (equivalent to passing {@code emitAggregate = false}).
     *
     * @param aggregator the aggregator used to initialise, update and finalise accumulators
     */
    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> aggregator) {
        this(aggregator, false);
    }

    /**
     * Creates a processor with an explicit choice of what gets emitted.
     *
     * @param aggregator    the aggregator used to initialise, update and finalise accumulators
     * @param emitAggregate {@code true} to emit the raw accumulator, {@code false} to emit
     *                      {@code aggregator.result(accumulator)}
     */
    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> aggregator, boolean emitAggregate) {
        this.aggregator = aggregator;
        this.emitAggregate = emitAggregate;
    }

    /**
     * Folds the value of {@code input} into the accumulator stored for its key and then
     * offers the updated aggregate to {@code mayBeForwardAggUpdate}.
     *
     * @param input the key/value pair to aggregate
     */
    @Override
    public void execute(Pair<K, V> input) {
        K key = input.getFirst();
        V val = input.getSecond();
        A accumulator = state.get(key);
        // A missing key and a key mapped to null are treated alike: start from a fresh accumulator.
        if (accumulator == null) {
            accumulator = aggregator.init();
        }
        state.put(key, aggregator.apply(accumulator, val));
        // The supplier reads the state when it is invoked rather than capturing the value here.
        // NOTE(review): whether/when the supplier runs is decided by the inherited
        // mayBeForwardAggUpdate, which is not visible in this file.
        mayBeForwardAggUpdate(() -> toOutput(key, state.get(key)));
    }

    /**
     * Forwards one output pair per key currently held in the state through the
     * context, then clears the state.
     */
    @Override
    public void finish() {
        for (Map.Entry<K, A> entry : state.entrySet()) {
            context.forward(toOutput(entry.getKey(), entry.getValue()));
        }
        state.clear();
    }

    /** Builds the pair to emit for a key: the raw accumulator or the aggregator's result, per {@code emitAggregate}. */
    private Pair<K, ?> toOutput(K key, A accumulator) {
        if (emitAggregate) {
            return Pair.of(key, accumulator);
        }
        return Pair.of(key, aggregator.result(accumulator));
    }

    @Override
    public String toString() {
        return "AggregateByKeyProcessor{aggregator=" + aggregator + ", emitAggregate=" + emitAggregate + ", state=" + state + "}";
    }
}

