/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.impl.mr.exec;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.MRJob;
import org.apache.crunch.impl.mr.MRPipelineExecution;
import org.apache.crunch.impl.mr.exec.CappedExponentialCounter;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;

public class MRExecutor
extends AbstractFuture<PipelineResult>
implements MRPipelineExecution {
    private static final Log LOG = LogFactory.getLog(MRExecutor.class);
    private final CrunchJobControl control;
    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
    private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
    private final CountDownLatch doneSignal = new CountDownLatch(1);
    private final CountDownLatch killSignal = new CountDownLatch(1);
    private final CappedExponentialCounter pollInterval;
    private AtomicReference<PipelineExecution.Status> status = new AtomicReference<PipelineExecution.Status>(PipelineExecution.Status.READY);
    private PipelineResult result;
    private Thread monitorThread;
    private boolean started;
    private String planDotFile;

    public MRExecutor(Configuration conf, Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
        this.control = new CrunchJobControl(conf, jarClass.toString(), pipelineCallables);
        this.outputTargets = outputTargets;
        this.toMaterialize = toMaterialize;
        this.monitorThread = new Thread(new Runnable(){

            @Override
            public void run() {
                MRExecutor.this.monitorLoop();
            }
        });
        this.pollInterval = MRExecutor.isLocalMode() ? new CappedExponentialCounter(50L, 1000L) : new CappedExponentialCounter(500L, 10000L);
    }

    public void addJob(CrunchControlledJob job) {
        this.control.addJob(job);
    }

    public void setPlanDotFile(String planDotFile) {
        this.planDotFile = planDotFile;
    }

    public synchronized MRPipelineExecution execute() {
        if (!this.started) {
            this.monitorThread.start();
            this.started = true;
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void monitorLoop() {
        this.status.set(PipelineExecution.Status.RUNNING);
        try {
            List<PipelineCallable<?>> failedCallables;
            while (this.killSignal.getCount() > 0L && !this.control.allFinished() && !this.control.anyFailures()) {
                this.control.pollJobStatusAndStartNewOnes();
                this.killSignal.await(this.pollInterval.get(), TimeUnit.MILLISECONDS);
            }
            this.control.killAllRunningJobs();
            List<CrunchControlledJob> failures = this.control.getFailedJobList();
            if (!failures.isEmpty()) {
                System.err.println(failures.size() + " job failure(s) occurred:");
                for (CrunchControlledJob job : failures) {
                    System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
                }
            }
            if (!(failedCallables = this.control.getFailedCallables()).isEmpty()) {
                System.err.println(failedCallables.size() + " callable failure(s) occurred:");
                for (PipelineCallable<?> c : failedCallables) {
                    System.err.println(c.getName() + ": " + c.getMessage());
                }
            }
            boolean hasFailures = !failures.isEmpty() || !failedCallables.isEmpty();
            ArrayList stages = Lists.newArrayList();
            for (CrunchControlledJob crunchControlledJob : this.control.getSuccessfulJobList()) {
                stages.add(new PipelineResult.StageResult(crunchControlledJob.getJobName(), crunchControlledJob.getMapredJobID().toString(), crunchControlledJob.getCounters(), crunchControlledJob.getStartTimeMsec(), crunchControlledJob.getJobStartTimeMsec(), crunchControlledJob.getJobEndTimeMsec(), crunchControlledJob.getEndTimeMsec()));
            }
            if (!hasFailures) {
                for (PCollectionImpl pCollectionImpl : this.outputTargets.keySet()) {
                    if (this.toMaterialize.containsKey(pCollectionImpl)) {
                        MaterializableIterable iter = this.toMaterialize.get(pCollectionImpl);
                        if (!iter.isSourceTarget()) continue;
                        iter.materialize();
                        pCollectionImpl.materializeAt((SourceTarget)((Object)iter.getSource()));
                        continue;
                    }
                    boolean materialized = false;
                    for (Target t : this.outputTargets.get(pCollectionImpl)) {
                        if (materialized) continue;
                        if (t instanceof SourceTarget) {
                            pCollectionImpl.materializeAt((SourceTarget)t);
                            materialized = true;
                            continue;
                        }
                        SourceTarget st = t.asSourceTarget(pCollectionImpl.getPType());
                        if (st == null) continue;
                        pCollectionImpl.materializeAt(st);
                        materialized = true;
                    }
                }
            }
            MRExecutor mRExecutor = this;
            synchronized (mRExecutor) {
                if (this.killSignal.getCount() == 0L) {
                    this.status.set(PipelineExecution.Status.KILLED);
                } else if (!failures.isEmpty() || !failedCallables.isEmpty()) {
                    this.status.set(PipelineExecution.Status.FAILED);
                } else {
                    this.status.set(PipelineExecution.Status.SUCCEEDED);
                }
                this.result = new PipelineResult(stages, this.status.get());
                this.set(this.result);
            }
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)e);
        }
        catch (Exception e) {
            LOG.error((Object)"Pipeline failed due to exception", (Throwable)e);
            this.status.set(PipelineExecution.Status.FAILED);
            this.setException(e);
        }
        finally {
            this.doneSignal.countDown();
        }
    }

    @Override
    public String getPlanDotFile() {
        return this.planDotFile;
    }

    @Override
    public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.doneSignal.await(timeout, timeUnit);
    }

    @Override
    public void waitUntilDone() throws InterruptedException {
        this.doneSignal.await();
    }

    public PipelineResult get() throws InterruptedException, ExecutionException {
        if (this.getStatus() == PipelineExecution.Status.READY) {
            this.execute();
        }
        return (PipelineResult)super.get();
    }

    public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
        if (this.getStatus() == PipelineExecution.Status.READY) {
            this.execute();
        }
        return (PipelineResult)super.get(timeout, unit);
    }

    @Override
    public synchronized PipelineExecution.Status getStatus() {
        return this.status.get();
    }

    @Override
    public synchronized PipelineResult getResult() {
        return this.result;
    }

    @Override
    public void kill() throws InterruptedException {
        this.killSignal.countDown();
    }

    protected void interruptTask() {
        try {
            this.kill();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static boolean isLocalMode() {
        Configuration conf = new Configuration();
        String jobTrackerAddress = conf.get("mapreduce.jobtracker.address", conf.get("mapred.job.tracker", "local"));
        return "local".equals(jobTrackerAddress);
    }

    @Override
    public List<MRJob> getJobs() {
        return Lists.transform(this.control.getAllJobs(), (Function)new Function<CrunchControlledJob, MRJob>(){

            public MRJob apply(CrunchControlledJob job) {
                return job;
            }
        });
    }
}

