/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.impl.mem.collect;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import javassist.util.proxy.MethodFilter;
import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.ProxyFactory;
import org.apache.commons.lang.SerializationException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.crunch.Aggregator;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.DoFn;
import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.ReadableData;
import org.apache.crunch.Target;
import org.apache.crunch.fn.ExtractKeyFn;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mem.collect.MemReadableData;
import org.apache.crunch.impl.mem.collect.MemTable;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.materialize.pobject.CollectionPObject;
import org.apache.crunch.materialize.pobject.FirstElementPObject;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class MemCollection<S>
implements PCollection<S> {
    private final Collection<S> collect;
    private final PType<S> ptype;
    private String name;

    public MemCollection(Iterable<S> collect) {
        this(collect, null, null);
    }

    public MemCollection(Iterable<S> collect, PType<S> ptype) {
        this(collect, ptype, null);
    }

    public MemCollection(Iterable<S> collect, PType<S> ptype, String name) {
        this.collect = ImmutableList.copyOf(collect);
        this.ptype = ptype;
        this.name = name;
    }

    @Override
    public Pipeline getPipeline() {
        return MemPipeline.getInstance();
    }

    @Override
    public PCollection<S> union(PCollection<S> other) {
        return this.union(new PCollection[]{other});
    }

    @Override
    public PCollection<S> union(PCollection<S> ... collections) {
        ArrayList output = Lists.newArrayList();
        for (PCollection<S> pcollect : collections) {
            for (S s : pcollect.materialize()) {
                output.add(s);
            }
        }
        output.addAll(this.collect);
        return new MemCollection<S>(output, collections[0].getPType());
    }

    private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) {
        try {
            return (DoFn)SerializationUtils.deserialize((byte[])SerializationUtils.serialize(doFn));
        }
        catch (SerializationException e) {
            throw new IllegalStateException(doFn.getClass().getSimpleName() + " named '" + name + "' cannot be serialized", e);
        }
    }

    @Override
    public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) {
        return this.parallelDo(null, doFn, type);
    }

    @Override
    public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
        return this.parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
    }

    @Override
    public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type, ParallelDoOptions options) {
        doFn = this.verifySerializable(name, doFn);
        InMemoryEmitter emitter = new InMemoryEmitter();
        Configuration conf = this.getPipeline().getConfiguration();
        doFn.configure(conf);
        doFn.setContext(MemCollection.getInMemoryContext(conf));
        doFn.initialize();
        for (S s : this.collect) {
            doFn.process(s, emitter);
        }
        doFn.cleanup(emitter);
        return new MemCollection(emitter.getOutput(), type, name);
    }

    @Override
    public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
        return this.parallelDo((String)null, doFn, type);
    }

    @Override
    public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
        return this.parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
    }

    @Override
    public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type, ParallelDoOptions options) {
        InMemoryEmitter emitter = new InMemoryEmitter();
        Configuration conf = this.getPipeline().getConfiguration();
        doFn.configure(conf);
        doFn.setContext(MemCollection.getInMemoryContext(conf));
        doFn.initialize();
        for (S s : this.collect) {
            doFn.process(s, emitter);
        }
        doFn.cleanup(emitter);
        return new MemTable(emitter.getOutput(), type, name);
    }

    @Override
    public PCollection<S> write(Target target) {
        this.getPipeline().write(this, target);
        return this;
    }

    @Override
    public PCollection<S> write(Target target, Target.WriteMode writeMode) {
        this.getPipeline().write(this, target, writeMode);
        return this;
    }

    @Override
    public Iterable<S> materialize() {
        return this.collect;
    }

    @Override
    public PCollection<S> cache() {
        return this;
    }

    @Override
    public PCollection<S> cache(CachingOptions options) {
        return this;
    }

    @Override
    public PObject<Collection<S>> asCollection() {
        return new CollectionPObject(this);
    }

    @Override
    public PObject<S> first() {
        return new FirstElementPObject(this);
    }

    @Override
    public <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable) {
        pipelineCallable.dependsOn(label, this);
        return this.getPipeline().sequentialDo(pipelineCallable);
    }

    @Override
    public ReadableData<S> asReadable(boolean materialize) {
        return new MemReadableData<S>(this.collect);
    }

    public Collection<S> getCollection() {
        return this.collect;
    }

    @Override
    public PType<S> getPType() {
        return this.ptype;
    }

    @Override
    public PTypeFamily getTypeFamily() {
        if (this.ptype != null) {
            return this.ptype.getFamily();
        }
        return null;
    }

    @Override
    public long getSize() {
        return this.collect.isEmpty() ? 0L : 1L;
    }

    @Override
    public String getName() {
        return this.name;
    }

    public String toString() {
        return this.collect.toString();
    }

    @Override
    public PTable<S, Long> count() {
        return Aggregate.count(this);
    }

    @Override
    public PObject<Long> length() {
        return Aggregate.length(this);
    }

    @Override
    public PObject<S> max() {
        return Aggregate.max(this);
    }

    @Override
    public PObject<S> min() {
        return Aggregate.min(this);
    }

    @Override
    public PCollection<S> aggregate(Aggregator<S> aggregator) {
        return Aggregate.aggregate(this, aggregator);
    }

    @Override
    public PCollection<S> filter(FilterFn<S> filterFn) {
        return this.parallelDo(filterFn, this.getPType());
    }

    @Override
    public PCollection<S> filter(String name, FilterFn<S> filterFn) {
        return this.parallelDo(name, filterFn, this.getPType());
    }

    @Override
    public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
        return this.parallelDo((DoFn)new ExtractKeyFn<K, S>(mapFn), this.getTypeFamily().tableOf(keyType, this.getPType()));
    }

    @Override
    public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
        return this.parallelDo(name, (DoFn)new ExtractKeyFn<K, S>(mapFn), this.getTypeFamily().tableOf(keyType, this.getPType()));
    }

    private static TaskInputOutputContext<?, ?, ?, ?> getInMemoryContext(final Configuration conf) {
        ProxyFactory factory = new ProxyFactory();
        Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
        Class[] types = new Class[]{};
        Object[] args = new Object[]{};
        final TaskAttemptID taskAttemptId = new TaskAttemptID();
        if (superType.isInterface()) {
            factory.setInterfaces(new Class[]{superType});
        } else {
            types = new Class[]{Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class, StatusReporter.class};
            args = new Object[]{conf, taskAttemptId, null, null, null};
            factory.setSuperclass(superType);
        }
        ImmutableSet handledMethods = ImmutableSet.of((Object)"getConfiguration", (Object)"getCounter", (Object)"progress", (Object)"getNumReduceTasks", (Object)"getTaskAttemptID");
        factory.setFilter(new MethodFilter((Set)handledMethods){
            final /* synthetic */ Set val$handledMethods;
            {
                this.val$handledMethods = set;
            }

            public boolean isHandled(Method m) {
                return this.val$handledMethods.contains(m.getName());
            }
        });
        MethodHandler handler = new MethodHandler(){

            public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
                String name = m.getName();
                if ("getConfiguration".equals(name)) {
                    return conf;
                }
                if ("progress".equals(name)) {
                    return null;
                }
                if ("getTaskAttemptID".equals(name)) {
                    return taskAttemptId;
                }
                if ("getNumReduceTasks".equals(name)) {
                    return 1;
                }
                if ("getCounter".equals(name)) {
                    if (args.length == 1) {
                        return MemPipeline.getCounters().findCounter((Enum)args[0]);
                    }
                    return MemPipeline.getCounters().findCounter((String)args[0], (String)args[1]);
                }
                throw new IllegalStateException("Unhandled method " + name);
            }
        };
        try {
            Object newInstance = factory.create(types, args, handler);
            return (TaskInputOutputContext)newInstance;
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

