/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.impl.mr.run;

import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
import org.apache.crunch.impl.mr.emit.OutputEmitter;
import org.apache.crunch.impl.mr.run.CrunchTaskContext;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;

/**
 * Runtime node that wraps a single {@link DoFn} and hands the values it emits
 * either to an output emitter or to a list of child {@code RTNode}s.
 *
 * <p>The {@code emitter} field is {@code transient} and is not set by the
 * constructor; it stays {@code null} until {@link #initialize(CrunchTaskContext)}
 * has run.
 */
public class RTNode
implements Serializable {
    private static final Log LOG = LogFactory.getLog(RTNode.class);
    private final String nodeName;
    private DoFn<Object, Object> fn;
    private PType<Object> outputPType;
    private final List<RTNode> children;
    private final Converter inputConverter;
    private final Converter outputConverter;
    private final String outputName;
    // Built in initialize(); transient, so it is not part of the serialized form.
    private transient Emitter<Object> emitter;

    /**
     * Stores the supplied collaborators; no other work is done here.
     *
     * @param fn              function invoked for every processed input
     * @param outputPType     type handed to the intermediate emitter
     * @param name            node name, used in log and error messages
     * @param children        nodes that receive this node's intermediate output
     * @param inputConverter  converter applied to key/value inputs before processing
     * @param outputConverter converter for emitted output; when {@code null} the node
     *                        emits to its children instead
     * @param outputName      named output to write to; when {@code null} the plain
     *                        output emitter is used
     */
    public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children, Converter inputConverter, Converter outputConverter, String outputName) {
        this.fn = fn;
        this.outputPType = outputPType;
        this.nodeName = name;
        this.children = children;
        this.inputConverter = inputConverter;
        this.outputConverter = outputConverter;
        this.outputName = outputName;
    }

    /**
     * Prepares the function, initializes every child, then creates this node's emitter.
     * Calling it again once an emitter exists is a no-op.
     *
     * @param ctxt task context supplying the Hadoop context and the multiple-outputs handle
     * @throws CrunchRuntimeException if the node has neither an output converter nor children
     */
    public void initialize(CrunchTaskContext ctxt) {
        if (emitter != null) {
            // An emitter already exists, so this node was initialized earlier.
            return;
        }

        fn.setContext(ctxt.getContext());
        fn.initialize();
        for (RTNode kid : children) {
            kid.initialize(ctxt);
        }

        if (outputConverter == null) {
            // No output converter: the only valid target is the list of children.
            if (children.isEmpty()) {
                throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
            }
            Configuration conf = ctxt.getContext().getConfiguration();
            boolean disabledByConf = conf.getBoolean("crunch.disable.deep.copy", false);
            // The function is only consulted when the configuration flag is not set.
            boolean skipDeepCopy = disabledByConf || fn.disableDeepCopy();
            emitter = new IntermediateEmitter(outputPType, children, conf, skipDeepCopy);
            return;
        }

        if (outputName == null) {
            emitter = new OutputEmitter<Object, Object, Object>(outputConverter, ctxt.getContext());
        } else {
            emitter = new MultipleOutputEmitter<Object, Object, Object>(outputConverter, ctxt.getMultipleOutputs(), outputName);
        }
    }

    /**
     * @return {@code true} when an output converter is set and there are no children
     */
    public boolean isLeafNode() {
        if (outputConverter == null) {
            return false;
        }
        return children.isEmpty();
    }

    /**
     * Runs the function on one input, passing it this node's emitter.
     *
     * @param input value handed to the function
     * @throws CrunchRuntimeException rethrown from the function; it is logged here first
     *                                unless it was already marked as logged
     */
    public void process(Object input) {
        try {
            fn.process(input, emitter);
        }
        catch (CrunchRuntimeException failure) {
            if (failure.wasLogged()) {
                throw failure;
            }
            String message = String.format("Crunch exception in '%s' for input: %s", nodeName, input);
            LOG.info(message, failure);
            // Mark it so the same exception is not logged again further up.
            failure.markLogged();
            throw failure;
        }
    }

    /**
     * Converts a key/value pair with the input converter and processes the result.
     *
     * @param key   input key
     * @param value input value
     */
    public void process(Object key, Object value) {
        Object converted = inputConverter.convertInput(key, value);
        process(converted);
    }

    /**
     * Converts a key and its values with the input converter and processes the result.
     *
     * @param key    input key
     * @param values values associated with the key
     */
    public void processIterable(Object key, Iterable values) {
        Object converted = inputConverter.convertIterableInput(key, values);
        process(converted);
    }

    /**
     * Cleans up the function, flushes this node's emitter, then cleans up every child.
     */
    public void cleanup() {
        fn.cleanup(emitter);
        emitter.flush();
        for (RTNode kid : children) {
            kid.cleanup();
        }
    }

    /**
     * @return a description listing the node name, function, children, converters and output name
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RTNode [nodeName=");
        text.append(nodeName);
        text.append(", fn=").append(fn);
        text.append(", children=").append(children);
        text.append(", inputConverter=").append(inputConverter);
        text.append(", outputConverter=").append(outputConverter);
        text.append(", outputName=").append(outputName);
        text.append("]");
        return text.toString();
    }
}

