/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.lib.join;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.Collection;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
import org.apache.crunch.lib.join.JoinStrategy;
import org.apache.crunch.lib.join.JoinType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;

public class MapsideJoinStrategy<K, U, V>
implements JoinStrategy<K, U, V> {
    private boolean materialize;

    @Deprecated
    public MapsideJoinStrategy() {
        this(true);
    }

    @Deprecated
    public MapsideJoinStrategy(boolean materialize) {
        this.materialize = materialize;
    }

    public static <K, U, V> MapsideJoinStrategy<K, U, V> create() {
        return MapsideJoinStrategy.create(true);
    }

    public static <K, U, V> MapsideJoinStrategy<K, U, V> create(boolean materialize) {
        return new LoadLeftSideMapsideJoinStrategy(materialize);
    }

    @Override
    public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
        switch (joinType) {
            case INNER_JOIN: {
                return this.joinInternal(left, right, false);
            }
            case LEFT_OUTER_JOIN: {
                return this.joinInternal(left, right, true);
            }
        }
        throw new UnsupportedOperationException("Join type " + (Object)((Object)joinType) + " not supported by MapsideJoinStrategy");
    }

    private PTable<K, Pair<U, V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) {
        PTypeFamily tf = left.getTypeFamily();
        ReadableData rightReadable = right.asReadable(this.materialize);
        MapsideJoinDoFn mapJoinDoFn = new MapsideJoinDoFn(rightReadable, right.getPTableType(), includeUnmatchedLeftValues);
        ParallelDoOptions options = ParallelDoOptions.builder().sourceTargets(rightReadable.getSourceTargets()).build();
        return left.parallelDo("mapjoin", mapJoinDoFn, (PTableType<K, U>)tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())), options);
    }

    private static class ReversePairOrderFn<V, U>
    extends MapFn<Pair<V, U>, Pair<U, V>> {
        private ReversePairOrderFn() {
        }

        @Override
        public Pair<U, V> map(Pair<V, U> input) {
            return Pair.of(input.second(), input.first());
        }
    }

    private static class LoadLeftSideMapsideJoinStrategy<K, U, V>
    extends MapsideJoinStrategy<K, U, V> {
        private MapsideJoinStrategy<K, V, U> mapsideJoinStrategy;

        public LoadLeftSideMapsideJoinStrategy(boolean materialize) {
            this.mapsideJoinStrategy = new MapsideJoinStrategy(materialize);
        }

        @Override
        public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
            JoinType reversedJoinType;
            switch (joinType) {
                case INNER_JOIN: {
                    reversedJoinType = JoinType.INNER_JOIN;
                    break;
                }
                case RIGHT_OUTER_JOIN: {
                    reversedJoinType = JoinType.LEFT_OUTER_JOIN;
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Join type " + (Object)((Object)joinType) + " is not supported");
                }
            }
            return this.mapsideJoinStrategy.join(right, left, reversedJoinType).mapValues("Reverse order out output table values", new ReversePairOrderFn(), left.getTypeFamily().pairs(left.getValueType(), right.getValueType()));
        }
    }

    static class MapsideJoinDoFn<K, U, V>
    extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
        private final ReadableData<Pair<K, V>> readable;
        private final PTableType<K, V> tableType;
        private final boolean includeUnmatched;
        private Multimap<K, V> joinMap;

        public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, PTableType<K, V> tableType, boolean includeUnmatched) {
            this.readable = rs;
            this.tableType = tableType;
            this.includeUnmatched = includeUnmatched;
        }

        @Override
        public void configure(Configuration conf) {
            this.readable.configure(conf);
        }

        @Override
        public void initialize() {
            super.initialize();
            this.tableType.initialize(this.getConfiguration());
            this.joinMap = ArrayListMultimap.create();
            try {
                for (Pair<K, V> joinPair : this.readable.read(this.getContext())) {
                    Pair<K, V> detachedPair = this.tableType.getDetachedValue(joinPair);
                    this.joinMap.put(detachedPair.first(), detachedPair.second());
                }
            }
            catch (IOException e) {
                throw new CrunchRuntimeException("Error reading map-side join data", e);
            }
        }

        @Override
        public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
            K key = input.first();
            U value = input.second();
            Collection joinValues = this.joinMap.get(key);
            if (this.includeUnmatched && joinValues.isEmpty()) {
                emitter.emit(Pair.of(key, Pair.of(value, null)));
            } else {
                for (Object joinValue : joinValues) {
                    Pair valuePair = Pair.of(value, joinValue);
                    emitter.emit(Pair.of(key, valuePair));
                }
            }
        }
    }
}

