/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.impl.mem.collect;

import org.apache.crunch.Aggregator;
import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Target;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mem.collect.MemCollection;
import org.apache.crunch.impl.mem.collect.MemTable;
import org.apache.crunch.impl.mem.collect.Shuffler;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;

/**
 * Grouped view of a {@link MemTable}: a collection of
 * {@code Pair<K, Iterable<V>>} built from the pairs of a parent table.
 *
 * <p>The grouping is performed eagerly in the constructor: every pair returned
 * by the parent's {@code materialize()} is added to a {@link Shuffler}, and the
 * shuffler is then handed to the {@link MemCollection} constructor.
 *
 * @param <K> key type of the parent table
 * @param <V> value type of the parent table
 */
class MemGroupedTable<K, V>
        extends MemCollection<Pair<K, Iterable<V>>>
        implements PGroupedTable<K, V> {

    /** Table this grouped view was built from; also the source of type information. */
    private final MemTable<K, V> parent;

    /**
     * Feeds every pair of {@code parent} into a {@link Shuffler} created from the
     * parent's key type, the supplied grouping options and the parent's pipeline.
     *
     * @param parent  table whose materialized pairs are grouped
     * @param options grouping options passed through to {@code Shuffler.create}
     * @return the populated shuffler, viewed as an iterable of key/values pairs
     */
    private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
        PType<S> keyType = parent.getKeyType();
        // Fully parameterized so add() and the return below are type-checked
        // instead of relying on raw-type unchecked conversions.
        Shuffler<S, T> shuffler = Shuffler.create(keyType, options, parent.getPipeline());
        for (Pair<S, T> pair : parent.materialize()) {
            shuffler.add(pair);
        }
        return shuffler;
    }

    /**
     * Groups the contents of {@code parent} immediately.
     *
     * @param parent  table to group
     * @param options grouping options forwarded to the shuffler factory
     */
    public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
        super(buildMap(parent, options));
        this.parent = parent;
    }

    /**
     * Not supported for grouped tables.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public PCollection<Pair<K, Iterable<V>>> union(PCollection<Pair<K, Iterable<V>>> ... collections) {
        throw new UnsupportedOperationException();
    }

    /**
     * Writes the ungrouped form of this table (see {@link #ungroup()}) to
     * {@code target} through the pipeline.
     *
     * @param target destination passed to the pipeline's {@code write}
     * @return this grouped table
     */
    @Override
    public PCollection<Pair<K, Iterable<V>>> write(Target target) {
        getPipeline().write(ungroup(), target);
        return this;
    }

    /**
     * @return the same value as {@link #getGroupedTableType()}, which may be {@code null}
     */
    @Override
    public PType<Pair<K, Iterable<V>>> getPType() {
        return getGroupedTableType();
    }

    /**
     * @return the grouped table type derived from the parent's table type, or
     *         {@code null} when the parent reports no table type
     */
    @Override
    public PGroupedTableType<K, V> getGroupedTableType() {
        PTableType<K, V> parentType = parent.getPTableType();
        if (parentType != null) {
            return parentType.getGroupedTableType();
        }
        return null;
    }

    /**
     * @return the parent table's type family
     */
    @Override
    public PTypeFamily getTypeFamily() {
        return parent.getTypeFamily();
    }

    /**
     * @return always {@code 1}; the actual number of groups is not computed here
     */
    @Override
    public long getSize() {
        return 1L;
    }

    /**
     * @return the parent's name wrapped as {@code MemGrouped(<parent name>)}
     */
    @Override
    public String getName() {
        return "MemGrouped(" + parent.getName() + ")";
    }

    /**
     * Runs {@code combineFn} over this grouped table via {@code parallelDo},
     * producing a table typed with the parent's table type.
     *
     * @param combineFn function applied to the grouped pairs
     * @return the resulting table
     */
    @Override
    public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
        return parallelDo(combineFn, parent.getPTableType());
    }

    /**
     * Applies only {@code reduceFn}; {@code combineFn} is ignored.
     *
     * <p>NOTE(review): presumably a separate combine step is unnecessary for this
     * implementation, so only the reduce-side function is used; confirm.
     *
     * @param combineFn unused
     * @param reduceFn  function forwarded to {@link #combineValues(CombineFn)}
     * @return the resulting table
     */
    @Override
    public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
        return combineValues(reduceFn);
    }

    /**
     * Converts {@code agg} with {@code Aggregators.toCombineFn} and delegates to
     * {@link #combineValues(CombineFn)}.
     *
     * @param agg aggregator to apply to the grouped values
     * @return the resulting table
     */
    @Override
    public PTable<K, V> combineValues(Aggregator<V> agg) {
        return combineValues(Aggregators.<K, V>toCombineFn(agg));
    }

    /**
     * Converts both aggregators with {@code Aggregators.toCombineFn} and delegates
     * to {@link #combineValues(CombineFn, CombineFn)}, which uses only the
     * reduce-side function.
     *
     * @param combineAgg aggregator converted and passed as the (ignored) combine function
     * @param reduceAgg  aggregator converted and passed as the reduce function
     * @return the resulting table
     */
    @Override
    public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
        return combineValues(
                Aggregators.<K, V>toCombineFn(combineAgg),
                Aggregators.<K, V>toCombineFn(reduceAgg));
    }

    /**
     * Delegates to {@code PTables.mapValues(this, mapFn, ptype)}.
     *
     * @param mapFn function from a group's values to a single result
     * @param ptype type of the produced values
     * @return the table returned by {@code PTables.mapValues}
     */
    @Override
    public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
        return PTables.mapValues(this, mapFn, ptype);
    }

    /**
     * Delegates to {@code PTables.mapValues(name, this, mapFn, ptype)}.
     *
     * @param name  name forwarded to {@code PTables.mapValues}
     * @param mapFn function from a group's values to a single result
     * @param ptype type of the produced values
     * @return the table returned by {@code PTables.mapValues}
     */
    @Override
    public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
        return PTables.mapValues(name, this, mapFn, ptype);
    }

    /**
     * Flattens the groups back into individual key/value pairs by running an
     * {@link UngroupFn} named {@code "ungroup"}, typed with the parent's table type.
     *
     * @return the ungrouped table
     */
    @Override
    public PTable<K, V> ungroup() {
        // Explicit type arguments avoid the raw-type instantiation.
        return parallelDo("ungroup", new UngroupFn<K, V>(), parent.getPTableType());
    }

    /**
     * Emits one {@code (key, value)} pair for every value in a
     * {@code (key, values)} group.
     */
    private static class UngroupFn<K, V>
            extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
        private UngroupFn() {
        }

        @Override
        public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
            for (V v : input.second()) {
                emitter.emit(Pair.of(input.first(), v));
            }
        }
    }
}

