一般概念是,应用程序提交客户端将应用程序提交到YARN ResourceManager(RM)。这可以通过设置YarnClient对象来完成。启动YarnClient后,客户端可以设置应用程序上下文,准备包含ApplicationMaster的应用程序的第一个容器(AM),然后提交申请。您需要提供信息,例如,有关运行应用程序所需的本地文件/ jar的详细信息,需要执行的实际命令(带有必要的命令行参数),任何OS环境设置(可选)等等。实际上,您需要描述需要为ApplicationMaster启动的Unix进程。
然后,YARN ResourceManager将在分配的容器上启动ApplicationMaster(如指定)。ApplicationMaster与YARN群集通信,并处理应用程序执行。它以异步方式执行操作。在应用程序启动期间,ApplicationMaster的主要任务是:a)与ResourceManager进行通信以协商和分配将来的容器资源,以及b)容器分配之后,传递YARN NodeManager(NM)以在其上启动应用程序容器。任务a)可以通过AMRMClientAsync对象异步执行,使用AMRMClientAsync.CallbackHandler中指定的事件处理方法事件处理程序的类型。需要将事件处理程序显式设置为客户端。任务b)可以通过启动可运行对象来执行,然后在分配了容器后启动容器。作为启动此容器的一部分,AM必须指定具有启动信息的ContainerLaunchContext,例如命令行规范,环境等。
在执行应用程序期间,ApplicationMaster通过NMClientAsync对象通信NodeManager 。所有的容器事件被处理NMClientAsync.CallbackHandler,关联NMClientAsync。典型的回调处理程序处理客户端的启动,停止,状态更新和错误。ApplicationMaster还通过处理AMRMClientAsync.CallbackHandler的getProgress()方法向ResourceManager报告执行进度。
除异步客户端外,某些工作流程(AMRMClient和NMClient)还有同步版本。推荐使用异步客户端是因为(在主观上)更简单的用法,本文将主要介绍异步客户端。有关同步客户端的更多信息,请参考AMRMClient和NMClient。
以下是重要的界面:
客户端 <-> ResourceManager
通过使用YarnClient对象。
ApplicationMaster <-> ResourceManager
通过使用AMRMClientAsync对象,AMRMClientAsync.CallbackHandler异步处理事件
ApplicationMaster <-> NodeManager
启动容器。通过使用NodeManagers沟通NMClientAsync对象,处理容器事件由NMClientAsync.CallbackHandler
注意
YARN应用程序的三个主要协议(ApplicationClientProtocol,ApplicationMasterProtocol和ContainerManagementProtocol)仍然保留。3个客户端包装了这3个协议,以为YARN应用程序提供更简单的编程模型。
在极少数情况下,程序员可能希望直接使用这三种协议来实现应用程序。但是,请注意,一般用例不再鼓励此类行为。
客户端需要做的第一步是初始化并启动YarnClient。
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();
设置客户端后,客户端需要创建一个应用程序,并获取其应用程序ID。
YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
YarnClientApplication对新应用程序的响应还包含有关群集的信息,例如群集的最小/最大资源能力。这是必需的,以确保您可以正确设置将在其中启动ApplicationMaster的容器的规格。有关更多详细信息,请参考GetNewApplicationResponse。
客户端的主要症结在于设置ApplicationSubmissionContext,它定义RM启动AM所需的所有信息。客户需要在上下文中设置以下内容:
应用信息:ID,名称
队列,优先级信息:将向其提交应用程序的队列,为该应用程序分配的优先级。
用户:提交申请的用户
ContainerLaunchContext:定义将在其中启动和运行AM的容器的信息。的ContainerLaunchContext,如先前所提到的,定义了运行该应用程序所需的所有必需的信息,如当地ř物资跟不上(二进制文件,广口瓶,文件等),È nvironment设置(CLASSPATH等)时,Ç ommand要执行和安全Ť okens(RECT)。
//设置应用程序提交上下文 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); //为应用程序主机设置本地资源 //根据需要本地文件或档案 //在这种情况下,应用程序主服务器的jar文件是本地资源的一部分 Map <String,LocalResource> localResources = new HashMap <String,LocalResource>(); LOG.info(“从本地文件系统复制App Master jar并添加到本地环境”); //将应用程序主jar复制到文件系统 //创建本地资源以指向目标jar路径 FileSystem fs = FileSystem.get(conf); addToLocalResources(fs,appMasterJar,appMasterJarPath,appId.toString(), localResources,null); //根据需要设置log4j属性 如果(!log4jPropFile.isEmpty()){ addToLocalResources(fs,log4jPropFile,log4jPath,appId.toString(), localResources,null); } //必须在最终容器上提供shell脚本 //在哪里执行 //为此,我们需要先复制到可见的文件系统中 //到yarn框架。 //我们不需要将此设置为应用程序的本地资源 // master,因为应用程序master不需要它。 字符串hdfsShellScriptLocation =“”; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; 如果(!shellScriptPath.isEmpty()){ 路径shellSrc = new Path(shellScriptPath); 字符串shellPathSuffix = appName +“ /” + appId.toString()+“ /” + SCRIPT_PATH; 路径shellDst = 新路径(fs.getHomeDirectory(),shellPathSuffix); fs.copyFromLocalFile(false,true,shellSrc,shellDst); hdfsShellScriptLocation = shellDst.toUri()。toString(); FileStatus shellFileStatus = fs.getFileStatus(shellDst); hdfsShellScriptLen = shellFileStatus.getLen(); hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); } 如果(!shellCommand.isEmpty()){ addToLocalResources(fs,null,shellCommandPath,appId.toString(), localResources,shellCommand); } 如果(shellArgs.length> 0){ addToLocalResources(fs,null,shellArgsPath,appId.toString(), localResources,StringUtils.join(shellArgs,“”)); } //设置要在运行应用程序主机的环境中设置的环境变量 LOG.info(“设置应用程序主环境”); Map <String,String> env = new HashMap <String,String>(); //将shell脚本的位置放入env //使用环境信息,应用程序主服务器将为该应用程序创建正确的本地资源 //将启动以执行Shell脚本的最终容器 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION,hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP,Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN,Long.toString(hdfsShellScriptLen)); //将AppMaster.jar位置添加到类路径 //在某些时候,我们不需要添加 //到环境的hadoop特定的类路径。 //应该开箱即用。 //现在设置所有必需的类路径,包括 //指向“。”的类路径 用于应用罐 StringBuilder classPathEnv =新的StringBuilder(Environment.CLASSPATH。$$()) .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(“ ./*”); 对于(String c:conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)){ classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); classPathEnv.append(c.trim()); } classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( “ ./log4j.properties”); //设置必要的命令以执行应用程序主控 Vector <CharSequence> vargs =新的Vector <CharSequence>(30); //设置Java可执行命令 LOG.info(“设置应用程序主命令”); vargs.add(Environment.JAVA_HOME。$$()+“ / bin / java”); //根据am内存大小设置Xmx vargs.add(“-Xmx” + amMemory +“ m”); //设置班级名称 vargs.add(appMasterMainClass); //为Application Master设置参数 vargs.add(“-container_memory” + String.valueOf(containerMemory)); vargs.add(“-container_vcores” + String.valueOf(containerVirtualCores)); vargs.add(“-num_containers” + String.valueOf(numContainers)); vargs.add(“-priority” + String.valueOf(shellCmdPriority)); 对于(Map.Entry <String,String> entry:shellEnv.entrySet()){ vargs.add(“-shell_env” + entry.getKey()+“ =” + entry.getValue()); } 如果(debugFlag){ vargs.add(“-debug”); } vargs.add(“ 1>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ /AppMaster.stdout”); vargs.add(“ 2>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ /AppMaster.stderr”); //获取最终命令 StringBuilder命令= new StringBuilder(); 对于(CharSequence str:vargs){ command.append(str).append(“”); } LOG.info(“完成设置应用程序主命令” + command.toString()); List <String>命令=新的ArrayList <String>(); commands.add(command.toString()); //为应用程序主服务器设置容器启动上下文 ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources,env,commands,null,null,null); //设置资源类型要求 //目前,内存和vcore均受支持,因此我们设置内存和 // vcores要求 资源能力= Resource.newInstance(amMemory,amVCores); appContext.setResource(capability); //服务数据是可以传递给应用程序的二进制Blob //在这种情况下不需要 // amContainer.setServiceData(serviceData); //设置安全令牌 如果(UserGroupInformation.isSecurityEnabled()){ //注意:对于HDFS和MapReduce,凭据类被标记为LimitedPrivate 凭证凭证= new Credentials(); 字符串tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); 如果(tokenRenewer == null | | tokenRenewer.length()== 0){ 抛出新的IOException( “无法获取RM的主Kerberos主体以用作续订”); } //目前,仅获取默认文件系统的令牌。 最终Token <?>令牌[] = fs.addDelegationTokens(tokenRenewer,凭证); if(令牌!= null){ for(Token <?> token:令牌){ LOG.info(“为dt加上+ fs.getUri()+”;“ +令牌); } } DataOutputBuffer dob =新的DataOutputBuffer(); certificate.writeTokenStorageToStream(dob); ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(),0,dob.getLength()); amContainer.setTokens(fsTokens); } appContext.setAMContainerSpec(amContainer);
//设置应用程序主机的优先级 优先级pri = Priority.newInstance(amPriority); appContext.setPriority(pri); //设置要在RM中提交此应用程序的队列 appContext.setQueue(amQueue); //将应用程序提交给应用程序管理器 // SubmitApplicationResponse SubmitResp = applicationsManager.submitApplication(appRequest); yarnClient.submitApplication(appContext);
至此,RM将在后台接受该应用程序,并将经历分配具有所需规范的容器,然后最终在所分配的容器上设置并启动AM的过程。
客户可以通过多种方式跟踪实际任务的进度。
- 它可以与RM通信,并通过YarnClient的getApplicationReport()方法请求应用程序报告。
//获取我们感兴趣的appId的应用程序报告 ApplicationReport报告= yarnClient.getApplicationReport(appId);
从RM收到的ApplicationReport包括以下内容:
常规应用程序信息:应用程序ID,应用程序提交到的队列,提交应用程序的用户以及应用程序的开始时间。
ApplicationMaster详细信息:运行AM的主机,正在侦听来自客户端的请求的rpc端口(如果有)以及客户端需要与AM通信的令牌。
应用程序跟踪信息:如果应用程序支持某种形式的进度跟踪,则可以设置一个跟踪URL,该URL可通过ApplicationReport的getTrackingUrl()方法获得,客户端可以查看该URL 以监视进度。
应用程序状态:ResourceManager看到的应用程序状态可通过ApplicationReport#getYarnApplicationState获得。如果YarnApplicationState设置为FINISHED,则客户端应参考ApplicationReport#getFinalApplicationStatus来检查应用程序任务本身的实际成功/失败。如果发生故障,ApplicationReport#getDiagnostics可能有助于进一步了解故障。
- 如果ApplicationMaster支持,则客户端可以通过从应用程序报告中获取的host:rpcport信息直接向AM本身查询进度更新。如果可用,它也可以使用从报告中获得的跟踪URL。
yarnClient.killApplication(appId);
AM是工作的实际所有者。它将由RM启动,并且将通过客户端向客户提供有关已负责监督和完成的工作的所有必要信息和资源。
由于AM是在可能(可能会)与其他容器共享物理主机的容器中启动的,考虑到多租户性质,除其他问题外,它无法对诸如可监听的预配置端口之类的任何假设上。
AM启动时,通过环境可以使用几个参数。其中包括AM容器的ContainerId,应用程序提交时间以及有关运行ApplicationMaster的NM(NodeManager)主机的详细信息。有关参数名称,请参见ApplicationConstants。
与RM的所有交互都需要一个ApplicationAttemptId(如果出现故障,每个应用程序可以进行多次尝试)。该ApplicationAttemptId可以从AM的容器ID来获得。有一些助手API可以将从环境中获取的值转换为对象。
Map <String,String> envs = System.getenv(); 字符串containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); 如果(containerIdString == null){ //容器ID始终应由框架在环境中设置 抛出新的IllegalArgumentException( “未在环境中设置ContainerId”); } ContainerId containerId = ConverterUtils.toContainerId(containerIdString); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync =新的NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();
//向ResourceManager注册自己 //这将开始对RM发出心跳 appMasterHostname = NetUtils.getHostname(); RegisterApplicationMasterResponse响应= amRMClient .registerApplicationMaster(appMasterHostname,appMasterRpcPort, appMasterTrackingUrl);
//转储有关集群功能的信息,如 //资源管理器 int maxMem = response.getMaximumResourceCapability()。getMemory(); LOG.info(“此群集中资源的最大内存容量” + maxMem); int maxVCores = response.getMaximumResourceCapability()。getVirtualCores(); LOG.info(“此群集中资源的最大vcores功能” + maxVCores); //资源要求不能超过最大值。 如果(containerMemory> maxMem){ LOG.info(“指定的容器内存超出群集的最大阈值。” +“使用最大值。” +“,指定=” + containerMemory +“,max =” + maxMem); containerMemory = maxMem; } 如果(containerVirtualCores> maxVCores){ LOG.info(“指定的虚拟机容器虚拟核心超过群集的最大阈值。” +“使用最大值。” +“,指定=” =“ + containerVirtualCores +”,max =“ + maxVCores); containerVirtualCores = maxVCores; } List <Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info(“ Received” + previousAMRunningContainers.size() +“先前AM注册时正在运行的AM容器。”);
List <Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info(“ Received” + previousAMRunningContainers.size() +“先前AM注册时正在运行的AM容器。”); int numTotalContainersToRequest = numTotalContainers-previousAMRunningContainers.size(); //安装程序从RM索要容器 //向RM发送容器请求 //在获得完全分配的配额之前,我们将继续轮询RM //容器 //继续循环,直到启动所有容器和shell脚本 //在它们上执行(无论成功/失败)。 for(int i = 0; i <numTotalContainersToRequest; ++ i){ ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); }
资源功能:目前,YARN支持基于内存的资源需求,因此请求应定义所需的内存量。该值以MB为单位定义,并且必须小于群集的最大容量,并且必须是最小容量的精确倍数。内存资源对应于任务容器上施加的物理内存限制。如代码所示,它还将支持基于计算的资源(vCore)。
优先级:当请求容器组时,AM可以为每个组定义不同的优先级。例如,Map-Reduce AM可以为Map任务所需的容器分配较高的优先级,而为Reduce任务的容器分配较低的优先级。
私人ContainerRequest setupContainerAskForRM(){ //主机的设置要求 //使用*作为任何主机将对分布式Shell应用程序执行 //设置请求的优先级 优先级pri = Priority.newInstance(requestPriority); //设置资源类型要求 //目前,内存和CPU受支持,因此我们设置内存和CPU要求 资源能力= Resource.newInstance(containerMemory, containerVirtualCores); ContainerRequest request = new ContainerRequest(capability,null,null, pri); LOG.info(“请求的容器询问:” + request.toString()); 退货要求; }
- 分配了容器后,处理程序将建立一个线程,该线程运行代码以启动容器。在这里,我们使用名称LaunchContainerRunnable进行演示。我们将在本文的以下部分中讨论LaunchContainerRunnable类。
@Override public void onContainersAllocated(List <Container> locatedContainers){ LOG.info(“ RM对容器询问的响应,allocateCnt =” + located.containers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); 对于(容器locatedContainer:已分配容器){ LaunchContainerRunnable runnableLaunchContainer = 新的LaunchContainerRunnable(allocatedContainer,containerListener); 线程launchThread =新线程(runnableLaunchContainer); //在单独的线程上启动和启动容器以保持 //主线程畅通无阻 //因为可能不会一次分配所有容器。 launchThreads.add(launchThread); launchThread.start(); } }
- 心跳时,事件处理程序将报告应用程序的进度。
@Override 公众持股getProgress(){ //设置进度以在下一次心跳时交付给RM 浮动进度=(浮动)numCompletedContainers.get() / numTotalContainers; 返回进度; }
//设置必要的命令以在分配的容器上执行 Vector <CharSequence> vargs =新的Vector <CharSequence>(5); //设置可执行命令 vargs.add(shellCommand); //设置shell脚本路径 如果(!scriptPath.isEmpty()){ vargs.add(Shell.WINDOWS?ExecBatScripStringtPath :ExecShellStringPath); } //为shell命令设置args vargs.add(shellArgs); //添加日志重定向参数 vargs.add(“ 1>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ / stdout”); vargs.add(“ 2>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ / stderr”); //获取最终命令 StringBuilder命令= new StringBuilder(); 对于(CharSequence str:vargs){ command.append(str).append(“”); } List <String>命令=新的ArrayList <String>(); commands.add(command.toString()); //设置ContainerLaunchContext,设置本地资源,环境, //构造函数的命令和令牌。 //注意令牌:也为容器设置令牌。今天,正常 // shell命令,distribute-shell中的容器不需要任何 //令牌。我们主要是为了使NodeManagers能够填充它们 //下载分布式文件系统中的任何文件。令牌是 //否则在某些情况下也很有用,例如当一个人正在运行一个 //分布式外壳中的“ hadoop dfs”命令。 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources,shellEnv,命令,null,allTokens.duplicate(),null); containerListener.addContainer(container.getId(),容器); nmClientAsync.startContainerAsync(container,ctx);
该NMClientAsync对象,其事件处理一起,处理容器事件。包括容器的启动,停止,状态更新以及发生错误。
ApplicationMaster确定工作完成后,需要通过AM-RM客户端注销自身,然后停止该客户端。
尝试{ amRMClient.unregisterApplicationMaster(appStatus,appMessage,null); } catch(YarnException ex){ LOG.error(“无法取消注册应用程序”,例如); } catch(IOException e){ LOG.error(“无法注销应用程序失败”,e); } amRMClient.stop();
您可以使用LocalResource将资源添加到您的应用程序请求中。这将导致YARN将资源分发到ApplicationMaster节点。如果资源是tgz,zip或jar-您可以让YARN解压缩。然后,您要做的就是将解压缩的文件夹添加到您的类路径中。例如,在创建应用程序请求时:
文件packageFile = new File(packagePath); URL packageUrl = ConverterUtils.getYarnUrlFromPath( FileContext.getFileContext()。makeQualified(new Path(packagePath))); packageResource.setResource(packageUrl); packageResource.setSize(packageFile.length()); packageResource.setTimestamp(packageFile.lastModified()); packageResource.setType(LocalResourceType.ARCHIVE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); resource.setMemory(memory); containerCtx.setResource(resource); containerCtx.setCommands(ImmutableList.of( “ java -cp'./package/*'some.class.to.Run” +“ 1>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ / stdout” +“ 2>” + ApplicationConstants.LOG_DIR_EXPANSION_VAR +“ / stderr”)); containerCtx.setLocalResources( Collections.singletonMap(“ package”,packageResource)); appCtx.setApplicationId(appId); appCtx.setUser(user.getShortUserName); appCtx.setAMContainerSpec(containerCtx); yarnClient.submitApplication(appCtx);
如您所见,setLocalResources命令将名称映射到资源。该名称成为应用程序的cwd中的符号链接,因此您可以仅使用./package/*引用内部的工件。
注意:Java的classpath(cp)参数非常敏感。确保语法完全正确。
将程序包分发到AM后,只要AM启动新容器,就需要遵循相同的过程(假设您希望将资源发送到容器)。此代码是相同的。您只需要确保为AM提供了包路径(HDFS或本地),以便它可以将资源URL与容器ctx一起发送。
的ApplicationAttemptId将被传递到通过环境和来自环境的值AM可以被转换成一个ApplicationAttemptId经由ConverterUtils辅助函数的对象。