/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.operation.builtin;

import backtype.storm.tuple.Fields;
import java.util.Comparator;
import java.util.PriorityQueue;
import storm.trident.Stream;
import storm.trident.operation.Aggregator;
import storm.trident.operation.Assembly;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FirstN
implements Assembly {
    Aggregator _agg;

    public FirstN(int n, String sortField) {
        this(n, sortField, false);
    }

    public FirstN(int n, String sortField, boolean reverse) {
        this._agg = sortField != null ? new FirstNSortedAgg(n, sortField, reverse) : new FirstNAgg(n);
    }

    @Override
    public Stream apply(Stream input) {
        Fields outputFields = input.getOutputFields();
        return input.partitionAggregate(outputFields, this._agg, outputFields).global().partitionAggregate(outputFields, this._agg, outputFields);
    }

    public static class FirstNSortedAgg
    extends BaseAggregator<PriorityQueue> {
        int _n;
        String _sortField;
        boolean _reverse;

        public FirstNSortedAgg(int n, String sortField, boolean reverse) {
            this._n = n;
            this._sortField = sortField;
            this._reverse = reverse;
        }

        @Override
        public PriorityQueue init(Object batchId, TridentCollector collector) {
            return new PriorityQueue<TridentTuple>(this._n, new Comparator<TridentTuple>(){

                @Override
                public int compare(TridentTuple t1, TridentTuple t2) {
                    Comparable c1 = (Comparable)t1.getValueByField(FirstNSortedAgg.this._sortField);
                    Comparable c2 = (Comparable)t2.getValueByField(FirstNSortedAgg.this._sortField);
                    int ret = c1.compareTo(c2);
                    if (FirstNSortedAgg.this._reverse) {
                        ret *= -1;
                    }
                    return ret;
                }
            });
        }

        @Override
        public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
            state.add(tuple);
        }

        @Override
        public void complete(PriorityQueue val, TridentCollector collector) {
            int total = val.size();
            for (int i = 0; i < this._n && i < total; ++i) {
                TridentTuple t = (TridentTuple)val.remove();
                collector.emit(t);
            }
        }
    }

    public static class FirstNAgg
    extends BaseAggregator<State> {
        int _n;

        public FirstNAgg(int n) {
            this._n = n;
        }

        @Override
        public State init(Object batchId, TridentCollector collector) {
            return new State();
        }

        @Override
        public void aggregate(State val, TridentTuple tuple, TridentCollector collector) {
            if (val.emitted < this._n) {
                collector.emit(tuple);
                ++val.emitted;
            }
        }

        @Override
        public void complete(State val, TridentCollector collector) {
        }

        static class State {
            int emitted = 0;

            State() {
            }
        }
    }
}

