/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.operation.impl;

import java.io.Serializable;
import java.util.Map;
import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class SingleEmitAggregator
implements Aggregator<SingleEmitState> {
    Aggregator _agg;
    BatchToPartition _batchToPartition;
    int myPartitionIndex;
    int totalPartitions;

    public SingleEmitAggregator(Aggregator agg, BatchToPartition batchToPartition) {
        this._agg = agg;
        this._batchToPartition = batchToPartition;
    }

    @Override
    public SingleEmitState init(Object batchId, TridentCollector collector) {
        return new SingleEmitState(batchId);
    }

    @Override
    public void aggregate(SingleEmitState val, TridentTuple tuple, TridentCollector collector) {
        if (!val.received) {
            val.state = this._agg.init(val.batchId, collector);
            val.received = true;
        }
        this._agg.aggregate(val.state, tuple, collector);
    }

    @Override
    public void complete(SingleEmitState val, TridentCollector collector) {
        if (!val.received) {
            if (this.myPartitionIndex == this._batchToPartition.partitionIndex(val.batchId, this.totalPartitions)) {
                val.state = this._agg.init(val.batchId, collector);
                this._agg.complete(val.state, collector);
            }
        } else {
            this._agg.complete(val.state, collector);
        }
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this._agg.prepare(conf, context);
        this.myPartitionIndex = context.getPartitionIndex();
        this.totalPartitions = context.numPartitions();
    }

    @Override
    public void cleanup() {
        this._agg.cleanup();
    }

    static class SingleEmitState {
        boolean received = false;
        Object state;
        Object batchId;

        public SingleEmitState(Object batchId) {
            this.batchId = batchId;
        }
    }

    public static interface BatchToPartition
    extends Serializable {
        public int partitionIndex(Object var1, int var2);
    }
}

