/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib.join;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
import org.apache.crunch.lib.join.DefaultJoinStrategy;
import org.apache.crunch.lib.join.JoinStrategy;
import org.apache.crunch.lib.join.JoinType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Filter;
import org.apache.hadoop.util.bloom.Key;

/**
 * A {@link JoinStrategy} that pre-filters the right side of a join with Bloom filters built from
 * the left side's keys, and then hands the left side and the filtered right side to a delegate
 * {@link JoinStrategy} that performs the actual join.
 *
 * <p>The pipeline assembled by {@link #join} is:
 * <ol>
 *   <li>every instance of the filter-building DoFn adds the left-side keys it sees to its own
 *       Bloom filter and emits that filter from {@code cleanup};</li>
 *   <li>a filtering pass over the right side ORs all emitted filters into one and keeps only the
 *       records whose key passes the membership test;</li>
 *   <li>the delegate strategy joins the left side with the filtered right side.</li>
 * </ol>
 *
 * <p>A Bloom filter can report false positives but not false negatives. The filtering step
 * therefore only removes right-side records whose key is certainly absent from the left side.
 * Such records can never contribute to an inner or left outer join, which is why only
 * {@link JoinType#INNER_JOIN} and {@link JoinType#LEFT_OUTER_JOIN} are accepted.
 *
 * <p>Keys on both sides are serialized with the <em>left</em> side's key {@link PType}. This
 * ensures the bytes hashed when building the filter and when probing it are produced the same
 * way.
 *
 * @param <K> join key type
 * @param <U> left-side value type
 * @param <V> right-side value type
 */
