/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRJob;
import org.apache.crunch.impl.mr.plan.JobNameBuilder;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;

/**
 * Wraps a Hadoop {@link Job} together with the bookkeeping needed to run it as
 * one node of a dependency graph: the jobs it depends on, its current
 * {@link MRJob.State}, a human-readable status message, hooks that run before
 * submission and after completion, and wall-clock timestamps for each phase.
 *
 * <p>A new instance starts in {@link MRJob.State#WAITING}. {@link #checkState()}
 * moves it to {@code READY} once none of its depending jobs is incomplete or
 * failed, {@link #submit()} moves it to {@code RUNNING} (or {@code FAILED} if
 * submission throws), and subsequent {@link #checkState()} calls poll the
 * underlying job until it ends in {@code SUCCESS} or {@code FAILED}.
 *
 * <p>State and message accessors are {@code synchronized} on this instance; the
 * timestamp and counter getters are not.
 */
public class CrunchControlledJob implements MRJob {
    private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class);

    private final int jobID;
    private final Job job;
    private final JobNameBuilder jobNameBuilder;
    private final Set<Target> allTargets;
    private final List<CrunchControlledJob> dependingJobs;
    private final Hook prepareHook;
    private final Hook completionHook;
    private MRJob.State state;
    private String message;
    /** Last progress line written by {@link #logJobProgress()}, used to suppress duplicates. */
    private String lastKnownProgress;
    private Counters counters;
    private long preHookStartTimeMsec;
    private long jobStartTimeMsec;
    private long jobEndTimeMsec;
    private long postHookEndTimeMsec;

    /**
     * Creates a controlled job in the {@link MRJob.State#WAITING} state with no
     * depending jobs.
     *
     * @param jobID          identifier returned by {@link #getJobID()}
     * @param job            the Hadoop job to submit and monitor
     * @param jobNameBuilder builder used by {@link #setJobSequence(int)} to name the job
     * @param allTargets     targets returned as-is by {@link #getAllTargets()}
     * @param prepareHook    run by {@link #submit()} before the job is submitted
     * @param completionHook run once the job is observed to be completed
     */
    public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Set<Target> allTargets, Hook prepareHook, Hook completionHook) {
        this.jobID = jobID;
        this.job = job;
        this.jobNameBuilder = jobNameBuilder;
        this.allTargets = allTargets;
        this.dependingJobs = Lists.newArrayList();
        this.prepareHook = prepareHook;
        this.completionHook = completionHook;
        this.state = MRJob.State.WAITING;
        this.message = "just initialized";
    }

    /**
     * Returns a multi-line, tab-separated summary of the job: name, ids, state,
     * message and the names of the jobs it depends on.
     */
    @Override
    public String toString() {
        // The buffer never escapes this method, so the unsynchronized builder is enough.
        StringBuilder sb = new StringBuilder();
        sb.append("job name:\t").append(this.job.getJobName()).append("\n");
        sb.append("job id:\t").append(this.jobID).append("\n");
        sb.append("job state:\t").append(this.state).append("\n");
        sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
        sb.append("job message:\t").append(this.message).append("\n");
        if (this.dependingJobs.isEmpty()) {
            sb.append("job has no depending job:\t").append("\n");
        } else {
            sb.append("job has ").append(this.dependingJobs.size()).append(" depending jobs:\n");
            for (int i = 0; i < this.dependingJobs.size(); ++i) {
                sb.append("\t depending job ").append(i).append(":\t");
                sb.append(this.dependingJobs.get(i).getJobName()).append("\n");
            }
        }
        return sb.toString();
    }

    /** Returns the name currently set on the underlying Hadoop job. */
    public String getJobName() {
        return this.job.getJobName();
    }

    /**
     * Renames the underlying job using the name builder supplied at construction,
     * configured with the given sequence number.
     *
     * @param jobSequence sequence number passed to the name builder
     */
    public void setJobSequence(int jobSequence) {
        this.job.setJobName(this.jobNameBuilder.jobSequence(jobSequence).build());
    }

    /** Returns the identifier passed to the constructor (not the Hadoop job id). */
    @Override
    public int getJobID() {
        return this.jobID;
    }

    /** Returns the id reported by the underlying Hadoop job. */
    public JobID getMapredJobID() {
        return this.job.getJobID();
    }

    /**
     * Returns the time, in epoch milliseconds, recorded by {@link #submit()} just
     * before the prepare hook ran; {@code 0} if {@code submit()} has not been called.
     */
    public long getStartTimeMsec() {
        return this.preHookStartTimeMsec;
    }

    /**
     * Returns the time, in epoch milliseconds, recorded after the prepare hook
     * returned and just before the job was submitted; {@code 0} if that point was
     * never reached.
     */
    public long getJobStartTimeMsec() {
        return this.jobStartTimeMsec;
    }

    /**
     * Returns the time, in epoch milliseconds, at which a status check first saw
     * the underlying job as complete; {@code 0} if that has not happened.
     */
    public long getJobEndTimeMsec() {
        return this.jobEndTimeMsec;
    }

    /**
     * Returns the time, in epoch milliseconds, recorded after the completion hook
     * returned; {@code 0} if the completion hook has not finished.
     */
    public long getEndTimeMsec() {
        return this.postHookEndTimeMsec;
    }

    /**
     * Returns the counters fetched when the job was observed to be complete, or
     * {@code null} if they have not been fetched.
     */
    public Counters getCounters() {
        return this.counters;
    }

    /** Returns the target set passed to the constructor (the same instance, not a copy). */
    public Set<Target> getAllTargets() {
        return this.allTargets;
    }

    /** Returns the underlying Hadoop job. */
    @Override
    public synchronized Job getJob() {
        return this.job;
    }

    /**
     * Returns the jobs this job depends on, exposed as {@link MRJob}s.
     *
     * <p>The result is a Guava transforming view over the internal list, not a copy.
     */
    @Override
    public List<MRJob> getDependentJobs() {
        return Lists.transform(this.dependingJobs, new Function<CrunchControlledJob, MRJob>() {
            @Override
            public MRJob apply(CrunchControlledJob job) {
                return job;
            }
        });
    }

    /** Returns the current state without polling the underlying job. */
    @Override
    public synchronized MRJob.State getJobState() {
        return this.state;
    }

    /** Overwrites the current state. */
    protected synchronized void setJobState(MRJob.State state) {
        this.state = state;
    }

    /** Returns the current status message. */
    public synchronized String getMessage() {
        return this.message;
    }

    /** Overwrites the current status message. */
    public synchronized void setMessage(String message) {
        this.message = message;
    }

    /**
     * Registers a job that must finish before this one becomes ready.
     *
     * @param dependingJob the job this job depends on
     * @return {@code true} if the job was added; {@code false} if this job is no
     *         longer in the {@code WAITING} state, in which case nothing changes
     */
    public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
        if (this.state == MRJob.State.WAITING) {
            return this.dependingJobs.add(dependingJob);
        }
        return false;
    }

    /**
     * Returns {@code true} if the state is {@code FAILED}, {@code DEPENDENT_FAILED}
     * or {@code SUCCESS}.
     */
    public synchronized boolean isCompleted() {
        return this.state == MRJob.State.FAILED || this.state == MRJob.State.DEPENDENT_FAILED || this.state == MRJob.State.SUCCESS;
    }

    /** Returns {@code true} if the state is {@code READY}. */
    public synchronized boolean isReady() {
        return this.state == MRJob.State.READY;
    }

    /**
     * Asks the underlying Hadoop job to kill itself. The tracked state is not
     * modified by this call.
     *
     * @throws IOException          if the underlying call fails
     * @throws InterruptedException declared for callers; see the Hadoop {@code Job} API
     */
    public void killJob() throws IOException, InterruptedException {
        this.job.killJob();
    }

    /**
     * Polls the underlying job while this job is {@code RUNNING}.
     *
     * <p>If the job is complete, records the end time and counters and moves to
     * {@code SUCCESS} or {@code FAILED}. Otherwise, optionally logs progress when
     * {@code crunch.log.job.progress} is enabled in the job configuration. If
     * polling throws an {@link IOException}, the job is marked {@code FAILED} and a
     * best-effort kill is attempted. Whenever the job ends up completed, the
     * completion hook runs and the post-hook end time is recorded.
     *
     * @throws IOException          if the completion hook fails
     * @throws InterruptedException if a status call on the underlying job is interrupted
     */
    private void checkRunningState() throws IOException, InterruptedException {
        try {
            if (this.job.isComplete()) {
                this.jobEndTimeMsec = System.currentTimeMillis();
                this.counters = this.job.getCounters();
                if (this.job.isSuccessful()) {
                    this.state = MRJob.State.SUCCESS;
                } else {
                    this.state = MRJob.State.FAILED;
                    this.message = "Job failed!";
                }
            } else if (this.job.getConfiguration().getBoolean("crunch.log.job.progress", false)) {
                this.logJobProgress();
            }
        } catch (IOException ioe) {
            this.state = MRJob.State.FAILED;
            this.message = StringUtils.stringifyException(ioe);
            try {
                this.job.killJob();
            } catch (IOException killFailure) {
                // Best effort only: the job is already marked FAILED, but leave a
                // trace so a still-running cluster job can be diagnosed.
                LOG.warn("Unable to kill job \"" + this.job.getJobName() + "\" after a failed status check", killFailure);
            }
        }
        if (this.isCompleted()) {
            this.completionHook.run();
            this.postHookEndTimeMsec = System.currentTimeMillis();
        }
    }

    /**
     * Refreshes and returns the state of this job.
     *
     * <p>A {@code RUNNING} job is polled first. A job that is (still) {@code WAITING}
     * then inspects its depending jobs in order, refreshing each one:
     * <ul>
     *   <li>no depending jobs: becomes {@code READY};</li>
     *   <li>a depending job is {@code WAITING}, {@code READY} or {@code RUNNING}:
     *       stays {@code WAITING} and stops inspecting;</li>
     *   <li>a depending job is {@code FAILED} or {@code DEPENDENT_FAILED}: becomes
     *       {@code DEPENDENT_FAILED} with a message naming that job;</li>
     *   <li>every depending job passed the checks above: becomes {@code READY}.</li>
     * </ul>
     *
     * @return the state after the refresh
     * @throws IOException          if polling or a completion hook fails
     * @throws InterruptedException if polling the underlying job is interrupted
     */
    synchronized MRJob.State checkState() throws IOException, InterruptedException {
        if (this.state == MRJob.State.RUNNING) {
            this.checkRunningState();
        }
        if (this.state != MRJob.State.WAITING) {
            return this.state;
        }
        if (this.dependingJobs.isEmpty()) {
            this.state = MRJob.State.READY;
            return this.state;
        }
        for (CrunchControlledJob pred : this.dependingJobs) {
            MRJob.State predState = pred.checkState();
            if (predState == MRJob.State.WAITING || predState == MRJob.State.READY || predState == MRJob.State.RUNNING) {
                // A predecessor has not finished yet; remain WAITING.
                return this.state;
            }
            if (predState == MRJob.State.FAILED || predState == MRJob.State.DEPENDENT_FAILED) {
                this.state = MRJob.State.DEPENDENT_FAILED;
                this.message = "Depending job with jobID " + pred.getJobID() + " failed.";
                return this.state;
            }
        }
        // Every predecessor was neither pending nor failed.
        this.state = MRJob.State.READY;
        return this.state;
    }

    /**
     * Runs the prepare hook and submits the underlying job.
     *
     * <p>On success the state becomes {@code RUNNING}. Any exception thrown by the
     * hook or by submission is caught: the state becomes {@code FAILED}, the
     * stringified exception becomes the message, and the failure is logged. The
     * completion hook is not run and no end timestamps are recorded on this path.
     */
    protected synchronized void submit() {
        try {
            this.preHookStartTimeMsec = System.currentTimeMillis();
            this.prepareHook.run();
            this.jobStartTimeMsec = System.currentTimeMillis();
            this.job.submit();
            this.state = MRJob.State.RUNNING;
            LOG.info("Running job \"" + this.getJobName() + "\"");
            LOG.info("Job status available at: " + this.job.getTrackingURL());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise hide the interrupt from callers.
                Thread.currentThread().interrupt();
            }
            this.state = MRJob.State.FAILED;
            this.message = StringUtils.stringifyException(e);
            LOG.error("Error occurred starting job \"" + this.getJobName() + "\":");
            LOG.error(this.getMessage());
        }
    }

    /**
     * Logs the map/reduce progress percentages of the underlying job, skipping the
     * log line when it is identical to the previously logged one.
     */
    private void logJobProgress() throws IOException, InterruptedException {
        String progress = String.format("map %.0f%% reduce %.0f%%", 100.0 * (double) this.job.mapProgress(), 100.0 * (double) this.job.reduceProgress());
        if (!Objects.equal(this.lastKnownProgress, progress)) {
            LOG.info(this.job.getJobName() + " progress: " + progress);
            this.lastKnownProgress = progress;
        }
    }

    /** Callback invoked before a job is submitted or after it has completed. */
    public interface Hook {
        /**
         * Performs the hook's work.
         *
         * @throws IOException if the hook fails
         */
        void run() throws IOException;
    }
}

