/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Set;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;

/**
 * Static helpers that build a collection containing each distinct element of
 * an input collection exactly once per group.
 *
 * <p>The pipeline has three stages: a "pre-distinct" {@link DoFn} that
 * de-duplicates values in an in-memory set and emits them as keys paired with
 * a {@code null} value, a {@code groupByKey()}, and a "post-distinct"
 * {@link DoFn} that emits the key of every group.
 */
public final class Distinct {
    /** Buffer threshold used by the overloads that do not take {@code flushEvery}. */
    private static final int DEFAULT_FLUSH_EVERY = 50000;

    /**
     * Builds the distinct elements of {@code input} using the default flush
     * threshold ({@link #DEFAULT_FLUSH_EVERY}).
     *
     * @param input the collection to de-duplicate
     * @return a collection with the same {@link PType} as {@code input}
     */
    public static <S> PCollection<S> distinct(PCollection<S> input) {
        return distinct(input, DEFAULT_FLUSH_EVERY);
    }

    /**
     * Table variant of {@link #distinct(PCollection)}: de-duplicates the
     * key/value pairs of {@code input} and converts the result back to a table.
     *
     * @param input the table to de-duplicate
     * @return a table of the distinct pairs
     */
    public static <K, V> PTable<K, V> distinct(PTable<K, V> input) {
        // The upcast is required: with a PTable-typed argument, overload
        // resolution would select this very method and recurse forever.
        return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input));
    }

    /**
     * Builds the distinct elements of {@code input}.
     *
     * @param input the collection to de-duplicate
     * @param flushEvery the in-memory buffer of the pre-distinct stage is
     *        emitted and cleared once it holds more than this many values;
     *        must be positive
     * @return a collection with the same {@link PType} as {@code input}
     * @throws IllegalArgumentException if {@code flushEvery} is not positive
     */
    public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) {
        Preconditions.checkArgument(flushEvery > 0, "flushEvery must be positive, got %s", flushEvery);
        PType<S> pt = input.getPType();
        PTypeFamily ptf = pt.getFamily();
        return input
            .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
            .groupByKey()
            .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
    }

    /**
     * Table variant of {@link #distinct(PCollection, int)}: de-duplicates the
     * key/value pairs of {@code input} and converts the result back to a table.
     *
     * @param input the table to de-duplicate
     * @param flushEvery buffer threshold, see {@link #distinct(PCollection, int)}
     * @return a table of the distinct pairs
     * @throws IllegalArgumentException if {@code flushEvery} is not positive
     */
    public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) {
        // The upcast is required: with a PTable-typed argument, overload
        // resolution would select this very method and recurse forever.
        return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery));
    }

    /** Not instantiable; this class only exposes static methods. */
    private Distinct() {
    }

    /**
     * Emits the key of each grouped pair, discarding the grouped {@code Void}
     * values.
     */
    private static class PostDistinctFn<S>
    extends DoFn<Pair<S, Iterable<Void>>, S> {
        private PostDistinctFn() {
        }

        @Override
        public void process(Pair<S, Iterable<Void>> input, Emitter<S> emitter) {
            emitter.emit(input.first());
        }
    }

    /**
     * Collects incoming values in a set and emits each buffered value as a
     * key paired with a {@code null} value, either when the buffer grows past
     * {@code flushEvery} or when {@link #cleanup(Emitter)} is invoked.
     */
    private static class PreDistinctFn<S>
    extends DoFn<S, Pair<S, Void>> {
        /** Values seen since the last flush; the set removes duplicates among them. */
        private final Set<S> values = Sets.newHashSet();
        private final int flushEvery;
        private final PType<S> ptype;

        PreDistinctFn(int flushEvery, PType<S> ptype) {
            this.flushEvery = flushEvery;
            this.ptype = ptype;
        }

        @Override
        public void initialize() {
            super.initialize();
            // The PType is initialized with this function's configuration
            // before getDetachedValue is used in process().
            this.ptype.initialize(this.getConfiguration());
        }

        @Override
        public void process(S input, Emitter<Pair<S, Void>> emitter) {
            // Store the detached value returned by the PType rather than the
            // input reference itself.
            this.values.add(this.ptype.getDetachedValue(input));
            if (this.values.size() > this.flushEvery) {
                this.cleanup(emitter);
            }
        }

        /** Emits every buffered value as {@code (value, null)} and empties the buffer. */
        @Override
        public void cleanup(Emitter<Pair<S, Void>> emitter) {
            for (S in : this.values) {
                emitter.emit(Pair.<S, Void>of(in, null));
            }
            this.values.clear();
        }
    }
}