public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {

    private static final Log LOG = LogFactory.getLog(BloomFilterJoinStrategy.class);

    /**
     * Hadoop hash-function id used for every {@link BloomFilter} created by this class. The
     * per-task filters and the filter they are OR-ed into must be configured identically (size,
     * hash count, hash type), so the value lives in one place.
     * NOTE(review): 1 is presumably {@code org.apache.hadoop.util.hash.Hash.MURMUR_HASH}; confirm.
     */
    private static final int HASH_TYPE = 1;

    /** Number of bits in each Bloom filter. */
    private final int vectorSize;
    /** Number of hash functions applied per key. */
    private final int nbHash;
    /** Strategy that performs the join once the right side has been filtered. */
    private final JoinStrategy<K, U, V> delegateJoinStrategy;

    /**
     * Creates a strategy with a 5% target false-positive rate that delegates to
     * {@link DefaultJoinStrategy}.
     *
     * @param numElements approximate number of distinct left-side keys; used to size the filter
     * @throws IllegalArgumentException if {@code numElements} is not positive
     */
    public BloomFilterJoinStrategy(int numElements) {
        this(numElements, 0.05f);
    }

    /**
     * Creates a strategy that delegates to {@link DefaultJoinStrategy}.
     *
     * @param numElements approximate number of distinct left-side keys; used to size the filter
     * @param falsePositiveRate target false-positive rate, strictly between 0 and 1
     * @throws IllegalArgumentException if {@code numElements} is not positive or
     *     {@code falsePositiveRate} is not in the open interval (0, 1)
     */
    public BloomFilterJoinStrategy(int numElements, float falsePositiveRate) {
        this(numElements, falsePositiveRate, new DefaultJoinStrategy<K, U, V>());
    }

    /**
     * Creates a strategy that filters with a Bloom filter sized for {@code numElements} keys at
     * {@code falsePositiveRate}, then joins with {@code delegateJoinStrategy}.
     *
     * @param numElements approximate number of distinct left-side keys; used to size the filter
     * @param falsePositiveRate target false-positive rate, strictly between 0 and 1
     * @param delegateJoinStrategy strategy used to join the left side with the filtered right side
     * @throws IllegalArgumentException if {@code numElements} is not positive or
     *     {@code falsePositiveRate} is not in the open interval (0, 1)
     */
    public BloomFilterJoinStrategy(int numElements, float falsePositiveRate,
            JoinStrategy<K, U, V> delegateJoinStrategy) {
        if (numElements <= 0) {
            throw new IllegalArgumentException("numElements must be positive, got " + numElements);
        }
        // Written as a negated conjunction so NaN is rejected too.
        if (!(falsePositiveRate > 0.0f && falsePositiveRate < 1.0f)) {
            throw new IllegalArgumentException(
                    "falsePositiveRate must be in (0, 1), got " + falsePositiveRate);
        }
        this.vectorSize = getOptimalVectorSize(numElements, falsePositiveRate);
        this.nbHash = getOptimalNumHash(numElements, this.vectorSize);
        this.delegateJoinStrategy = delegateJoinStrategy;
    }

    /**
     * Computes the Bloom filter size {@code m = -n * ln(p) / (ln 2)^2} in bits.
     *
     * <p>The result is clamped to at least 1. With a rate close to 1 the formula can truncate to
     * 0, and Hadoop cannot build a zero-bit filter.
     */
    private static int getOptimalVectorSize(int numElements, float falsePositiveRate) {
        double bits = numElements * -Math.log(falsePositiveRate) / Math.pow(Math.log(2.0), 2.0);
        return Math.max(1, (int) bits);
    }

    /**
     * Computes the number of hash functions {@code k = round(m / n * ln 2)}.
     *
     * <p>The result is clamped to at least 1. For high false-positive rates the rounded value is
     * 0, which Hadoop rejects when the filter is constructed inside a task.
     */
    private static int getOptimalNumHash(int numElements, int vectorSize) {
        long hashes = Math.round(vectorSize * Math.log(2.0) / numElements);
        return (int) Math.max(1L, hashes);
    }

    /**
     * Joins {@code left} and {@code right} after removing right-side records whose keys are not
     * in the Bloom filter built from the left-side keys.
     *
     * @throws IllegalStateException if {@code joinType} is neither inner nor left outer
     */
    @Override
    public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
        if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) {
            throw new IllegalStateException(
                    "JoinType " + joinType + " is not supported for BloomFilter joins");
        }

        PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily());
        PCollection<BloomFilter> bloomFilters = left.keys().parallelDo(
                "Create bloom filters",
                new CreateBloomFilterFn<K>(vectorSize, nbHash, left.getKeyType()),
                bloomFilterType);
        ReadableData<BloomFilter> bloomData = bloomFilters.asReadable(true);

        // The left key type is deliberately reused here so probe bytes match build bytes.
        FilterKeysWithBloomFilterFn<K, V> filterKeysFn =
                new FilterKeysWithBloomFilterFn<K, V>(bloomData, vectorSize, nbHash, left.getKeyType());

        // Declare the bloom filter data as an input of the filtering step; presumably this lets
        // the planner materialize the filters before the right side is processed.
        ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
        optionsBuilder.sourceTargets(bloomData.getSourceTargets());

        PTable<K, V> filteredRightSide = right.parallelDo(
                "Filter right side with BloomFilters",
                filterKeysFn,
                right.getPTableType(),
                optionsBuilder.build());
        return delegateJoinStrategy.join(left, filteredRightSide, joinType);
    }

    /**
     * Returns a function that serializes keys of {@code ptype} to bytes for hashing.
     *
     * @throws IllegalStateException if {@code ptype} is neither an Avro nor a Writable type
     */
    private static <K> MapFn<K, byte[]> getKeyToBytesMapFn(PType<K> ptype, Configuration conf) {
        if (ptype instanceof AvroType) {
            return new AvroToBytesFn<K>((AvroType<K>) ptype, conf);
        }
        if (ptype instanceof WritableType) {
            return new WritableToBytesFn<K>((WritableType<K, ?>) ptype, conf);
        }
        throw new IllegalStateException("Unrecognized PType: " + ptype);
    }

    /**
     * Returns a PType for {@link BloomFilter} (a Hadoop Writable) in the given type family.
     *
     * @throws IllegalStateException if the family is neither Avro nor Writable
     */
    private static PType<BloomFilter> getBloomFilterType(PTypeFamily typeFamily) {
        if (typeFamily.equals(AvroTypeFamily.getInstance())) {
            return Avros.writables(BloomFilter.class);
        }
        if (typeFamily.equals(WritableTypeFamily.getInstance())) {
            return Writables.writables(BloomFilter.class);
        }
        throw new IllegalStateException("Unrecognized PTypeFamily: " + typeFamily);
    }

    /** Serializes a value to Avro binary using the datum writer chosen for its AvroType. */
    private static class AvroToBytesFn<T> extends MapFn<T, byte[]> {
        private final AvroType<T> ptype;
        private final DatumWriter<Object> datumWriter;
        /** Reused across calls via {@link EncoderFactory#binaryEncoder}. */
        private BinaryEncoder encoder;

        AvroToBytesFn(AvroType<T> ptype, Configuration conf) {
            this.ptype = ptype;
            this.datumWriter = AvroMode.fromType(ptype)
                    .withFactoryFromConfiguration(conf)
                    .getWriter(ptype.getSchema());
        }

        @Override
        public byte[] map(T input) {
            Object datum = ptype.getOutputMapFn().map(input);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            encoder = EncoderFactory.get().binaryEncoder(out, encoder);
            try {
                datumWriter.write(datum, encoder);
                encoder.flush();
            } catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
            return out.toByteArray();
        }
    }

    /** Serializes a value to bytes by writing its Writable form into a reused buffer. */
    private static class WritableToBytesFn<T> extends MapFn<T, byte[]> {
        private final WritableType<T, ?> ptype;
        private final DataOutputBuffer dataOutputBuffer;

        WritableToBytesFn(WritableType<T, ?> ptype, Configuration conf) {
            this.ptype = ptype;
            this.dataOutputBuffer = new DataOutputBuffer();
        }

        @Override
        public byte[] map(T input) {
            dataOutputBuffer.reset();
            Writable writable = (Writable) ptype.getOutputMapFn().map(input);
            try {
                writable.write(dataOutputBuffer);
            } catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
            // The buffer's backing array may be larger than the data; copy only the valid prefix.
            int length = dataOutputBuffer.getLength();
            byte[] output = new byte[length];
            System.arraycopy(dataOutputBuffer.getData(), 0, output, 0, length);
            return output;
        }
    }

    /**
     * Keeps only the pairs whose key passes the membership test of the OR-combination of all
     * Bloom filters read from {@code bloomData}.
     */
    private static class FilterKeysWithBloomFilterFn<K, V> extends FilterFn<Pair<K, V>> {
        private final int vectorSize;
        private final int nbHash;
        private final PType<K> keyType;
        private final ReadableData<BloomFilter> bloomData;
        private transient BloomFilter bloomFilter;
        private transient MapFn<K, byte[]> keyToBytesFn;

        FilterKeysWithBloomFilterFn(ReadableData<BloomFilter> bloomData, int vectorSize, int nbHash,
                PType<K> keyType) {
            this.bloomData = bloomData;
            this.vectorSize = vectorSize;
            this.nbHash = nbHash;
            this.keyType = keyType;
        }

        @Override
        public void configure(Configuration conf) {
            bloomData.configure(conf);
        }

        @Override
        public void initialize() {
            super.initialize();
            keyType.initialize(getConfiguration());
            keyToBytesFn = getKeyToBytesMapFn(keyType, getConfiguration());

            Iterable<BloomFilter> subFilters;
            try {
                subFilters = bloomData.read(getContext());
            } catch (IOException e) {
                throw new CrunchRuntimeException("Error reading bloom filters for BloomFilter join: ", e);
            }

            // Must match the parameters used in CreateBloomFilterFn for or() to be meaningful.
            bloomFilter = new BloomFilter(vectorSize, nbHash, HASH_TYPE);
            for (BloomFilter subFilter : subFilters) {
                bloomFilter.or(subFilter);
            }
        }

        @Override
        public boolean accept(Pair<K, V> input) {
            return bloomFilter.membershipTest(new Key(keyToBytesFn.map(input.first())));
        }
    }

    /**
     * Adds every key it processes to a single Bloom filter and emits that filter once, from
     * {@code cleanup}.
     */
    private static class CreateBloomFilterFn<K> extends DoFn<K, BloomFilter> {
        private final int vectorSize;
        private final int nbHash;
        private final PType<K> ptype;
        private transient BloomFilter bloomFilter;
        private transient MapFn<K, byte[]> keyToBytesFn;

        CreateBloomFilterFn(int vectorSize, int nbHash, PType<K> ptype) {
            this.vectorSize = vectorSize;
            this.nbHash = nbHash;
            this.ptype = ptype;
        }

        @Override
        public void initialize() {
            super.initialize();
            bloomFilter = new BloomFilter(vectorSize, nbHash, HASH_TYPE);
            ptype.initialize(getConfiguration());
            keyToBytesFn = getKeyToBytesMapFn(ptype, getConfiguration());
        }

        @Override
        public void process(K input, Emitter<BloomFilter> emitter) {
            bloomFilter.add(new Key(keyToBytesFn.map(input)));
        }

        @Override
        public void cleanup(Emitter<BloomFilter> emitter) {
            emitter.emit(bloomFilter);
        }
    }
}

