/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.crunch.Aggregator;
import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.materialize.pobject.FirstElementPObject;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.util.PartitionUtils;

public class Aggregate {
    public static <S> PTable<S, Long> count(PCollection<S> collect) {
        return Aggregate.count(collect, PartitionUtils.getRecommendedPartitions(collect));
    }

    public static <S> PTable<S, Long> count(PCollection<S> collect, int numPartitions) {
        PTypeFamily tf = collect.getTypeFamily();
        return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>(){

            @Override
            public Pair<S, Long> map(S input) {
                return Pair.of(input, 1L);
            }
        }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey(numPartitions).combineValues(Aggregators.SUM_LONGS());
    }

    public static <S> PObject<Long> length(PCollection<S> collect) {
        PTypeFamily tf = collect.getTypeFamily();
        PTable<Integer, Long> countTable = collect.parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>(){

            @Override
            public Pair<Integer, Long> map(S input) {
                return Pair.of(1, 1L);
            }

            @Override
            public void cleanup(Emitter<Pair<Integer, Long>> e) {
                e.emit(Pair.of(1, 0L));
            }
        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey(GroupingOptions.builder().numReducers(1).build()).combineValues(Aggregators.SUM_LONGS());
        PCollection<Long> count = countTable.values();
        return new FirstElementPObject<Long>(count);
    }

    public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
        PTypeFamily ptf = ptable.getTypeFamily();
        PTableType<K, V> base = ptable.getPTableType();
        PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
        PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
        return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize, pairType), inter).groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize, pairType)).parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>(){

            @Override
            public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
                emitter.emit(input.second());
            }
        }, base);
    }

    public static <S> PObject<S> max(PCollection<S> collect) {
        Class<S> clazz = collect.getPType().getTypeClass();
        if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException("Can only get max for Comparable elements, not for: " + collect.getPType().getTypeClass());
        }
        PTypeFamily tf = collect.getTypeFamily();
        PCollection<S> maxCollect = PTables.values(collect.parallelDo("max", new DoFn<S, Pair<Boolean, S>>(){
            private transient S max = null;

            @Override
            public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
                if (this.max == null || ((Comparable)this.max).compareTo(input) < 0) {
                    this.max = input;
                }
            }

            @Override
            public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
                if (this.max != null) {
                    emitter.emit(Pair.of(true, this.max));
                }
            }
        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1).combineValues(new CombineFn<Boolean, S>(){

            @Override
            public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
                Object max = null;
                for (Object v : input.second()) {
                    if (max != null && ((Comparable)max).compareTo(v) >= 0) continue;
                    max = v;
                }
                emitter.emit(Pair.of(input.first(), max));
            }
        }));
        return new FirstElementPObject<S>(maxCollect);
    }

    public static <S> PObject<S> min(PCollection<S> collect) {
        Class<S> clazz = collect.getPType().getTypeClass();
        if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException("Can only get min for Comparable elements, not for: " + collect.getPType().getTypeClass());
        }
        PTypeFamily tf = collect.getTypeFamily();
        PCollection<S> minCollect = PTables.values(collect.parallelDo("min", new DoFn<S, Pair<Boolean, S>>(){
            private transient S min = null;

            @Override
            public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
                if (this.min == null || ((Comparable)this.min).compareTo(input) > 0) {
                    this.min = input;
                }
            }

            @Override
            public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
                if (this.min != null) {
                    emitter.emit(Pair.of(false, this.min));
                }
            }
        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1).combineValues(new CombineFn<Boolean, S>(){

            @Override
            public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
                Object min = null;
                for (Object v : input.second()) {
                    if (min != null && ((Comparable)min).compareTo(v) <= 0) continue;
                    min = v;
                }
                emitter.emit(Pair.of(input.first(), min));
            }
        }));
        return new FirstElementPObject<S>(minCollect);
    }

    public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
        PTypeFamily tf = collect.getTypeFamily();
        final PType<V> valueType = collect.getValueType();
        return collect.groupByKey().mapValues("collect", new MapFn<Iterable<V>, Collection<V>>(){

            @Override
            public void initialize() {
                valueType.initialize(this.getConfiguration());
            }

            @Override
            public Collection<V> map(Iterable<V> values) {
                ArrayList collected = Lists.newArrayList();
                for (Object value : values) {
                    collected.add(valueType.getDetachedValue(value));
                }
                return collected;
            }
        }, tf.collections(collect.getValueType()));
    }

    public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) {
        PTypeFamily tf = collect.getTypeFamily();
        return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Void, S>>(){

            @Override
            public Pair<Void, S> map(S input) {
                return Pair.of(null, input);
            }
        }, tf.tableOf(tf.nulls(), collect.getPType())).groupByKey(1).combineValues(aggregator).values();
    }

    public static class TopKCombineFn<K, V>
    extends CombineFn<Integer, Pair<K, V>> {
        private final int limit;
        private final boolean maximize;
        private PType<Pair<K, V>> pairType;

        public TopKCombineFn(int limit, boolean maximize, PType<Pair<K, V>> pairType) {
            this.limit = limit;
            this.maximize = maximize;
            this.pairType = pairType;
        }

        @Override
        public void initialize() {
            this.pairType.initialize(this.getConfiguration());
        }

        @Override
        public void process(Pair<Integer, Iterable<Pair<K, V>>> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
            PairValueComparator cmp = new PairValueComparator(this.maximize);
            PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(this.limit, cmp);
            for (Pair<K, V> pair : input.second()) {
                queue.add(this.pairType.getDetachedValue(pair));
                if (queue.size() <= this.limit) continue;
                queue.poll();
            }
            ArrayList values = Lists.newArrayList(queue);
            Collections.sort(values, cmp);
            for (int i = values.size() - 1; i >= 0; --i) {
                emitter.emit(Pair.of(0, values.get(i)));
            }
        }
    }

    public static class TopKFn<K, V>
    extends DoFn<Pair<K, V>, Pair<Integer, Pair<K, V>>> {
        private final int limit;
        private final boolean maximize;
        private final PType<Pair<K, V>> pairType;
        private transient PriorityQueue<Pair<K, V>> values;

        public TopKFn(int limit, boolean ascending, PType<Pair<K, V>> pairType) {
            this.limit = limit;
            this.maximize = ascending;
            this.pairType = pairType;
        }

        @Override
        public void initialize() {
            this.values = new PriorityQueue(this.limit, new PairValueComparator(this.maximize));
            this.pairType.initialize(this.getConfiguration());
        }

        @Override
        public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
            this.values.add(this.pairType.getDetachedValue(input));
            if (this.values.size() > this.limit) {
                this.values.poll();
            }
        }

        @Override
        public void cleanup(Emitter<Pair<Integer, Pair<K, V>>> emitter) {
            for (Pair<K, V> p : this.values) {
                emitter.emit(Pair.of(0, p));
            }
        }
    }

    public static class PairValueComparator<K, V>
    implements Comparator<Pair<K, V>> {
        private final boolean ascending;

        public PairValueComparator(boolean ascending) {
            this.ascending = ascending;
        }

        @Override
        public int compare(Pair<K, V> left, Pair<K, V> right) {
            int cmp = ((Comparable)left.second()).compareTo(right.second());
            if (this.ascending) {
                return cmp;
            }
            return cmp == Integer.MIN_VALUE ? Integer.MAX_VALUE : -cmp;
        }
    }
}

