商务合作:179001057@qq.com

hadoop 作业提交

技术2022-05-12  0


某平台价值19860元的编程课程资料免费领取【点我领取】


 

 

 

作业提交对应上图的第4个步骤。作业提交的实现类是JobSubmitter,其中函数submitJobInternal实现的,它的主要工作是:1)检查输入、输出目录;2)计算输入的文件块;3)准备DistributeCache相关信息。4)拷贝作业的jar文件和配置到文件系统。5)提交作业给JobTracker,并开始监控作业的状态。

 

代码如下:

JobStatus submitJobInternal(Job job, Cluster cluster) { Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration()); Configuration conf = job.getConfiguration(); JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { conf.set("mapreduce.job.dir", submitJobDir.toString()); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, conf); copyAndConfigureFiles(job, submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt("mapred.map.tasks", maps); LOG.info("number of splits:" + maps); // Write job file to submit dir writeConf(conf, submitJobFile); // Now, actually submit the job (using the submit name) populateTokenCache(conf); status = submitClient.submitJob(jobId, submitJobDir.toString(),TokenCache.getTokenStorage()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); jtFs.delete(submitJobDir, true); } }  

 

 

其中,代码:

status = submitClient.submitJob(jobId, submitJobDir.toString(),TokenCache.getTokenStorage());

是通过RPC将相关文件信息传递到JobTracker

现在,JobTracker的哪一部分代码体现了接收信息的呢?这个问题困扰了我很久。JobTracker里的submitJob()函数,该函数中接收提交的任务:创建一个JobInProgress对象,包含作业描述信息(JobProfile)和作业状态信息(JobStatus)。得到作业idJobInProgress后便将这两个对象提交给JobTracker。在submitJob()的最后一句代码:

return addJob(jobId,job)

是作业提交的核心所在。addJob()函数的代码如下:

 

 

synchronized JobStatus addJob(JobID jobId, JobInProgress job) { totalSubmissions++; synchronized (jobs) { synchronized (taskScheduler) { jobs.put(job.getProfile().getJobID(), job); for (JobInProgressListener listener : jobInProgressListeners) { try { listener.jobAdded(job); } catch (IOException ioe) { LOG.warn("Failed to add and so skipping the job : " + job.getJobID() + ". Exception : " + ioe); } } } } myInstrumentation.submitJob(job.getJobConf(), jobId); return job.getStatus(); }  

 

上述代码中的for循环的主要是给jobTracker的各种监听器添加这个job。例如,对于作业调度里会有一个JobInProgressListener来监听提交的作业,进而根据调度算法进行任务分配。

 

 

 

 

 

 

 

 

 

 

 

 

 


最新回复(0)