/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import java.util.Random;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;

public class Cartesian {
    static final int DEFAULT_PARALLELISM = 6;

    public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(PTable<K1, U> left, PTable<K2, V> right) {
        return Cartesian.cross(left, right, 6);
    }

    public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(PTable<K1, U> left, PTable<K2, V> right, int parallelism) {
        PTypeFamily ltf = left.getTypeFamily();
        PTypeFamily rtf = right.getTypeFamily();
        PTable<Pair<Integer, Integer>, Pair<K1, Pair<K2, V>>> leftCross = left.parallelDo(new GFCross(0, parallelism), (PTableType<K1, U>)ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), ltf.pairs(left.getKeyType(), left.getValueType())));
        PTable<Pair<Integer, Integer>, Pair<K2, V>> rightCross = right.parallelDo(new GFCross(1, parallelism), (PTableType<K2, Pair<K2, V>>)rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), rtf.pairs(right.getKeyType(), right.getValueType())));
        PTable<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, Pair<U, V>>>> cg = leftCross.join(rightCross);
        PTypeFamily ctf = cg.getTypeFamily();
        return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>>, Pair<Pair<K1, K2>, Pair<U, V>>>(){

            @Override
            public Pair<Pair<K1, K2>, Pair<U, V>> map(Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> input) {
                Pair valuePair = input.second();
                return Pair.of(Pair.of(valuePair.first().first(), valuePair.second().first()), Pair.of(valuePair.first().second(), valuePair.second().second()));
            }
        }, (PTableType<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, Pair<U, V>>>>)ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()), ctf.pairs(left.getValueType(), right.getValueType())));
    }

    public static <U, V> PCollection<Pair<U, V>> cross(PCollection<U> left, PCollection<V> right) {
        return Cartesian.cross(left, right, 6);
    }

    public static <U, V> PCollection<Pair<U, V>> cross(PCollection<U> left, PCollection<V> right, int parallelism) {
        PTypeFamily ltf = left.getTypeFamily();
        PTypeFamily rtf = right.getTypeFamily();
        PTableType<Pair<Integer, Integer>, U> ptt = ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), left.getPType());
        if (ptt == null) {
            throw new Error();
        }
        PTable<Pair<Integer, Integer>, V> leftCross = left.parallelDo(new GFCross(0, parallelism), ltf.tableOf(ltf.pairs(ltf.ints(), ltf.ints()), left.getPType()));
        PTable<Pair<Integer, Integer>, V> rightCross = right.parallelDo(new GFCross(1, parallelism), rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), right.getPType()));
        PTable<Pair<Integer, Integer>, Pair<U, V>> cg = leftCross.join(rightCross);
        PTypeFamily ctf = cg.getTypeFamily();
        return cg.parallelDo("Extract second element", new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>(){

            @Override
            public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) {
                return input.second();
            }
        }, ctf.pairs(left.getPType(), right.getPType()));
    }

    private static class GFCross<V>
    extends DoFn<V, Pair<Pair<Integer, Integer>, V>> {
        private final int constantField;
        private final int parallelism;
        private final Random r;

        public GFCross(int constantField, int parallelism) {
            this.constantField = constantField;
            this.parallelism = parallelism;
            this.r = new Random();
        }

        @Override
        public void process(V input, Emitter<Pair<Pair<Integer, Integer>, V>> emitter) {
            int c = this.r.nextInt(this.parallelism);
            if (this.constantField == 0) {
                for (int i = 0; i < this.parallelism; ++i) {
                    emitter.emit(Pair.of(Pair.of(c, i), input));
                }
            } else {
                for (int i = 0; i < this.parallelism; ++i) {
                    emitter.emit(Pair.of(Pair.of(i, c), input));
                }
            }
        }
    }
}

