Analysis of Hadoop's JobTracker
After configuring the Job's parameters, the client calls job.waitForCompletion(true) to submit the Job to the JobTracker and wait for it to finish.
public void submit() throws IOException, InterruptedException,
    ClassNotFoundException {
  ensureState(JobState.DEFINE);   // check the JobState
  setUseNewAPI();                 // check and record whether the new MapReduce API is used

  // Connect to the JobTracker and submit the job
  connect();                                  // connect to the JobTracker
  info = jobClient.submitJobInternal(conf);   // submit the job information
  super.setJobID(info.getID());
  state = JobState.RUNNING;                   // update the job state
}
The code above has two main steps: connect to the JobTracker and submit the job information. The connect method mainly instantiates a JobClient object, including setting up the JobConf and doing the init work:
public void init(JobConf conf) throws IOException {
  // read the config to decide whether this Job runs in local
  // standalone mode or in distributed mode
  String tracker = conf.get("mapred.job.tracker", "local");
  tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
  this.ugi = UserGroupInformation.getCurrentUser();
  if ("local".equals(tracker)) {  // standalone mode: new LocalJobRunner
    conf.setNumMapTasks(1);
    this.jobSubmitClient = new LocalJobRunner(conf);
  } else {
    this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
  }
}
In distributed mode, an RPC proxy connection is created:
public static VersionedProtocol getProxy(
    Class<? extends VersionedProtocol> protocol,
    long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
    Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {

  if (UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
  }
  VersionedProtocol proxy =
      (VersionedProtocol) Proxy.newProxyInstance(
          protocol.getClassLoader(), new Class[] { protocol },
          new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
  long serverVersion = proxy.getProtocolVersion(protocol.getName(),
      clientVersion);
  if (serverVersion == clientVersion) {
    return proxy;
  } else {
    throw new VersionMismatch(protocol.getName(), clientVersion,
        serverVersion);
  }
}
The code above shows that Hadoop uses Java's built-in dynamic Proxy API to implement the remote procedure call.
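The dynamic-proxy idea can be illustrated with plain JDK APIs. This is a minimal sketch: the Echo interface and its behavior are invented for illustration, while Hadoop's real Invoker serializes the method name and arguments and sends them to the server over its own RPC protocol.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    // Toy "protocol" interface; in Hadoop this would extend VersionedProtocol.
    public interface Echo {
        String echo(String msg);
    }

    public static Echo getProxy() {
        // A real RPC Invoker would serialize method.getName() and args,
        // send them over the wire, and deserialize the server's response;
        // here we just answer locally.
        InvocationHandler invoker = (proxy, method, args) ->
                "server says: " + args[0];
        return (Echo) Proxy.newProxyInstance(
                Echo.class.getClassLoader(),
                new Class<?>[] { Echo.class },
                invoker);
    }

    public static void main(String[] args) {
        Echo client = getProxy();
        System.out.println(client.echo("hello"));
    }
}
```

The caller only ever sees the interface; every method call is funneled through the InvocationHandler, which is exactly how Hadoop turns an interface call into a network round trip.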
After initialization the client submits the job:

info = jobClient.submitJobInternal(conf);  // submit the job information
submitJobInternal does the following:
1. Replaces the directory names in conf with their HDFS equivalents.
2. Checks that the output specification is valid: for example, whether the output path already exists and whether it is explicitly set.
3. Splits the input data into multiple splits, uploads them to HDFS, and writes the job.xml file.
4. Calls the JobTracker's submitJob method.
submitJob mainly creates a JobInProgress object, then checks access permissions and whether the system parameters can accommodate the job, and finally calls addJob:
private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
    throws IOException {
  totalSubmissions++;

  synchronized (jobs) {
    synchronized (taskScheduler) {
      jobs.put(job.getProfile().getJobID(), job);
      for (JobInProgressListener listener : jobInProgressListeners) {
        listener.jobAdded(job);
      }
    }
  }
  myInstrumentation.submitJob(job.getJobConf(), jobId);
  job.getQueueMetrics().submitJob(job.getJobConf(), jobId);

  LOG.info("Job " + jobId + " added successfully for user '"
      + job.getJobConf().getUser() + "' to queue '"
      + job.getJobConf().getQueueName() + "'");
  AuditLogger.logSuccess(job.getUser(),
      Operation.SUBMIT_JOB.name(), jobId.toString());
  return job.getStatus();
}
totalSubmissions counts how many times clients have submitted jobs to this JobTracker. jobs is the table of all jobs the JobTracker manages:

Map<JobID, JobInProgress> jobs =
    Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());

taskScheduler implements the policy that decides the order in which jobs execute (its class diagram is not reproduced here).
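In plain JDK terms, the jobs table is a TreeMap (sorted by its key, the JobID) wrapped for thread safety. A minimal sketch, with Strings standing in for JobID and JobInProgress:

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class JobsMapDemo {
    // Sorted by key like the JobTracker's jobs map. Each individual
    // operation is synchronized, but iteration still needs an external
    // lock, which is why addJob synchronizes on the map itself.
    public static final Map<String, String> jobs =
            Collections.synchronizedMap(new TreeMap<String, String>());

    public static void main(String[] args) {
        jobs.put("job_201201010000_0002", "wordcount");
        jobs.put("job_201201010000_0001", "sort");
        synchronized (jobs) {  // lock while iterating, as addJob does
            System.out.println(jobs.keySet());
        }
    }
}
```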
Hadoop's job scheduling modes:

public enum SchedulingMode { FAIR, FIFO }

1. Fair scheduling (FairScheduler): distributed resources are divided fairly among users, and each user gets a job pool. If one user currently holds a large share of the resources, which is unfair to the others, the scheduler kills some of that user's tasks to free resources for everyone else.
2. Capacity scheduling (CapacityTaskScheduler): the cluster maintains multiple queues, each with a certain capacity, and the jobs within a queue are scheduled FIFO. Queues may contain sub-queues.
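The difference between the two policies can be sketched with a deliberately simplified model; the real schedulers also weigh priorities, running task counts, and slot capacities:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class SchedulingDemo {
    // FIFO: jobs run strictly in submission order, regardless of owner.
    public static List<String> fifo(List<String> submitted) {
        return new ArrayList<>(submitted);
    }

    // Fair (round-robin across per-user pools): each user's pool gets a
    // turn in every round, so no single user monopolizes the slots.
    public static List<String> fair(Map<String, Deque<String>> pools) {
        List<String> order = new ArrayList<>();
        boolean any = true;
        while (any) {
            any = false;
            for (Deque<String> pool : pools.values()) {
                if (!pool.isEmpty()) {
                    order.add(pool.poll());
                    any = true;
                }
            }
        }
        return order;
    }
}
```

With jobs a1, a2 from alice submitted before bob's b1, FIFO yields [a1, a2, b1], while the fair policy interleaves the pools as [a1, b1, a2].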
Both schedulers must implement TaskScheduler's public synchronized List<Task> assignTasks(TaskTracker tracker) method, which computes and returns the list of tasks that can be handed out.
Next, look at the JobTracker's side of the work. It records and updates the JobTracker's restart count:
while (true) {
  try {
    recoveryManager.updateRestartCount();
    break;
  } catch (IOException ioe) {
    LOG.warn("Failed to initialize recovery manager.", ioe);
    // wait for some time
    Thread.sleep(FS_ACCESS_RETRY_PERIOD);
    LOG.warn("Retrying...");
  }
}
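The same retry-until-success pattern can be written as a generic helper (the name retryForever is hypothetical, not Hadoop's):

```java
import java.util.concurrent.Callable;

public class RetryDemo {
    // Keep calling the action until it succeeds, sleeping between
    // attempts, like the JobTracker's updateRestartCount loop above.
    public static <T> T retryForever(Callable<T> action, long sleepMillis) {
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                System.err.println("Failed, retrying: " + e.getMessage());
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```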
Then it starts the job scheduler, in this setup the FairScheduler: taskScheduler.start(). This mainly initializes the scheduler's management objects, such as the job pool manager:
// Initialize other pieces of the scheduler
jobInitializer = new JobInitializer(conf, taskTrackerManager);
taskTrackerManager.addJobInProgressListener(jobListener);
poolMgr = new PoolManager(this);
poolMgr.initialize();
loadMgr = (LoadManager) ReflectionUtils.newInstance(
    conf.getClass("mapred.fairscheduler.loadmanager",
        CapBasedLoadManager.class, LoadManager.class), conf);
loadMgr.setTaskTrackerManager(taskTrackerManager);
loadMgr.setEventLog(eventLog);
loadMgr.start();
taskSelector = (TaskSelector) ReflectionUtils.newInstance(
    conf.getClass("mapred.fairscheduler.taskselector",
        DefaultTaskSelector.class, TaskSelector.class), conf);
taskSelector.setTaskTrackerManager(taskTrackerManager);
taskSelector.start();
JobInitializer owns a fixed-size ExecutorService threadPool, and each worker thread initializes one job.
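That pattern, a fixed-size thread pool where each submitted task initializes one job, can be sketched with the stdlib alone (initAll and its counter are invented for the sketch):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class InitPoolDemo {
    // Initialize numJobs "jobs" on a fixed-size pool, like JobInitializer's
    // ExecutorService threadPool; returns how many initializations finished.
    public static int initAll(int numJobs, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger initialized = new AtomicInteger();
        for (int i = 0; i < numJobs; i++) {
            // the Runnable stands in for a call to job.initTasks()
            pool.execute(initialized::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return initialized.get();
    }
}
```

In the JobTracker, each such worker runs the job-initialization code shown in the next snippet.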
Each initialization thread runs:

try {
  JobStatus prevStatus = (JobStatus) job.getStatus().clone();
  LOG.info("Initializing " + job.getJobID());
  job.initTasks();

  // Inform the listeners if the job state has changed
  // Note : that the job will be in PREP state.
  JobStatus newStatus = (JobStatus) job.getStatus().clone();
  if (prevStatus.getRunState() != newStatus.getRunState()) {
    JobStatusChangeEvent event =
        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
            newStatus);
    synchronized (JobTracker.this) {
      updateJobInProgressListeners(event);
    }
  }
}
This initialization mainly generates the tasks and then notifies the listeners so they can carry out their own work. initTasks handles the following:
// log the job submitted by the user
try {
  userUGI.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
          startTimeFinal, hasRestarted());
      return null;
    }
  });
} catch (InterruptedException ie) {
  throw new IOException(ie);
}

// set and record the job's priority
setPriority(this.priority);

//
// generate the secret keys needed by each Task
//
generateAndStoreTokens();

It then reads the metadata of the splits the JobTracker has for the input data. The metadata carries the following fields:
private TaskSplitIndex splitIndex;  // index of the split
private long inputDataLength;       // length of the split's input data
private String[] locations;         // hosts where the data is stored
numMapTasks is then computed from the number of metadata entries, and the storage locations are checked for reachability. Next, the map tasks and reduce tasks are created:
maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
  inputLength += splits[i].getInputDataLength();
  maps[i] = new TaskInProgress(jobId, jobFile,
      splits[i],
      jobtracker, conf, this, i, numSlotsPerMap);
}
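The bookkeeping in that loop, summing the input length and creating one map task per split, can be sketched with a stand-in split type (SplitMeta is hypothetical; the real class is TaskSplitMetaInfo):

```java
public class SplitDemo {
    // Stand-in for TaskSplitMetaInfo: just the fields the loop reads.
    public static class SplitMeta {
        final long inputDataLength;
        final String[] locations;
        SplitMeta(long len, String... locs) {
            this.inputDataLength = len;
            this.locations = locs;
        }
    }

    // numMapTasks is simply the number of split metadata entries.
    public static int numMapTasks(SplitMeta[] splits) {
        return splits.length;
    }

    // Total job input size, accumulated exactly as in the loop above.
    public static long totalInputLength(SplitMeta[] splits) {
        long inputLength = 0;
        for (SplitMeta s : splits) {
            inputLength += s.inputDataLength;
        }
        return inputLength;
    }
}
```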
The TaskInProgress constructor records this information:

this.jobFile = jobFile;
this.splitInfo = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
this.partition = partition;
this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
Besides the task's JobTracker, split information, and job information, the constructor also sets:
maxSkipRecords: the maximum number of bad records the task may skip during execution.

setMaxTaskAttempts(): the maximum number of times the task may be attempted. After a task has failed twice, it is re-executed once in skip mode to record the bad records, and on the fourth attempt those bad records are skipped.

Creating the reduce tasks follows a very similar process.
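The attempt/skip-mode policy described above can be modeled as a small decision function (a simplification of Hadoop's real SkipBadRecords machinery; the names here are invented):

```java
public class SkipModeDemo {
    // Decide the mode for a given (1-based) attempt number:
    // attempts 1-2 run normally; attempt 3 runs in skip mode to
    // record which records are bad; attempt 4 skips those records.
    public static String modeForAttempt(int attempt) {
        if (attempt <= 2) return "NORMAL";
        if (attempt == 3) return "SKIP_MODE_RECORDING";
        return "SKIP_BAD_RECORDS";
    }
}
```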