/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.join.JoinUtils;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;

public class SecondarySort {
    public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
        return SecondarySort.sortAndApply(input, doFn, ptype, -1);
    }

    public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers) {
        return SecondarySort.prepare(input, numReducers).parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
    }

    public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
        return SecondarySort.sortAndApply(input, doFn, ptype, -1);
    }

    public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers) {
        return SecondarySort.prepare(input, numReducers).parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), (PTableType<Pair<U, V1>, Pair<V1, V2>>)ptype);
    }

    private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare(PTable<K, Pair<V1, V2>> input, int numReducers) {
        PTypeFamily ptf = input.getTypeFamily();
        PType<Pair<V1, V2>> valueType = input.getValueType();
        PTableType inter = ptf.tableOf(ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)), valueType);
        GroupingOptions.Builder gob = GroupingOptions.builder().requireSortedKeys().groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)).partitionerClass(JoinUtils.getPartitionerClass(ptf));
        if (numReducers > 0) {
            gob.numReducers(numReducers);
        }
        return input.parallelDo("SecondarySort.format", new SSFormatFn(), inter).groupByKey(gob.build());
    }

    private static class SSWrapFn<K, V1, V2, T>
    extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
        private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;

        public SSWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern) {
            this.intern = intern;
        }

        @Override
        public void configure(Configuration conf) {
            this.intern.configure(conf);
        }

        @Override
        public void initialize() {
            this.intern.setContext(this.getContext());
            this.intern.initialize();
        }

        @Override
        public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<T> emitter) {
            this.intern.process(Pair.of(input.first().first(), input.second()), emitter);
        }

        @Override
        public void cleanup(Emitter<T> emitter) {
            this.intern.cleanup(emitter);
        }
    }

    private static class SSFormatFn<K, V1, V2>
    extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
        private SSFormatFn() {
        }

        @Override
        public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
            return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
        }
    }
}

