/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import java.util.Collection;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Tuple3;
import org.apache.crunch.lib.Cogroup;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;

public class Set {
    public static <T> PCollection<T> difference(PCollection<T> coll1, PCollection<T> coll2) {
        return Cogroup.cogroup(Set.toTable(coll1), Set.toTable(coll2)).parallelDo("Calculate differences of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>(){

            @Override
            public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
                Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
                if (!groups.first().isEmpty() && groups.second().isEmpty()) {
                    emitter.emit(input.first());
                }
            }
        }, coll1.getPType());
    }

    public static <T> PCollection<T> intersection(PCollection<T> coll1, PCollection<T> coll2) {
        return Cogroup.cogroup(Set.toTable(coll1), Set.toTable(coll2)).parallelDo("Calculate intersection of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>(){

            @Override
            public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
                Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
                if (!groups.first().isEmpty() && !groups.second().isEmpty()) {
                    emitter.emit(input.first());
                }
            }
        }, coll1.getPType());
    }

    public static <T> PCollection<Tuple3<T, T, T>> comm(PCollection<T> coll1, PCollection<T> coll2) {
        PTypeFamily typeFamily = coll1.getTypeFamily();
        PType<T> type = coll1.getPType();
        return Cogroup.cogroup(Set.toTable(coll1), Set.toTable(coll2)).parallelDo("Calculate common values of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>>(){

            @Override
            public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<Tuple3<T, T, T>> emitter) {
                Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
                boolean inFirst = !groups.first().isEmpty();
                boolean inSecond = !groups.second().isEmpty();
                Object t = input.first();
                emitter.emit(Tuple3.of(inFirst && !inSecond ? (Object)t : null, !inFirst && inSecond ? (Object)t : null, inFirst && inSecond ? (Object)t : null));
            }
        }, typeFamily.triples(type, type, type));
    }

    private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
        PTypeFamily typeFamily = coll.getTypeFamily();
        return coll.parallelDo(new DoFn<T, Pair<T, Boolean>>(){

            @Override
            public void process(T input, Emitter<Pair<T, Boolean>> emitter) {
                emitter.emit(Pair.of(input, Boolean.TRUE));
            }
        }, typeFamily.tableOf(coll.getPType(), typeFamily.booleans()));
    }
}

