package org.apache.crunch.lib;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Iterator;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;

/* loaded from: input_file:org/apache/crunch/lib/Distinct.class */
/**
 * Functions for removing duplicate elements from a {@link PCollection} or {@link PTable}.
 *
 * <p>Deduplication is done in two phases. First, {@link PreDistinctFn} collapses duplicates
 * locally in an in-memory {@code HashSet} (so elements are compared with
 * {@code equals}/{@code hashCode}). It emits the buffered values as {@code (value, null)}
 * pairs whenever the set grows past a threshold, and again at cleanup. Second, the pairs are
 * grouped by key and {@link PostDistinctFn} emits each key once.
 */
public final class Distinct {
    /** Threshold for the in-memory dedup set used by the overloads without a flush argument. */
    private static final int DEFAULT_FLUSH_EVERY = 50000;

    /**
     * Emits the key of each grouped {@code (value, [null, ...])} pair, i.e. one copy of each
     * value that survived the group-by.
     */
    public static class PostDistinctFn<S> extends DoFn<Pair<S, Iterable<Void>>, S> {
        private PostDistinctFn() {
        }

        @Override
        public void process(Pair<S, Iterable<Void>> input, Emitter<S> emitter) {
            emitter.emit(input.first());
        }
    }

    /**
     * Buffers inputs in a local set to drop duplicates before the shuffle. It emits
     * {@code (value, null)} pairs once the set holds more than {@code flushEvery} elements,
     * and emits whatever remains at cleanup.
     */
    public static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
        private final java.util.Set<S> values = Sets.newHashSet();
        private final int flushEvery;

        /**
         * @param flushEvery the set is flushed as soon as its size exceeds this value
         */
        public PreDistinctFn(int flushEvery) {
            this.flushEvery = flushEvery;
        }

        @Override
        public void process(S input, Emitter<Pair<S, Void>> emitter) {
            // NOTE(review): the input object is stored as-is. If the runtime reuses input
            // objects between calls, the buffered values could be mutated; consider detaching
            // them (e.g. via the PType) if that applies to this Crunch version -- confirm.
            values.add(input);
            if (values.size() > flushEvery) {
                flush(emitter);
            }
        }

        @Override
        public void cleanup(Emitter<Pair<S, Void>> emitter) {
            flush(emitter);
        }

        /** Emits every buffered value paired with {@code null} and empties the buffer. */
        private void flush(Emitter<Pair<S, Void>> emitter) {
            for (S value : values) {
                // Explicit type arguments: a bare null would not infer Void before Java 8.
                emitter.emit(Pair.<S, Void>of(value, null));
            }
            values.clear();
        }
    }

    /**
     * Returns the distinct elements of {@code input}, using the default in-memory flush
     * threshold.
     *
     * @param input the collection to deduplicate
     * @return a collection containing one copy of each distinct element
     */
    public static <S> PCollection<S> distinct(PCollection<S> input) {
        return distinct(input, DEFAULT_FLUSH_EVERY);
    }

    /**
     * Returns the distinct {@code (key, value)} pairs of {@code input}, using the default
     * in-memory flush threshold.
     *
     * @param input the table to deduplicate
     * @return a table containing one copy of each distinct pair
     */
    public static <K, V> PTable<K, V> distinct(PTable<K, V> input) {
        return distinct(input, DEFAULT_FLUSH_EVERY);
    }

    /**
     * Returns the distinct elements of {@code input}.
     *
     * @param input the collection to deduplicate
     * @param flushEvery how many distinct elements each task buffers in memory before
     *     emitting them; larger values remove more duplicates before the shuffle at the
     *     cost of memory
     * @return a collection containing one copy of each distinct element
     * @throws IllegalArgumentException if {@code flushEvery} is not positive
     */
    public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) {
        Preconditions.checkArgument(flushEvery > 0,
            "flushEvery must be positive, got %s", flushEvery);
        PType<S> pType = input.getPType();
        PTypeFamily family = pType.getFamily();
        PTableType<S, Void> preDistinctType = family.tableOf(pType, family.nulls());
        return input
            .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), preDistinctType)
            .groupByKey()
            .parallelDo("post-distinct", new PostDistinctFn<S>(), pType);
    }

    /**
     * Returns the distinct {@code (key, value)} pairs of {@code input}.
     *
     * @param input the table to deduplicate
     * @param flushEvery how many distinct pairs each task buffers in memory before emitting
     * @return a table containing one copy of each distinct pair
     * @throws IllegalArgumentException if {@code flushEvery} is not positive
     */
    public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) {
        // Upcast so overload resolution picks the PCollection version instead of recursing.
        PCollection<Pair<K, V>> asCollection = input;
        return PTables.asPTable(distinct(asCollection, flushEvery));
    }

    private Distinct() {
    }
}
