/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.impl.mr;

import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.MRPipelineExecution;
import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
import org.apache.crunch.impl.mr.exec.MRExecutor;
import org.apache.crunch.impl.mr.plan.MSCRPlanner;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * A {@link DistributedPipeline} whose work is planned by {@link MSCRPlanner} and executed
 * through an {@link MRExecutor}.
 *
 * <p>The pipeline is bound to a {@code jarClass}, which is handed to the planner together with
 * the pipeline's {@link Configuration} whenever a plan is built.
 */
public class MRPipeline
extends DistributedPipeline {
    private static final Log LOG = LogFactory.getLog(MRPipeline.class);

    /** Upper bound on the number of (URL-encoded) pipeline-name characters used in dot file names. */
    private static final int MAX_PIPE_NAME_LENGTH = 150;

    private final Class<?> jarClass;

    /**
     * Creates a pipeline named after {@code jarClass} with a fresh {@link Configuration}.
     *
     * @param jarClass class passed to the planner when a plan is built
     */
    public MRPipeline(Class<?> jarClass) {
        this(jarClass, new Configuration());
    }

    /**
     * Creates a pipeline with the given name and a fresh {@link Configuration}.
     *
     * @param jarClass class passed to the planner when a plan is built
     * @param name     name of the pipeline
     */
    public MRPipeline(Class<?> jarClass, String name) {
        this(jarClass, name, new Configuration());
    }

    /**
     * Creates a pipeline named after {@code jarClass} (its fully qualified class name).
     *
     * @param jarClass class passed to the planner when a plan is built
     * @param conf     configuration used by this pipeline
     */
    public MRPipeline(Class<?> jarClass, Configuration conf) {
        this(jarClass, jarClass.getName(), conf);
    }

    /**
     * Creates a pipeline with the given name and configuration.
     *
     * @param jarClass class passed to the planner when a plan is built
     * @param name     name of the pipeline
     * @param conf     configuration used by this pipeline
     */
    public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
        super(name, conf, new MRCollectionFactory());
        this.jarClass = jarClass;
    }

    /**
     * Builds an executor for the currently registered output targets.
     *
     * <p>Every output collection that also has a pending materialization is moved out of
     * {@code outputTargetsToMaterialize} and handed to the planner.
     *
     * @return the executor produced by the planner
     * @throws CrunchRuntimeException if planning fails with an {@link IOException}
     */
    public MRExecutor plan() {
        // NOTE(review): raw types are kept because the generic signatures of the inherited
        // fields and of MSCRPlanner are not visible from this file - tighten once confirmed.
        HashMap toMaterialize = Maps.newHashMap();
        for (PCollectionImpl c : this.outputTargets.keySet()) {
            if (this.outputTargetsToMaterialize.containsKey(c)) {
                toMaterialize.put(c, this.outputTargetsToMaterialize.get(c));
                this.outputTargetsToMaterialize.remove(c);
            }
        }
        MSCRPlanner planner = new MSCRPlanner(this, this.outputTargets, toMaterialize, this.allPipelineCallables);
        try {
            return planner.plan(this.jarClass, this.getConfiguration());
        }
        catch (IOException e) {
            throw new CrunchRuntimeException(e);
        }
    }

    /**
     * Runs the pipeline and blocks until the execution is done.
     *
     * @return the result of the execution, or {@link PipelineResult#EMPTY} if the calling
     *         thread was interrupted while waiting (the interrupt status is restored)
     */
    @Override
    public PipelineResult run() {
        try {
            MRPipelineExecution pipelineExecution = this.runAsync();
            pipelineExecution.waitUntilDone();
            return pipelineExecution.getResult();
        }
        catch (InterruptedException e) {
            LOG.error("Exception running pipeline", e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return PipelineResult.EMPTY;
        }
    }

    /**
     * Plans the pipeline, optionally writes the plan's dot file, starts the execution and
     * clears the registered output targets.
     *
     * @return a handle on the started execution
     */
    @Override
    public MRPipelineExecution runAsync() {
        MRExecutor mrExecutor = this.plan();
        this.writePlanDotFile(mrExecutor.getPlanDotFile());
        MRPipelineExecution res = mrExecutor.execute();
        this.outputTargets.clear();
        return res;
    }

    /**
     * Marks {@code pcollection} as a breakpoint and returns an iterable backed by its
     * materialization source.
     *
     * <p>The iterable is registered in {@code outputTargetsToMaterialize} only if the
     * collection has no entry there yet; a new iterable is returned on every call.
     *
     * @param pcollection the collection to materialize
     * @return an iterable over the materialized collection
     */
    @Override
    public <T> Iterable<T> materialize(PCollection<T> pcollection) {
        PCollectionImpl impl = (PCollectionImpl) pcollection;
        impl.setBreakpoint();
        ReadableSource<T> readableSrc = this.getMaterializeSourceTarget(pcollection);
        MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc);
        if (!this.outputTargetsToMaterialize.containsKey(pcollection)) {
            this.outputTargetsToMaterialize.put(impl, c);
        }
        return c;
    }

    /**
     * Caches {@code pcollection} by materializing it; {@code options} is not consulted.
     *
     * @param pcollection the collection to cache
     * @param options     caching options (ignored by this implementation)
     */
    @Override
    public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
        this.materialize(pcollection);
    }

    /**
     * Writes the job plan in dot format to the directory configured under
     * {@code crunch.planner.dotfile.outputdir}; does nothing when that property is unset.
     *
     * <p>The file name is the URL-encoded pipeline name (truncated to
     * {@link #MAX_PIPE_NAME_LENGTH} characters) followed by a timestamp and
     * {@code _jobplan.dot}.
     *
     * @param dotFileContents the dot representation of the plan
     * @throws CrunchRuntimeException if the configured directory is not a valid URI, or if
     *         writing or closing the file fails
     */
    private void writePlanDotFile(String dotFileContents) {
        String dotFileDir = this.getConfiguration().get("crunch.planner.dotfile.outputdir");
        if (dotFileDir == null) {
            return;
        }

        FSDataOutputStream outputStream = null;
        // Stays false on ANY abrupt exit (including Errors), so a close failure never masks
        // the primary problem.
        boolean succeeded = false;
        try {
            URI uri = new URI(dotFileDir);
            FileSystem fs = FileSystem.get(uri, this.getConfiguration());
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS");
            String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date()));
            String encodedName = URLEncoder.encode(this.getName(), "UTF-8");
            String filenamePrefix =
                    encodedName.substring(0, Math.min(MAX_PIPE_NAME_LENGTH, encodedName.length()));
            Path jobPlanPath = new Path(uri.getPath(), filenamePrefix + filenameSuffix);
            LOG.info("Writing jobplan to " + jobPlanPath);
            outputStream = fs.create(jobPlanPath, true);
            outputStream.write(dotFileContents.getBytes(Charsets.UTF_8));
            succeeded = true;
        }
        catch (URISyntaxException e) {
            throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + dotFileDir, e);
        }
        catch (IOException e) {
            throw new CrunchRuntimeException("Error writing dotfile contents to " + dotFileDir, e);
        }
        finally {
            if (outputStream != null) {
                try {
                    outputStream.close();
                }
                catch (IOException e) {
                    if (succeeded) {
                        throw new CrunchRuntimeException("Error closing dotfile", e);
                    }
                    // An earlier failure is already propagating; record this one without
                    // replacing it.
                    LOG.warn("Error closing dotfile", e);
                }
            }
        }
    }
}

