/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import com.google.common.collect.Lists;
import java.util.Collection;
import org.apache.crunch.MapFn;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.Union;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.TupleFactory;

/**
 * Static helpers that co-group two or more {@link PTable}s sharing a key type.
 *
 * <p>Every overload funnels into one private implementation: each input value is
 * wrapped in a {@link Union} tagged with the position of its source table, the
 * tagged tables are unioned and grouped by key, and each group is then split back
 * into one {@link Collection} per source table. The resulting collections are
 * handed to a {@link TupleFactory} in input-table order to build the output value.
 *
 * <p>Overloads that take {@code numReducers} forward it to
 * {@code groupByKey(int)} when it is positive; for zero or negative values the
 * no-argument {@code groupByKey()} is used instead. Overloads without that
 * parameter pass {@code 0}.
 */
public class Cogroup {
    /**
     * Co-groups two tables using the no-argument {@code groupByKey()}.
     *
     * @param left  the first table; its values end up in the first collection
     * @param right the second table; its values end up in the second collection
     * @return a table whose value is built by {@link TupleFactory#PAIR} from the
     *         left and right collections, in that order
     */
    public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) {
        return Cogroup.cogroup(0, left, right);
    }

    /**
     * Co-groups two tables.
     *
     * @param numReducers forwarded to {@code groupByKey(int)} when positive;
     *                    otherwise {@code groupByKey()} is used
     * @param left        the first table; its values end up in the first collection
     * @param right       the second table; its values end up in the second collection
     * @return a table whose value is built by {@link TupleFactory#PAIR} from the
     *         left and right collections, in that order; the output type comes
     *         from {@code left}'s type family
     */
    public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(int numReducers, PTable<K, U> left, PTable<K, V> right) {
        PTypeFamily tf = left.getTypeFamily();
        return Cogroup.cogroup(tf.pairs(tf.collections(left.getValueType()), tf.collections(right.getValueType())), TupleFactory.PAIR, numReducers, left, right);
    }

    /**
     * Co-groups three tables using the no-argument {@code groupByKey()}.
     *
     * @return a table whose value holds one collection per input table, in
     *         argument order
     */
    public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup(PTable<K, V1> first, PTable<K, V2> second, PTable<K, V3> third) {
        return Cogroup.cogroup(0, first, second, third);
    }

    /**
     * Co-groups three tables.
     *
     * @param numReducers forwarded to {@code groupByKey(int)} when positive;
     *                    otherwise {@code groupByKey()} is used
     * @return a table whose value holds one collection per input table, in
     *         argument order
     */
    public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup(int numReducers, PTable<K, V1> first, PTable<K, V2> second, PTable<K, V3> third) {
        return Cogroup.cogroup(Tuple3.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType()), new TupleFactory<Tuple3.Collect<V1, V2, V3>>(){

            /** Builds the triple from the per-table collections, which arrive in input-table order. */
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public Tuple3.Collect<V1, V2, V3> makeTuple(Object ... values) {
                return new Tuple3.Collect((Collection)values[0], (Collection)values[1], (Collection)values[2]);
            }
        }, numReducers, first, second, third);
    }

    /**
     * Co-groups four tables using the no-argument {@code groupByKey()}.
     *
     * @return a table whose value holds one collection per input table, in
     *         argument order
     */
    public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup(PTable<K, V1> first, PTable<K, V2> second, PTable<K, V3> third, PTable<K, V4> fourth) {
        return Cogroup.cogroup(0, first, second, third, fourth);
    }

    /**
     * Co-groups four tables.
     *
     * @param numReducers forwarded to {@code groupByKey(int)} when positive;
     *                    otherwise {@code groupByKey()} is used
     * @return a table whose value holds one collection per input table, in
     *         argument order
     */
    public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup(int numReducers, PTable<K, V1> first, PTable<K, V2> second, PTable<K, V3> third, PTable<K, V4> fourth) {
        return Cogroup.cogroup(Tuple4.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType(), fourth.getValueType()), new TupleFactory<Tuple4.Collect<V1, V2, V3, V4>>(){

            /** Builds the quadruple from the per-table collections, which arrive in input-table order. */
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public Tuple4.Collect<V1, V2, V3, V4> makeTuple(Object ... values) {
                return new Tuple4.Collect((Collection)values[0], (Collection)values[1], (Collection)values[2], (Collection)values[3]);
            }
        }, numReducers, first, second, third, fourth);
    }

    /**
     * Co-groups an arbitrary number of tables using the no-argument
     * {@code groupByKey()}.
     *
     * @param first the first table; also supplies the type family
     * @param rest  the remaining tables
     * @return a table whose {@link TupleN} value is built by
     *         {@link TupleFactory#TUPLEN} from one collection per input table,
     *         {@code first} followed by {@code rest} in order
     */
    public static <K> PTable<K, TupleN> cogroup(PTable<K, ?> first, PTable<K, ?> ... rest) {
        return Cogroup.cogroup(0, first, rest);
    }

    /**
     * Co-groups an arbitrary number of tables.
     *
     * @param numReducers forwarded to {@code groupByKey(int)} when positive;
     *                    otherwise {@code groupByKey()} is used
     * @param first       the first table; also supplies the type family
     * @param rest        the remaining tables
     * @return a table whose {@link TupleN} value is built by
     *         {@link TupleFactory#TUPLEN} from one collection per input table,
     *         {@code first} followed by {@code rest} in order
     */
    @SuppressWarnings("rawtypes")
    public static <K, U, V> PTable<K, TupleN> cogroup(int numReducers, PTable<K, ?> first, PTable<K, ?> ... rest) {
        PTypeFamily tf = first.getTypeFamily();
        // One collection type per input table, in the same order as the inputs.
        PType[] components = new PType[1 + rest.length];
        components[0] = tf.collections(first.getValueType());
        for (int i = 0; i < rest.length; ++i) {
            components[i + 1] = tf.collections(rest[i].getValueType());
        }
        return Cogroup.cogroup(tf.tuples(components), TupleFactory.TUPLEN, numReducers, first, rest);
    }

    /**
     * Shared implementation behind every public overload.
     *
     * @param outputType   the type of the produced table's values
     * @param tupleFactory turns the per-table collections (in input order) into
     *                     the output value
     * @param numReducers  forwarded to {@code groupByKey(int)} when positive;
     *                     otherwise {@code groupByKey()} is used
     * @param first        the table tagged with index 0; also supplies the type family
     * @param rest         the remaining tables; {@code rest[i]} is tagged with
     *                     index {@code i + 1}
     * @return the co-grouped table
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <K, T extends Tuple> PTable<K, T> cogroup(PType<T> outputType, TupleFactory tupleFactory, int numReducers, PTable<K, ?> first, PTable<K, ?> ... rest) {
        PTypeFamily ptf = first.getTypeFamily();
        // Value types of every input, indexed by the tag the input's values will carry.
        PType[] ptypes = new PType[1 + rest.length];
        ptypes[0] = first.getValueType();
        for (int i = 0; i < rest.length; ++i) {
            ptypes[i + 1] = rest[i].getValueType();
        }
        PType<Union> itype = ptf.unionOf(ptypes);
        // Tag each table's values with the table's position so they can be told
        // apart after the union. Stage names are 1-based, tags are 0-based.
        PTable<K, Union> firstInter = first.mapValues("coGroupTag1", new CogroupFn(0), itype);
        PTable[] inter = new PTable[rest.length];
        for (int i = 0; i < rest.length; ++i) {
            inter[i] = rest[i].mapValues("coGroupTag" + (i + 2), new CogroupFn(i + 1), itype);
        }
        PTable<K, Union> union = firstInter.union(inter);
        PGroupedTable<K, Union> grouped = numReducers > 0 ? union.groupByKey(numReducers) : union.groupByKey();
        return grouped.mapValues("cogroup", new PostGroupFn(tupleFactory, ptypes), outputType);
    }

    /**
     * Splits the tagged values of one key group back into one collection per
     * source table and builds the output tuple from those collections.
     */
    private static class PostGroupFn<T extends Tuple>
    extends MapFn<Iterable<Union>, T> {
        /** Builds the output value; kept raw because callers supply factories of differing tuple types. */
        @SuppressWarnings("rawtypes")
        private final TupleFactory factory;
        /** Value type of each source table, indexed by the tag carried in the {@link Union}. */
        @SuppressWarnings("rawtypes")
        private final PType[] ptypes;

        /**
         * @param tf     factory that receives the per-table collections in tag order
         * @param ptypes value type of each source table, in tag order
         */
        @SuppressWarnings("rawtypes")
        PostGroupFn(TupleFactory tf, PType ... ptypes) {
            this.factory = tf;
            this.ptypes = ptypes;
        }

        /** Initializes every source value type with this function's configuration. */
        @Override
        @SuppressWarnings("rawtypes")
        public void initialize() {
            super.initialize();
            for (PType pt : this.ptypes) {
                pt.initialize(this.getConfiguration());
            }
        }

        /**
         * Buckets each tagged value by its tag and builds the output tuple.
         *
         * @param input the tagged values of a single key group
         * @return the tuple produced by the factory from one collection per
         *         source table; tables with no value for the key contribute an
         *         empty collection
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public T map(Iterable<Union> input) {
            // Must be typed as Collection[] (not Object[]) so elements can be added to.
            Collection[] collections = new Collection[this.ptypes.length];
            for (int i = 0; i < this.ptypes.length; ++i) {
                collections[i] = Lists.newArrayList();
            }
            for (Union t : input) {
                int index = t.getIndex();
                // Each value goes through getDetachedValue before it is retained;
                // presumably the framework may reuse the objects it yields - confirm.
                collections[index].add(this.ptypes[index].getDetachedValue(t.getValue()));
            }
            // The raw factory returns the erased Tuple type, so an unchecked cast to T is
            // required. The (Object[]) cast makes the non-varargs call explicit: each
            // collection is one element of the values array.
            return (T) this.factory.makeTuple((Object[]) collections);
        }
    }

    /** Wraps every input value in a {@link Union} carrying a fixed source-table index. */
    private static class CogroupFn<T>
    extends MapFn<T, Union> {
        /** Tag written into every emitted {@link Union}: 0 for the first table, {@code i + 1} for {@code rest[i]}. */
        private final int index;

        /**
         * @param index the tag to attach to every value
         */
        CogroupFn(int index) {
            this.index = index;
        }

        /**
         * @param input a value from the source table
         * @return the value wrapped together with this function's index
         */
        @Override
        public Union map(T input) {
            return new Union(this.index, input);
        }
    }
}

