/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.types.writable;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.Union;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypes;
import org.apache.crunch.types.TupleFactory;
import org.apache.crunch.types.writable.GenericArrayWritable;
import org.apache.crunch.types.writable.TextMapWritable;
import org.apache.crunch.types.writable.TupleWritable;
import org.apache.crunch.types.writable.UnionWritable;
import org.apache.crunch.types.writable.WritableTableType;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class Writables {
    /** Logger for this class; used to warn about tuple element classes that have no registered code. */
    private static final Log LOG = LogFactory.getLog(Writables.class);
    /**
     * Bidirectional registry between integer codes and {@link Writable} classes.
     * Codes 1 through 10 are pre-assigned here to the built-in classes; further
     * entries are added by {@code registerComparable} and
     * {@code reloadWritableComparableCodes}. The map is a mutable {@code HashBiMap}
     * and the field is not final.
     */
    static BiMap<Integer, Class<? extends Writable>> WRITABLE_CODES = HashBiMap.create((Map)ImmutableBiMap.builder().put((Object)1, BytesWritable.class).put((Object)2, Text.class).put((Object)3, IntWritable.class).put((Object)4, LongWritable.class).put((Object)5, FloatWritable.class).put((Object)6, DoubleWritable.class).put((Object)7, BooleanWritable.class).put((Object)8, TupleWritable.class).put((Object)9, TextMapWritable.class).put((Object)10, UnionWritable.class).build());
    /** Configuration key under which the Base64-encoded, Java-serialized code registry is stored. */
    private static final String WRITABLE_COMPARABLE_CODES = "crunch.writable.comparable.codes";
    /** Maps any {@link NullWritable} to {@code null}; the argument itself is never inspected. */
    private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>(){

        @Override
        public Void map(NullWritable ignored) {
            return null;
        }
    };
    /** Maps any {@code Void} value to the {@link NullWritable} singleton; the argument is never inspected. */
    private static final MapFn<Void, NullWritable> VOID_TO_NULL_WRITABLE = new MapFn<Void, NullWritable>(){

        @Override
        public NullWritable map(Void ignored) {
            NullWritable singleton = NullWritable.get();
            return singleton;
        }
    };
    /** Decodes a {@link Text} into a {@link String} via {@code Text.toString()}. */
    private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>(){

        @Override
        public String map(Text text) {
            String decoded = text.toString();
            return decoded;
        }
    };
    /** Wraps a {@link String} in a freshly allocated {@link Text}. */
    private static final MapFn<String, Text> STRING_TO_TEXT = new MapFn<String, Text>(){

        @Override
        public Text map(String value) {
            Text wrapped = new Text(value);
            return wrapped;
        }
    };
    /** Unwraps an {@link IntWritable} into a boxed {@link Integer}. */
    private static final MapFn<IntWritable, Integer> IW_TO_INT = new MapFn<IntWritable, Integer>(){

        @Override
        public Integer map(IntWritable writable) {
            return Integer.valueOf(writable.get());
        }
    };
    /** Wraps an {@link Integer} in a freshly allocated {@link IntWritable}. */
    private static final MapFn<Integer, IntWritable> INT_TO_IW = new MapFn<Integer, IntWritable>(){

        @Override
        public IntWritable map(Integer boxed) {
            int primitive = boxed;
            return new IntWritable(primitive);
        }
    };
    /** Unwraps a {@link LongWritable} into a boxed {@link Long}. */
    private static final MapFn<LongWritable, Long> LW_TO_LONG = new MapFn<LongWritable, Long>(){

        @Override
        public Long map(LongWritable writable) {
            return Long.valueOf(writable.get());
        }
    };
    /** Wraps a {@link Long} in a freshly allocated {@link LongWritable}. */
    private static final MapFn<Long, LongWritable> LONG_TO_LW = new MapFn<Long, LongWritable>(){

        @Override
        public LongWritable map(Long boxed) {
            long primitive = boxed;
            return new LongWritable(primitive);
        }
    };
    /** Unwraps a {@link FloatWritable} into a boxed {@link Float}. */
    private static final MapFn<FloatWritable, Float> FW_TO_FLOAT = new MapFn<FloatWritable, Float>(){

        @Override
        public Float map(FloatWritable writable) {
            float primitive = writable.get();
            return Float.valueOf(primitive);
        }
    };
    /** Wraps a {@link Float} in a freshly allocated {@link FloatWritable}. */
    private static final MapFn<Float, FloatWritable> FLOAT_TO_FW = new MapFn<Float, FloatWritable>(){

        @Override
        public FloatWritable map(Float boxed) {
            float primitive = boxed;
            return new FloatWritable(primitive);
        }
    };
    /** Unwraps a {@link DoubleWritable} into a boxed {@link Double}. */
    private static final MapFn<DoubleWritable, Double> DW_TO_DOUBLE = new MapFn<DoubleWritable, Double>(){

        @Override
        public Double map(DoubleWritable writable) {
            return Double.valueOf(writable.get());
        }
    };
    /** Wraps a {@link Double} in a freshly allocated {@link DoubleWritable}. */
    private static final MapFn<Double, DoubleWritable> DOUBLE_TO_DW = new MapFn<Double, DoubleWritable>(){

        @Override
        public DoubleWritable map(Double boxed) {
            double primitive = boxed;
            return new DoubleWritable(primitive);
        }
    };
    /** Unwraps a {@link BooleanWritable} into a boxed {@link Boolean}. */
    private static final MapFn<BooleanWritable, Boolean> BW_TO_BOOLEAN = new MapFn<BooleanWritable, Boolean>(){

        @Override
        public Boolean map(BooleanWritable writable) {
            return Boolean.valueOf(writable.get());
        }
    };
    /** Shared writable holding {@code true}; returned by {@code BOOLEAN_TO_BW} instead of allocating. */
    private static final BooleanWritable TRUE = new BooleanWritable(true);
    /** Shared writable holding {@code false}; returned by {@code BOOLEAN_TO_BW} instead of allocating. */
    private static final BooleanWritable FALSE = new BooleanWritable(false);
    /** Maps a {@link Boolean} to one of the two shared {@link BooleanWritable} instances. */
    private static final MapFn<Boolean, BooleanWritable> BOOLEAN_TO_BW = new MapFn<Boolean, BooleanWritable>(){

        @Override
        public BooleanWritable map(Boolean boxed) {
            if (boxed.booleanValue()) {
                return TRUE;
            }
            return FALSE;
        }
    };
    /**
     * Exposes the valid portion of a {@link BytesWritable} as a {@link ByteBuffer}.
     * The buffer wraps the writable's backing array (no copy) and covers indices
     * {@code 0} up to {@code getLength()}.
     */
    private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>(){

        @Override
        public ByteBuffer map(BytesWritable writable) {
            byte[] backing = writable.getBytes();
            int validLength = writable.getLength();
            return ByteBuffer.wrap(backing, 0, validLength);
        }
    };
    /**
     * Copies the readable content of a {@link ByteBuffer} (the bytes between its
     * position and its limit) into a new {@link BytesWritable}.
     *
     * <p>The buffer's own position is left untouched. Buffers without an
     * accessible backing array (for example direct or read-only buffers) are
     * handled by copying through a duplicate view.
     */
    private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW = new MapFn<ByteBuffer, BytesWritable>(){

        @Override
        public BytesWritable map(ByteBuffer input) {
            BytesWritable bw = new BytesWritable();
            if (input.hasArray()) {
                // The content starts at arrayOffset() + position() and spans remaining() bytes;
                // using limit() as the length is only correct when position() is zero.
                bw.set(input.array(), input.arrayOffset() + input.position(), input.remaining());
            } else {
                // No accessible backing array: read through a duplicate so the caller's position is preserved.
                byte[] copy = new byte[input.remaining()];
                input.duplicate().get(copy);
                bw.set(copy, 0, copy.length);
            }
            return bw;
        }
    };
    // Shared PType singletons for the built-in value types. All but `bytes` are built
    // through WritableType.immutableType; `bytes` uses the plain WritableType constructor.
    /** {@code Void} <-> {@link NullWritable}. */
    private static final WritableType<Void, NullWritable> nulls = WritableType.immutableType(Void.class, NullWritable.class, NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE, new PType[0]);
    /** {@code String} <-> {@link Text}. */
    private static final WritableType<String, Text> strings = WritableType.immutableType(String.class, Text.class, TEXT_TO_STRING, STRING_TO_TEXT, new PType[0]);
    /** {@code Long} <-> {@link LongWritable}. */
    private static final WritableType<Long, LongWritable> longs = WritableType.immutableType(Long.class, LongWritable.class, LW_TO_LONG, LONG_TO_LW, new PType[0]);
    /** {@code Integer} <-> {@link IntWritable}. */
    private static final WritableType<Integer, IntWritable> ints = WritableType.immutableType(Integer.class, IntWritable.class, IW_TO_INT, INT_TO_IW, new PType[0]);
    /** {@code Float} <-> {@link FloatWritable}. */
    private static final WritableType<Float, FloatWritable> floats = WritableType.immutableType(Float.class, FloatWritable.class, FW_TO_FLOAT, FLOAT_TO_FW, new PType[0]);
    /** {@code Double} <-> {@link DoubleWritable}. */
    private static final WritableType<Double, DoubleWritable> doubles = WritableType.immutableType(Double.class, DoubleWritable.class, DW_TO_DOUBLE, DOUBLE_TO_DW, new PType[0]);
    /** {@code Boolean} <-> {@link BooleanWritable}. */
    private static final WritableType<Boolean, BooleanWritable> booleans = WritableType.immutableType(Boolean.class, BooleanWritable.class, BW_TO_BOOLEAN, BOOLEAN_TO_BW, new PType[0]);
    /** {@code ByteBuffer} <-> {@link BytesWritable}; not created through {@code immutableType}. */
    private static final WritableType<ByteBuffer, BytesWritable> bytes = new WritableType<ByteBuffer, BytesWritable>(ByteBuffer.class, BytesWritable.class, BW_TO_BB, BB_TO_BW, new PType[0]);
    /** Lookup from Java class to the built-in PType above; {@code Void} is not included. */
    private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.builder().put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
    /** User-registered PTypes keyed by Java class; populated by {@code register} and consulted by {@code records}. */
    private static final Map<Class<?>, WritableType<?, ?>> EXTENSIONS = Maps.newHashMap();

    /**
     * Registers a {@link WritableComparable} class under a code derived from the
     * class object's {@code hashCode()}.
     *
     * <p>The hash is made non-negative, and if it is smaller than the current
     * number of registered codes it is shifted up by that number before being
     * handed to {@link #registerComparable(Class, int)}.
     *
     * @param clazz the class to register
     * @throws IllegalArgumentException if the two-argument overload rejects the derived code
     */
    public static void registerComparable(Class<? extends WritableComparable> clazz) {
        int code = clazz.hashCode();
        if (code == Integer.MIN_VALUE) {
            // -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, so negating
            // would leave the code negative; clamp to the largest positive value.
            code = Integer.MAX_VALUE;
        } else if (code < 0) {
            code = -code;
        }
        // NOTE(review): a hash of exactly 0 is shifted to size(), which equals the
        // highest pre-assigned code when only the built-ins are registered — confirm
        // whether that collision matters in practice.
        if (code < WRITABLE_CODES.size()) {
            code += WRITABLE_CODES.size();
        }
        Writables.registerComparable(clazz, code);
    }

    /**
     * Registers a {@link WritableComparable} class under an explicit code.
     *
     * <p>Registering the same class under the same code again is a no-op, so
     * registration code that runs more than once (for example from a static
     * initializer) does not fail.
     *
     * @param clazz the class to register
     * @param code  the integer code to associate with {@code clazz}
     * @throws IllegalArgumentException if {@code code} is already assigned to a
     *         different class; the underlying {@code BiMap} may also reject a class
     *         that is already bound to a different code
     */
    public static void registerComparable(Class<? extends WritableComparable> clazz, int code) {
        Class<? extends Writable> existing = WRITABLE_CODES.get(code);
        if (existing != null) {
            if (existing.equals(clazz)) {
                // Same class, same code: the mapping is already in place.
                return;
            }
            throw new IllegalArgumentException("Already have writable class assigned to code = " + code);
        }
        WRITABLE_CODES.put(code, clazz);
    }

    /**
     * Stores the current code registry in the given configuration.
     * The registry is Java-serialized, Base64-encoded and written under
     * {@code WRITABLE_COMPARABLE_CODES}.
     *
     * @param conf the configuration to write into
     * @throws IOException if Java serialization of the registry fails
     */
    private static void serializeWritableComparableCodes(Configuration conf) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream objectOut = new ObjectOutputStream(buffer);
        objectOut.writeObject(WRITABLE_CODES);
        // Closing flushes the object stream so the buffer holds the complete payload.
        objectOut.close();
        byte[] serialized = buffer.toByteArray();
        String encoded = Base64.encodeBase64String((byte[])serialized);
        conf.set(WRITABLE_COMPARABLE_CODES, encoded);
    }

    /**
     * Merges a code registry previously stored in the configuration back into
     * {@code WRITABLE_CODES}. Does nothing when the configuration has no value
     * under {@code WRITABLE_COMPARABLE_CODES}.
     *
     * <p>NOTE(review): this uses Java native deserialization on the configuration
     * value, which is only appropriate if the configuration is trusted — confirm.
     *
     * @param conf the configuration to read from
     * @throws Exception if decoding or deserialization fails
     */
    @SuppressWarnings("unchecked")
    static void reloadWritableComparableCodes(Configuration conf) throws Exception {
        if (conf.get(WRITABLE_COMPARABLE_CODES) == null) {
            return;
        }
        byte[] decoded = Base64.decodeBase64((String)conf.get(WRITABLE_COMPARABLE_CODES));
        ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(decoded));
        BiMap<Integer, Class<? extends Writable>> restored = (BiMap<Integer, Class<? extends Writable>>)objectIn.readObject();
        objectIn.close();
        for (Map.Entry<Integer, Class<? extends Writable>> entry : restored.entrySet()) {
            WRITABLE_CODES.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Looks up the built-in PType registered for a Java class.
     *
     * @param clazz the Java class to look up
     * @return the matching entry of {@code PRIMITIVES}, or {@code null} if there is none
     */
    @SuppressWarnings("unchecked")
    public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
        PType<?> registered = PRIMITIVES.get(clazz);
        return (PType<T>)registered;
    }

    /**
     * Registers a custom PType for a Java class in {@code EXTENSIONS}, replacing
     * any earlier entry for the same class. {@code records} checks this registry
     * first.
     *
     * @param clazz the Java class the PType describes
     * @param ptype the PType to associate with {@code clazz}
     */
    public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
        EXTENSIONS.put(clazz, ptype);
    }

    /** @return the shared PType mapping {@code Void} to {@link NullWritable} */
    public static final WritableType<Void, NullWritable> nulls() {
        return nulls;
    }

    /** @return the shared PType mapping {@code String} to {@link Text} */
    public static final WritableType<String, Text> strings() {
        return strings;
    }

    /** @return the shared PType mapping {@code Long} to {@link LongWritable} */
    public static final WritableType<Long, LongWritable> longs() {
        return longs;
    }

    /** @return the shared PType mapping {@code Integer} to {@link IntWritable} */
    public static final WritableType<Integer, IntWritable> ints() {
        return ints;
    }

    /** @return the shared PType mapping {@code Float} to {@link FloatWritable} */
    public static final WritableType<Float, FloatWritable> floats() {
        return floats;
    }

    /** @return the shared PType mapping {@code Double} to {@link DoubleWritable} */
    public static final WritableType<Double, DoubleWritable> doubles() {
        return doubles;
    }

    /** @return the shared PType mapping {@code Boolean} to {@link BooleanWritable} */
    public static final WritableType<Boolean, BooleanWritable> booleans() {
        return booleans;
    }

    /** @return the shared PType mapping {@code ByteBuffer} to {@link BytesWritable} */
    public static final WritableType<ByteBuffer, BytesWritable> bytes() {
        return bytes;
    }

    /**
     * Resolves the PType for a record class.
     *
     * <p>A PType registered through {@code register} takes precedence. Otherwise,
     * if the class itself implements {@link Writable}, an identity-mapped PType
     * is created for it.
     *
     * @param clazz the record class
     * @return the registered or newly created PType for {@code clazz}
     * @throws IllegalArgumentException if the class is neither registered nor a {@code Writable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
        if (EXTENSIONS.containsKey(clazz)) {
            return (WritableType<T, W>)EXTENSIONS.get(clazz);
        }
        if (Writable.class.isAssignableFrom(clazz)) {
            return (WritableType<T, W>)(WritableType)Writables.writables(clazz.asSubclass(Writable.class));
        }
        // A separating space keeps the class name from running into the word "class".
        throw new IllegalArgumentException("Cannot create Writable records from non-Writable class " + clazz.getCanonicalName());
    }

    /**
     * Creates a PType for a class that is itself a {@link Writable}: the same
     * class is used as both the Java type and the serialization type, and the
     * identity function is used in both directions.
     *
     * @param clazz the Writable class
     * @return a new PType with no sub-types
     */
    public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
        IdentityFn passThrough = IdentityFn.getInstance();
        PType[] noSubTypes = new PType[0];
        return new WritableType<W, W>(clazz, clazz, passThrough, passThrough, noSubTypes);
    }

    /**
     * Builds a table type from a key PType and a value PType.
     *
     * <p>If either argument is itself a {@link WritableTableType}, it is first
     * replaced by the pair type of its own key and value types. Any other
     * argument must be a {@link WritableType}.
     *
     * @param key   the key PType
     * @param value the value PType
     * @return the table type for the (possibly converted) key and value types
     * @throws IllegalArgumentException if an argument is neither a table type nor a {@code WritableType}
     */
    public static <K, V> WritableTableType<K, V> tableOf(PType<K> key, PType<V> value) {
        boolean keyIsTable = key instanceof WritableTableType;
        if (!keyIsTable && !(key instanceof WritableType)) {
            throw new IllegalArgumentException("Key type must be of class WritableType");
        }
        if (keyIsTable) {
            WritableTableType keyTable = (WritableTableType)key;
            key = Writables.pairs(keyTable.getKeyType(), keyTable.getValueType());
        }

        boolean valueIsTable = value instanceof WritableTableType;
        if (!valueIsTable && !(value instanceof WritableType)) {
            throw new IllegalArgumentException("Value type must be of class WritableType");
        }
        if (valueIsTable) {
            WritableTableType valueTable = (WritableTableType)value;
            value = Writables.pairs(valueTable.getKeyType(), valueTable.getValueType());
        }

        WritableType keyType = (WritableType)key;
        WritableType valueType = (WritableType)value;
        return new WritableTableType(keyType, valueType);
    }

    /**
     * Returns the given writable as a {@link BytesWritable}: the instance itself
     * if it already is one, otherwise a new {@code BytesWritable} holding the
     * byte array produced by {@code WritableUtils.toByteArray}.
     */
    private static BytesWritable asBytesWritable(Writable w) {
        return w instanceof BytesWritable
            ? (BytesWritable)w
            : new BytesWritable(WritableUtils.toByteArray((Writable[])new Writable[]{w}));
    }

    /**
     * Produces an instance of {@code clazz} from a writable.
     *
     * <p>If the writable already has exactly that class it is returned as is.
     * Otherwise it is cast to {@link BytesWritable}, and a new instance of
     * {@code clazz} is populated from its bytes via {@code readFields}.
     *
     * @param clazz    the target Writable class
     * @param writable either an instance of {@code clazz} or a {@code BytesWritable} holding its serialized form
     * @return the writable itself or a newly deserialized instance
     * @throws CrunchRuntimeException if reading the fields fails with an {@code IOException}
     */
    @SuppressWarnings("unchecked")
    private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
        if (clazz.equals(writable.getClass())) {
            return (W)writable;
        }
        Writable instance = WritableFactories.newInstance(clazz);
        BytesWritable bytes = (BytesWritable)writable;
        try {
            // getBytes() exposes the whole backing array, which can be longer than the
            // valid data; restrict the stream to the first getLength() bytes.
            ByteArrayInputStream payload = new ByteArrayInputStream(bytes.getBytes(), 0, bytes.getLength());
            instance.readFields((DataInput)new DataInputStream(payload));
        }
        catch (IOException e) {
            throw new CrunchRuntimeException(e);
        }
        return (W)instance;
    }

    /**
     * Creates the PType for a {@link Pair} whose elements use the given PTypes.
     * Both arguments are cast to {@link WritableType}.
     *
     * @param p1 PType of the first element
     * @param p2 PType of the second element
     * @return a PType serializing pairs as {@link TupleWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
        WritableType first = (WritableType)p1;
        WritableType second = (WritableType)p2;
        TWTupleMapFn fromWritable = new TWTupleMapFn(TupleFactory.PAIR, first, second);
        TupleTWMapFn toWritable = new TupleTWMapFn(p1, p2);
        return new WritableType(Pair.class, TupleWritable.class, fromWritable, toWritable, p1, p2);
    }

    /**
     * Creates the PType for a {@link Tuple3} whose elements use the given PTypes.
     * All arguments are cast to {@link WritableType}.
     *
     * @param p1 PType of the first element
     * @param p2 PType of the second element
     * @param p3 PType of the third element
     * @return a PType serializing triples as {@link TupleWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
        WritableType first = (WritableType)p1;
        WritableType second = (WritableType)p2;
        WritableType third = (WritableType)p3;
        TWTupleMapFn fromWritable = new TWTupleMapFn(TupleFactory.TUPLE3, first, second, third);
        TupleTWMapFn toWritable = new TupleTWMapFn(p1, p2, p3);
        return new WritableType(Tuple3.class, TupleWritable.class, fromWritable, toWritable, p1, p2, p3);
    }

    /**
     * Creates the PType for a {@link Tuple4} whose elements use the given PTypes.
     * All arguments are cast to {@link WritableType}.
     *
     * @param p1 PType of the first element
     * @param p2 PType of the second element
     * @param p3 PType of the third element
     * @param p4 PType of the fourth element
     * @return a PType serializing quads as {@link TupleWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
        WritableType first = (WritableType)p1;
        WritableType second = (WritableType)p2;
        WritableType third = (WritableType)p3;
        WritableType fourth = (WritableType)p4;
        TWTupleMapFn fromWritable = new TWTupleMapFn(TupleFactory.TUPLE4, first, second, third, fourth);
        TupleTWMapFn toWritable = new TupleTWMapFn(p1, p2, p3, p4);
        return new WritableType(Tuple4.class, TupleWritable.class, fromWritable, toWritable, p1, p2, p3, p4);
    }

    /**
     * Creates the PType for a {@link TupleN} with one element per given PType.
     * Every argument is cast to {@link WritableType}.
     *
     * @param ptypes the element PTypes, in tuple order
     * @return a PType serializing the tuple as {@link TupleWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static WritableType<TupleN, TupleWritable> tuples(PType ... ptypes) {
        WritableType[] elementTypes = new WritableType[ptypes.length];
        int slot = 0;
        for (PType ptype : ptypes) {
            elementTypes[slot++] = (WritableType)ptype;
        }
        TWTupleMapFn fromWritable = new TWTupleMapFn(TupleFactory.TUPLEN, elementTypes);
        TupleTWMapFn toWritable = new TupleTWMapFn(ptypes);
        return new WritableType(TupleN.class, TupleWritable.class, fromWritable, toWritable, ptypes);
    }

    /**
     * Creates the PType for a custom {@link Tuple} subclass. A
     * {@link TupleFactory} is created from the class and the type classes of the
     * element PTypes; every PType is also cast to {@link WritableType}.
     *
     * @param clazz  the tuple class to instantiate when reading
     * @param ptypes the element PTypes, in tuple order
     * @return a PType serializing the tuple as {@link TupleWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType ... ptypes) {
        int arity = ptypes.length;
        Class[] elementClasses = new Class[arity];
        WritableType[] elementTypes = new WritableType[arity];
        int slot = 0;
        while (slot < arity) {
            PType ptype = ptypes[slot];
            elementClasses[slot] = ptype.getTypeClass();
            elementTypes[slot] = (WritableType)ptype;
            ++slot;
        }
        TupleFactory<T> tupleFactory = TupleFactory.create(clazz, elementClasses);
        TWTupleMapFn fromWritable = new TWTupleMapFn(tupleFactory, elementTypes);
        TupleTWMapFn toWritable = new TupleTWMapFn(ptypes);
        return new WritableType(clazz, TupleWritable.class, fromWritable, toWritable, ptypes);
    }

    /**
     * Creates the PType for a {@link Union} over the given member PTypes.
     * Every argument is cast to {@link WritableType}.
     *
     * @param ptypes the member PTypes; a union's index selects among them
     * @return a PType serializing unions as {@link UnionWritable}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static PType<Union> unionOf(PType<?> ... ptypes) {
        WritableType[] memberTypes = new WritableType[ptypes.length];
        int slot = 0;
        for (PType<?> ptype : ptypes) {
            memberTypes[slot++] = (WritableType)ptype;
        }
        UWInputFn fromWritable = new UWInputFn(memberTypes);
        UWOutputFn toWritable = new UWOutputFn(ptypes);
        return new WritableType(Union.class, UnionWritable.class, fromWritable, toWritable, ptypes);
    }

    /**
     * Derives a PType for {@code T} from an existing PType for {@code S}.
     * The base type's input function is composed with {@code inputFn}, and
     * {@code outputFn} is composed with the base type's output function; the
     * serialization class and sub-types are taken from the base type.
     *
     * @param clazz    the Java class of the derived type
     * @param inputFn  converts a base value into the derived value
     * @param outputFn converts a derived value back into the base value
     * @param base     the base PType; cast to {@link WritableType}
     * @return the derived PType
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
        WritableType baseType = (WritableType)base;
        CompositeMapFn readChain = new CompositeMapFn(baseType.getInputMapFn(), inputFn);
        CompositeMapFn writeChain = new CompositeMapFn(outputFn, baseType.getOutputMapFn());
        Class serializationClass = baseType.getSerializationClass();
        PType[] subTypes = base.getSubTypes().toArray(new PType[0]);
        return new WritableType(clazz, serializationClass, readChain, writeChain, subTypes);
    }

    /**
     * Same composition as {@code derived}, but the resulting PType is created
     * through {@code WritableType.immutableType}.
     *
     * @param clazz    the Java class of the derived type
     * @param inputFn  converts a base value into the derived value
     * @param outputFn converts a derived value back into the base value
     * @param base     the base PType; cast to {@link WritableType}
     * @return the derived PType
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <S, T> PType<T> derivedImmutable(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
        WritableType baseType = (WritableType)base;
        CompositeMapFn readChain = new CompositeMapFn(baseType.getInputMapFn(), inputFn);
        CompositeMapFn writeChain = new CompositeMapFn(outputFn, baseType.getOutputMapFn());
        Class serializationClass = baseType.getSerializationClass();
        PType[] subTypes = base.getSubTypes().toArray(new PType[0]);
        return WritableType.immutableType(clazz, serializationClass, readChain, writeChain, subTypes);
    }

    /**
     * Creates the PType for a {@link Collection} of elements described by
     * {@code ptype}, serialized as {@link GenericArrayWritable}.
     *
     * @param ptype the element PType; cast to {@link WritableType}
     * @return the collection PType, with {@code ptype} as its only sub-type
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T> WritableType<Collection<T>, GenericArrayWritable> collections(PType<T> ptype) {
        WritableType elementType = (WritableType)ptype;
        ArrayCollectionMapFn fromWritable = new ArrayCollectionMapFn(elementType.getSerializationClass(), elementType.getInputMapFn());
        CollectionArrayMapFn toWritable = new CollectionArrayMapFn(elementType.getOutputMapFn());
        return new WritableType(Collection.class, GenericArrayWritable.class, fromWritable, toWritable, ptype);
    }

    /**
     * Creates the PType for a {@code Map<String, T>} whose values are described
     * by {@code ptype}, serialized as {@link TextMapWritable}.
     *
     * @param ptype the value PType; cast to {@link WritableType}
     * @return the map PType, with {@code ptype} as its only sub-type
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
        WritableType valueType = (WritableType)ptype;
        MapInputMapFn fromWritable = new MapInputMapFn(valueType.getSerializationClass(), valueType.getInputMapFn());
        MapOutputMapFn toWritable = new MapOutputMapFn(valueType.getOutputMapFn());
        return new WritableType(Map.class, TextMapWritable.class, fromWritable, toWritable, ptype);
    }

    /**
     * Creates a PType for {@code clazz} by delegating to
     * {@code PTypes.jsonString} with the {@link WritableTypeFamily} singleton.
     *
     * @param clazz the Java class to describe
     * @return the PType returned by {@code PTypes.jsonString}
     */
    public static <T> PType<T> jsons(Class<T> clazz) {
        return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
    }

    /** Private constructor: this class only exposes static members and is not meant to be instantiated. */
    private Writables() {
    }

    /**
     * Converts a {@code Map<String, T>} into a {@link TextMapWritable}. Each value
     * is first converted by the wrapped function and then stored as a
     * {@link BytesWritable}; each key becomes a {@link Text}. Lifecycle calls are
     * forwarded to the wrapped function.
     */
    private static class MapOutputMapFn<T>
    extends MapFn<Map<String, T>, TextMapWritable> {
        private final MapFn<T, Writable> mapFn;

        public MapOutputMapFn(MapFn<T, Writable> mapFn) {
            this.mapFn = mapFn;
        }

        @Override
        public void configure(Configuration conf) {
            mapFn.configure(conf);
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            mapFn.setContext(context);
        }

        @Override
        public void initialize() {
            mapFn.initialize();
        }

        @Override
        public TextMapWritable map(Map<String, T> input) {
            TextMapWritable result = new TextMapWritable();
            for (Map.Entry<String, T> entry : input.entrySet()) {
                Writable converted = mapFn.map(entry.getValue());
                Text key = new Text(entry.getKey());
                BytesWritable payload = Writables.asBytesWritable(converted);
                result.put(key, payload);
            }
            return result;
        }
    }

    /**
     * Converts a {@link TextMapWritable} back into a {@code Map<String, T>}. Each
     * stored value is turned into an instance of the configured Writable class
     * and then passed through the wrapped function. Lifecycle calls are forwarded
     * to the wrapped function.
     */
    private static class MapInputMapFn<T>
    extends MapFn<TextMapWritable, Map<String, T>> {
        private final Class<Writable> clazz;
        private final MapFn<Writable, T> mapFn;

        public MapInputMapFn(Class<Writable> clazz, MapFn<Writable, T> mapFn) {
            this.clazz = clazz;
            this.mapFn = mapFn;
        }

        @Override
        public void configure(Configuration conf) {
            mapFn.configure(conf);
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            mapFn.setContext(context);
        }

        @Override
        public void initialize() {
            mapFn.initialize();
        }

        @Override
        public Map<String, T> map(TextMapWritable input) {
            Map<String, T> result = Maps.newHashMap();
            for (Map.Entry<Text, BytesWritable> entry : input.entrySet()) {
                Writable decoded = Writables.create(clazz, (Writable)entry.getValue());
                String key = entry.getKey().toString();
                T value = mapFn.map(decoded);
                result.put(key, value);
            }
            return result;
        }
    }

    /**
     * Converts a {@link Collection} into a {@link GenericArrayWritable}. Each
     * element is converted by the wrapped function and stored as a
     * {@link BytesWritable}, in iteration order. Lifecycle calls are forwarded to
     * the wrapped function.
     */
    private static class CollectionArrayMapFn<T>
    extends MapFn<Collection<T>, GenericArrayWritable> {
        private final MapFn<T, Object> mapFn;

        public CollectionArrayMapFn(MapFn<T, Object> mapFn) {
            this.mapFn = mapFn;
        }

        @Override
        public void configure(Configuration conf) {
            mapFn.configure(conf);
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            mapFn.setContext(context);
        }

        @Override
        public void initialize() {
            mapFn.initialize();
        }

        @Override
        public GenericArrayWritable map(Collection<T> input) {
            GenericArrayWritable result = new GenericArrayWritable();
            BytesWritable[] encoded = new BytesWritable[input.size()];
            int next = 0;
            for (T element : input) {
                Writable converted = (Writable)mapFn.map(element);
                encoded[next] = Writables.asBytesWritable(converted);
                ++next;
            }
            result.set(encoded);
            return result;
        }
    }

    /**
     * Converts a {@link GenericArrayWritable} back into a {@link Collection}.
     * Each stored element is turned into an instance of the configured Writable
     * class and then passed through the wrapped function; results are collected
     * in array order. Lifecycle calls are forwarded to the wrapped function.
     */
    private static class ArrayCollectionMapFn<T>
    extends MapFn<GenericArrayWritable, Collection<T>> {
        private Class<Writable> clazz;
        private final MapFn<Object, T> mapFn;

        public ArrayCollectionMapFn(Class<Writable> clazz, MapFn<Object, T> mapFn) {
            this.clazz = clazz;
            this.mapFn = mapFn;
        }

        @Override
        public void configure(Configuration conf) {
            mapFn.configure(conf);
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            mapFn.setContext(context);
        }

        @Override
        public void initialize() {
            mapFn.initialize();
        }

        @Override
        public Collection<T> map(GenericArrayWritable input) {
            List<T> result = Lists.newArrayList();
            for (BytesWritable raw : input.get()) {
                Writable decoded = Writables.create(clazz, (Writable)raw);
                T element = mapFn.map(decoded);
                result.add(element);
            }
            return result;
        }
    }

    /**
     * Converts a {@link Union} into a {@link UnionWritable}. The union's index
     * selects the output function to apply to its value; the result is stored as
     * a {@link BytesWritable} next to the index. Lifecycle calls are forwarded to
     * every member function.
     */
    private static class UWOutputFn
    extends MapFn<Union, UnionWritable> {
        private final List<MapFn> fns = Lists.newArrayList();

        public UWOutputFn(PType<?> ... ptypes) {
            for (int i = 0; i < ptypes.length; ++i) {
                fns.add(ptypes[i].getOutputMapFn());
            }
        }

        @Override
        public void configure(Configuration conf) {
            for (MapFn memberFn : fns) {
                memberFn.configure(conf);
            }
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            for (MapFn memberFn : fns) {
                memberFn.setContext(context);
            }
        }

        @Override
        public void initialize() {
            for (MapFn memberFn : fns) {
                memberFn.initialize();
            }
        }

        @Override
        public UnionWritable map(Union input) {
            int memberIndex = input.getIndex();
            MapFn memberFn = fns.get(memberIndex);
            Writable converted = (Writable)memberFn.map(input.getValue());
            BytesWritable payload = Writables.asBytesWritable(converted);
            return new UnionWritable(memberIndex, payload);
        }
    }

    /**
     * Converts a {@link UnionWritable} back into a {@link Union}. The stored
     * index selects both the Writable class to materialize and the input
     * function to apply. Lifecycle calls are forwarded to every member function.
     */
    private static class UWInputFn
    extends MapFn<UnionWritable, Union> {
        private final List<MapFn> fns = Lists.newArrayList();
        private final List<Class<Writable>> writableClasses = Lists.newArrayList();

        @SuppressWarnings({"unchecked", "rawtypes"})
        public UWInputFn(WritableType<?, ?> ... ptypes) {
            for (int i = 0; i < ptypes.length; ++i) {
                WritableType<?, ?> member = ptypes[i];
                fns.add(member.getInputMapFn());
                writableClasses.add((Class)member.getSerializationClass());
            }
        }

        @Override
        public void configure(Configuration conf) {
            for (MapFn memberFn : fns) {
                memberFn.configure(conf);
            }
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            for (MapFn memberFn : fns) {
                memberFn.setContext(context);
            }
        }

        @Override
        public void initialize() {
            for (MapFn memberFn : fns) {
                memberFn.initialize();
            }
        }

        @Override
        public Union map(UnionWritable in) {
            int memberIndex = in.getIndex();
            Class<Writable> memberClass = writableClasses.get(memberIndex);
            Writable decoded = Writables.create(memberClass, (Writable)in.getValue());
            Object value = fns.get(memberIndex).map(decoded);
            return new Union(memberIndex, value);
        }
    }

    /**
     * Converts a {@link Tuple} into a {@link TupleWritable}.
     *
     * <p>Each non-null element is converted by its output function. If the
     * resulting Writable's class has a code in {@code WRITABLE_CODES}, the
     * Writable is stored as is together with that code; otherwise it is stored
     * as a {@link BytesWritable} with code 1. Null elements leave a null value
     * and code 0. The scratch arrays are transient and are re-created in
     * {@code initialize()}; they are reused across {@code map} calls.
     */
    private static class TupleTWMapFn
    extends MapFn<Tuple, TupleWritable> {
        private final List<MapFn> fns = Lists.newArrayList();
        private transient int[] written;
        private transient Writable[] values;

        public TupleTWMapFn(PType<?> ... ptypes) {
            for (int i = 0; i < ptypes.length; ++i) {
                fns.add(ptypes[i].getOutputMapFn());
            }
            int arity = fns.size();
            this.written = new int[arity];
            this.values = new Writable[arity];
        }

        @Override
        public void configure(Configuration conf) {
            // Publish the code registry first, then configure the element functions.
            try {
                Writables.serializeWritableComparableCodes(conf);
            }
            catch (IOException e) {
                throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
            }
            for (MapFn elementFn : fns) {
                elementFn.configure(conf);
            }
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            for (MapFn elementFn : fns) {
                elementFn.setContext(context);
            }
        }

        @Override
        public void initialize() {
            for (MapFn elementFn : fns) {
                elementFn.initialize();
            }
            int arity = fns.size();
            this.written = new int[arity];
            this.values = new Writable[arity];
        }

        @Override
        public TupleWritable map(Tuple input) {
            Arrays.fill(written, 0);
            Arrays.fill(values, null);
            for (int i = 0; i < input.size(); ++i) {
                Object element = input.get(i);
                if (element != null) {
                    Writable converted = (Writable)fns.get(i).map(element);
                    if (WRITABLE_CODES.inverse().containsKey(converted.getClass())) {
                        // Registered class: keep the Writable itself and record its code.
                        values[i] = converted;
                        written[i] = (Integer)WRITABLE_CODES.inverse().get(converted.getClass());
                    } else {
                        // Unregistered class: fall back to raw bytes under code 1.
                        values[i] = Writables.asBytesWritable(converted);
                        written[i] = 1;
                    }
                }
            }
            return new TupleWritable(values, written);
        }
    }

    /**
     * Converts a {@link TupleWritable} back into a {@link Tuple} built by the
     * configured {@link TupleFactory}.
     *
     * <p>For each position that is present in the writable, the stored value is
     * turned into an instance of that position's serialization class and passed
     * through the position's input function; absent positions become
     * {@code null}. The value array is transient, created in
     * {@code initialize()}, and reused across {@code map} calls.
     */
    private static class TWTupleMapFn
    extends MapFn<TupleWritable, Tuple> {
        private final TupleFactory<?> tupleFactory;
        private final List<MapFn> fns;
        private final List<Class<Writable>> writableClasses;
        private transient Object[] values;

        @SuppressWarnings({"unchecked", "rawtypes"})
        public TWTupleMapFn(TupleFactory<?> tupleFactory, WritableType<?, ?> ... ptypes) {
            this.tupleFactory = tupleFactory;
            this.fns = Lists.newArrayList();
            this.writableClasses = Lists.newArrayList();
            for (int i = 0; i < ptypes.length; ++i) {
                WritableType<?, ?> elementType = ptypes[i];
                fns.add(elementType.getInputMapFn());
                Class<?> clazz = elementType.getSerializationClass();
                boolean comparable = WritableComparable.class.isAssignableFrom(clazz);
                if (comparable && !WRITABLE_CODES.inverse().containsKey(clazz)) {
                    // Only a warning: construction continues with the unregistered class.
                    LOG.warn((Object)String.format("WritableComparable class %s in tuple type should be registered with Writables.registerComparable", clazz.toString()));
                }
                writableClasses.add((Class)clazz);
            }
        }

        @Override
        public void configure(Configuration conf) {
            // Publish the code registry first, then configure the element functions.
            try {
                Writables.serializeWritableComparableCodes(conf);
            }
            catch (IOException e) {
                throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
            }
            for (MapFn elementFn : fns) {
                elementFn.configure(conf);
            }
        }

        @Override
        public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
            for (MapFn elementFn : fns) {
                elementFn.setContext(context);
            }
        }

        @Override
        public void initialize() {
            for (MapFn elementFn : fns) {
                elementFn.initialize();
            }
            this.values = new Object[fns.size()];
            tupleFactory.initialize();
        }

        @Override
        public Tuple map(TupleWritable in) {
            for (int i = 0; i < values.length; ++i) {
                if (!in.has(i)) {
                    values[i] = null;
                } else {
                    Writable decoded = Writables.create(writableClasses.get(i), in.get(i));
                    values[i] = fns.get(i).map(decoded);
                }
            }
            return tupleFactory.makeTuple(values);
        }
    }
}

