/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Set;
import javassist.util.proxy.MethodFilter;
import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.ProxyFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.ReflectionUtils;

public class Mapred {
    public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(PTable<K1, V1> input, Class<? extends Mapper<K1, V1, K2, V2>> mapperClass, Class<K2> keyClass, Class<V2> valueClass) {
        return input.parallelDo(new MapperFn(mapperClass), (PTableType<K1, V1>)Mapred.tableOf(keyClass, valueClass));
    }

    public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(PGroupedTable<K1, V1> input, Class<? extends Reducer<K1, V1, K2, V2>> reducerClass, Class<K2> keyClass, Class<V2> valueClass) {
        return input.parallelDo(new ReducerFn(reducerClass), (PTableType<K1, V1>)Mapred.tableOf(keyClass, valueClass));
    }

    private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf(Class<K> keyClass, Class<V> valueClass) {
        return Writables.tableOf(Writables.writables(keyClass), Writables.writables(valueClass));
    }

    private static Counters.Counter proxyCounter(Counter c) {
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.setSuperclass(Counters.Counter.class);
        proxyFactory.setFilter(CCMethodHandler.FILTER);
        CCMethodHandler handler = new CCMethodHandler(c);
        try {
            return (Counters.Counter)proxyFactory.create(new Class[0], new Object[0], (MethodHandler)handler);
        }
        catch (Exception e) {
            throw new CrunchRuntimeException(e);
        }
    }

    private static class CCMethodHandler
    implements MethodHandler {
        private static final Set<String> HANDLED = ImmutableSet.of((Object)"increment", (Object)"getCounter", (Object)"getValue", (Object)"getName", (Object)"getDisplayName", (Object)"setValue", (Object[])new String[]{"getUnderlyingCounter", "readFields", "write"});
        public static final MethodFilter FILTER = new MethodFilter(){

            public boolean isHandled(Method m) {
                return HANDLED.contains(m.getName());
            }
        };
        private final Counter c;

        public CCMethodHandler(Counter c) {
            this.c = c;
        }

        public Object invoke(Object obj, Method m, Method m2, Object[] args) throws Throwable {
            String name = m.getName();
            if ("increment".equals(name)) {
                this.c.increment(((Long)args[0]).longValue());
                return null;
            }
            if ("getCounter".equals(name) || "getValue".equals(name)) {
                return this.c.getValue();
            }
            if ("setValue".equals(name)) {
                this.c.setValue(((Long)args[0]).longValue());
                return null;
            }
            if ("getDisplayName".equals(name)) {
                return this.c.getDisplayName();
            }
            if ("getName".equals(name)) {
                return this.c.getName();
            }
            if ("getUnderlyingCounter".equals(name)) {
                return this.c;
            }
            if ("readFields".equals(name)) {
                this.c.readFields((DataInput)args[0]);
                return null;
            }
            if ("write".equals(name)) {
                this.c.write((DataOutput)args[0]);
                return null;
            }
            throw new IllegalStateException("Unhandled Counters.Counter method = " + name);
        }
    }

    private static class OutputCollectorImpl<K, V>
    implements OutputCollector<K, V> {
        private Emitter<Pair<K, V>> emitter;

        public void set(Emitter<Pair<K, V>> emitter) {
            this.emitter = emitter;
        }

        public void collect(K k, V v) throws IOException {
            this.emitter.emit(Pair.of(k, v));
        }
    }

    private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable>
    extends DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>>
    implements Reporter {
        private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass;
        private transient Reducer<K1, V1, K2, V2> instance;
        private transient OutputCollectorImpl<K2, V2> outputCollector;

        public ReducerFn(Class<? extends Reducer<K1, V1, K2, V2>> reducerClass) {
            this.reducerClass = (Class)Preconditions.checkNotNull(reducerClass);
        }

        @Override
        public void initialize() {
            if (this.instance == null) {
                this.instance = (Reducer)ReflectionUtils.newInstance(this.reducerClass, (Configuration)this.getConfiguration());
            }
            this.instance.configure(new JobConf(this.getConfiguration()));
            this.outputCollector = new OutputCollectorImpl();
        }

        @Override
        public void process(Pair<K1, Iterable<V1>> input, Emitter<Pair<K2, V2>> emitter) {
            this.outputCollector.set(emitter);
            try {
                this.instance.reduce(input.first(), input.second().iterator(), this.outputCollector, (Reporter)this);
            }
            catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
        }

        @Override
        public void cleanup(Emitter<Pair<K2, V2>> emitter) {
            try {
                this.instance.close();
            }
            catch (IOException e) {
                throw new CrunchRuntimeException("Error closing mapper = " + this.reducerClass, e);
            }
        }

        @Override
        public void progress() {
            super.progress();
        }

        @Override
        public void setStatus(String status) {
            super.setStatus(status);
        }

        public Counters.Counter getCounter(Enum<?> counter) {
            return Mapred.proxyCounter(super.getCounter(counter));
        }

        public Counters.Counter getCounter(String group, String name) {
            return Mapred.proxyCounter(super.getCounter(group, name));
        }

        public InputSplit getInputSplit() throws UnsupportedOperationException {
            return null;
        }

        public void incrCounter(Enum<?> counter, long by) {
            super.increment(counter, by);
        }

        public void incrCounter(String group, String name, long by) {
            super.increment(group, name, by);
        }

        public float getProgress() {
            return 0.5f;
        }
    }

    private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable>
    extends DoFn<Pair<K1, V1>, Pair<K2, V2>>
    implements Reporter {
        private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass;
        private transient Mapper<K1, V1, K2, V2> instance;
        private transient OutputCollectorImpl<K2, V2> outputCollector;

        public MapperFn(Class<? extends Mapper<K1, V1, K2, V2>> mapperClass) {
            this.mapperClass = (Class)Preconditions.checkNotNull(mapperClass);
        }

        @Override
        public void initialize() {
            if (this.instance == null) {
                this.instance = (Mapper)ReflectionUtils.newInstance(this.mapperClass, (Configuration)this.getConfiguration());
            }
            this.instance.configure(new JobConf(this.getConfiguration()));
            this.outputCollector = new OutputCollectorImpl();
        }

        @Override
        public void process(Pair<K1, V1> input, Emitter<Pair<K2, V2>> emitter) {
            this.outputCollector.set(emitter);
            try {
                this.instance.map(input.first(), input.second(), this.outputCollector, (Reporter)this);
            }
            catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
        }

        @Override
        public void cleanup(Emitter<Pair<K2, V2>> emitter) {
            try {
                this.instance.close();
            }
            catch (IOException e) {
                throw new CrunchRuntimeException("Error closing mapper = " + this.mapperClass, e);
            }
        }

        @Override
        public void progress() {
            super.progress();
        }

        @Override
        public void setStatus(String status) {
            super.setStatus(status);
        }

        public Counters.Counter getCounter(Enum<?> counter) {
            return Mapred.proxyCounter(super.getCounter(counter));
        }

        public Counters.Counter getCounter(String group, String name) {
            return Mapred.proxyCounter(super.getCounter(group, name));
        }

        public InputSplit getInputSplit() throws UnsupportedOperationException {
            return null;
        }

        public void incrCounter(Enum<?> counter, long by) {
            super.increment(counter, by);
        }

        public void incrCounter(String group, String name, long by) {
            super.increment(group, name, by);
        }

        public float getProgress() {
            return 0.5f;
        }
    }
}

