Hadoop 文档

General

Common

HDFS

MapReduce

MapReduce REST APIs

YARN

YARN REST APIs

YARN Service

Submarine

Hadoop Compatible File Systems

Auth

Tools

Reference

Configuration

目的

众所周知,YARN可以扩展到数千个节点。YARN的可扩展性由资源管理器确定,并且与节点数,活动应用程序,活动容器和(节点和应用程序的)心跳频率成比例。降低心跳可以增加可伸缩性,但不利于利用率(请参阅旧的Hadoop 1.x经验)。该文档介绍了一种基于联合的方法,通过联合多个YARN子群集将单个YARN群集扩展到成千上万个节点。所提出的方法是将一个大型(10-100k个节点)集群划分为称为子集群的较小单元,每个子单元都有自己的YARN RM和计算节点。联合系统会将这些子集群缝合在一起,使它们在应用程序中看起来像是一个大型YARN集群。在此联合环境中运行的应用程序将看到单个大型YARN群集,并将能够在联合群集的任何节点上调度任务。在后台,联合系统将与子集群的资源管理器进行协商,并为应用程序提供资源。目标是允许单个作业无缝“扩展”子集群。

由于我们限制了每个RM负责的节点数量,并且采用适当的策略,该设计在结构上是可扩展的,将尝试确保大多数应用程序驻留在单个子集群中,因此每个RM可以看到的应用程序数量也有界。这意味着我们可以通过简单地添加子集群(几乎不需要它们之间的协调)来几乎线性地扩展规模。这种架构可以非常严格地强制执行每个子集群中的调度不变式(仅继承自YARN),而跨子集群的连续重新平衡将(不太严格地)强制要求在全局级别也要尊重这些属性(例如,群集丢失了大量节点,我们可以将队列重新映射到其他子群集,以确保在受损的子群集上运行的用户不会受到不公平的影响)。

联合身份验证被设计为现有YARN代码库之上的“层”,而核心YARN机制的更改有限。

假设:

  • 我们假设各个子集群之间的连通性相当好(例如,尽管目前的调查未排除在外,但我们尚未考虑在DC上建立联盟)。
  • 我们依靠HDFS联合(或同等可扩展的DFS解决方案)来照顾商店方面的可扩展性。

建筑

已知OSS YARN可以扩展到大约数千个节点。提出的体系结构利用了将多个此类较小的YARN群集(称为子群集)联合到包含数万个节点的较大的联合YARN群集中的概念。在此联合环境中运行的应用程序可以看到一个统一的大型YARN群集,并且能够在群集中的任何节点上计划任务。在后台,联合系统将与子集群RM协商,并为应用程序提供资源。图1中的逻辑体系结构显示了组成联合集群的主要组件,下面将对其进行描述。

YARN联合建筑|  宽度= 800

YARN子集群

子集群是具有多达数千个节点的YARN集群。子群集的确切大小将根据部署/维护的简便性,与网络或可用性区域的对齐方式以及一般最佳实践来确定。

子群集YARN RM将在保持工作高可用性的情况下运行,即,我们应该能够以最小的中断容忍YARN RM,NM故障。如果整个子集群遭到破坏,则外部机制将确保作业在单独的子集群中重新提交(最终可能会包含在联合身份验证设计中)。

子集群还是联合环境中的可伸缩性单元。我们可以通过添加一个或多个子集群来扩展联盟环境。

注意:根据设计,每个子集群都是功能齐全的YARN RM,并且可以将其对联盟的贡献设置为仅占其总容量的一小部分,即子集群可以对联盟具有“部分”承诺,同时保留以完全本地化的方式发挥其部分能力的能力。

路由器

YARN应用程序被提交到其中一个路由器,后者又应用了路由策略(从策略存储中获取),在状态存储中查询子集群URL,并将应用程序提交请求重定向到适当的子集群RM。我们将作业开始的子集群称为“家庭子集群”,将“第二子集群”称为工作跨接的所有其他子集群。路由器将ApplicationClientProtocol暴露给外界,透明地隐藏了多个RM的存在。为此,路由器还将应用程序及其本地子群集之间的映射保持在状态存储中。这样可以使路由器处于软状态,同时廉价地支持用户请求,因为任何路由器都可以将该应用程序恢复到归属子群集映射,并将请求定向到正确的RM,而无需广播它们。对于性能缓存和会话粘性,建议使用。联盟的状态(包括应用程序和节点)通过Web UI公开。

AMRM代理

AMRMProxy是允许应用程序跨子集群扩展和运行的关键组件。AMRMProxy在所有NM机器上运行,并通过实现ApplicationMasterProtocol充当AM的YARN RM的代理。应用程序将不允许直接与子集群RM通信。它们被系统强制仅连接到AMRMProxy端点,这将提供对多个YARN RM的透明访问(通过动态路由/拆分/合并通信)。在任何时候,一项作业都可以跨越一个家庭子集群和多个辅助子集群,但是AMRMProxy中运行的策略试图限制每个作业的占用空间,以最大程度地减少调度基础结构的开销(有关可伸缩性的更多信息/加载)。图中显示了ARMMProxy的拦截器链体系结构。

AMRMProxy拦截器链的体系结构|  宽度= 800

AMRMProxy的作用

  1. 保护子群集YARN RM免受AM行为的影响。AMRMProxy可以通过限制/杀死要求过多资源的AM来防止DDOS攻击。
  2. 屏蔽群集中的多个YARN RM,并可以透明地允许AM跨越子群集。所有容器分配都是由YARN RM框架完成的,YARN RM框架由面向家庭和其他子集群RM的AMRMProxy组成。
  3. 拦截所有请求,因此它可以强制执行应用程序配额,而子群集RM不能实施该配额(因为每个配额只能看到一部分AM请求)。
  4. AMRMProxy可以强制执行负载平衡/溢出策略。

全球政策制定者

全局策略生成器会忽略整个联合身份,并确保始终正确配置和调整系统。一个关键的设计点是,群集可用性不依赖于始终在线的GPG。GPG连续运行,但在所有集群操作之外都处于带外状态,并为我们提供了一个独特的优势,该优势使我们可以强制执行全局不变性,影响负载平衡,触发将要维护的子集群的排空等。 GPG将更新用户容量分配到子群集的映射,并且很少更改在路由器,AMRMProxy(以及可能的RM)中运行的策略。

万一GPG不可用,集群操作将自GPG上次发布政策起继续进行,而长期不可用可能意味着一些理想的平衡属性,最佳集群利用率和全局不变性可能会逐渐消失,计算和数据访问不会受到损害。

注意:在当前实现中,GPG是手动调整过程,只需通过CLI(YARN-3657)即可公开。

联邦系统的这一部分是YARN-5597未来工作的一部分。

联邦州立商店

联邦状态定义了将多个子集群松散耦合到单个大型联合集群中所需的附加状态。其中包括以下信息:

子集群成员

成员YARN RM不断向状态存储发送心跳,以保持活动状态并发布其当前功能/负载信息。全球策略生成器(GPG)使用此信息来制定适当的策略决策。路由器也可以使用此信息来选择最佳家庭子群集。这种机制使我们能够通过添加或删除子集群来动态地增长/缩小“集群群”。这也使每个子集群的维护变得容易。这是需要添加到YARN RM的新功能,但是由于与单独的YARN RM HA相似,因此对这些机制的理解也很好。

应用程序的家庭子集群

运行Application Master(AM)的子集群称为应用程序的“ home子集群”。AM不仅限于来自本地子集群的资源,而且还可以请求来自其他子集群(称为辅助子集群)的资源。将对联合环境进行定期配置和调整,以便在将AM放在子集群上时,它应该能够在家庭子集群上找到大部分资源。仅在某些情况下,才需要从其他子集群中请求资源。

联邦政策商店

联盟策略存储是一个逻辑上独立的存储(虽然它可能由相同的物理组件支持),其中包含有关如何将应用程序和资源请求路由到不同子集群的信息。当前的实现提供了几种策略,范围从随机/哈希/循环/优先级到更复杂的策略,这些策略考虑了子集群的负载并请求了局部性需求。

跨子集群运行应用程序

提交应用程序后,系统将确定最合适的子集群来运行该应用程序,我们将其称为应用程序的主子集群。从AM到RM的所有通信都将通过AM计算机上本地运行的AMRMProxy进行代理。AMRMProxy公开与YARN RM相同的ApplicationMasterService协议终结点。AM可以使用存储层公开的位置信息来请求容器。在理想情况下,该应用程序将放置在一个子集群中,该应用程序将需要该应用程序所需的所有资源和数据,但是如果它确实需要其他子集群中的节点上的容器,则AMRMProxy将与这些子集群的RM协商。子集群透明地提供给应用程序,从而使应用程序可以将整个联合环境视为一个大型YARN群集。AMRMProxy,全局策略生成器(GPG)和路由器一起工作,以实现无缝连接。

联邦序列图|  宽度= 800

该图显示了以下作业执行流程的序列图:

  1. 路由器收到对YARN应用程序客户端协议投诉的应用程序提交请求。
  2. 路由器查询路由表/策略以选择作业的“家乡RM”(策略配置是从心跳中的状态存储中接收到的)。
  3. 路由器查询成员资格状态以确定归属RM的端点。
  4. 然后,路由器将应用程序提交请求重定向到归属RM。
  5. 路由器使用本地子群集标识符更新应用程序状态。
  6. 一旦将应用程序提交到家庭RM,就会触发库存YARN流,即,将应用程序添加到调度程序队列中,并在具有可用资源的第一个NodeManager上的家庭子群集中启动其AM。一个。在此过程中,通过指示要与之交谈的YARN RM作为AMRMProxy的地址来修改AM环境。b。NM在启动AM时也会修改安全令牌,因此AM只能与AMRMProxy进行通信。从AM到YARN RM的任何将来通信都是由AMRMProxy介导的。
  7. 然后,AM将使用HDFS公开的位置信息请求容器。
  8. 根据策略,AMRMProxy可以通过提交非托管AM并将AM心跳转发给相关的子群集,从而在其他子群集上模拟AM。一个。联合身份验证通过AMRMProxy HA支持多种应用程序尝试。AM容器在主子集群中将具有不同的尝试ID,但是在次要尝试中将使用相同的非托管AM在次级中。b。启用AMRMProxy HA后,UAM令牌将存储在Yarn注册表中。在每次尝试应用程序的registerApplicationMaster调用中,AMRMProxy将从注册表(如果有)中获取现有的UAM令牌,然后重新附加到现有的UAM。
  9. AMRMProxy将使用位置信息和状态存储中配置的可插拔策略来决定是将AM接收到的资源请求转发到归属RM还是一个(或多个)辅助RM。在图1中,我们显示了AMRMProxy决定将请求转发到辅助RM的情况。
  10. 辅助RM将为AMRMProxy提供有效的容器令牌,以在其子群集中的某个节点上启动新容器。此机制可确保每个子集群使用其自己的安全令牌,并避免了需要群集范围的共享机密来创建令牌。
  11. AMRMProxy将分配响应转发回AM。
  12. AM使用标准YARN协议在目标NodeManager上(子集群2上)启动容器。

组态

要将YARN配置为使用联合,请在conf / yarn-site.xml中设置以下属性:

到处:

这些是通用配置,应显示在联合身份验证中每台计算机的conf / yarn-site.xml中。

属性 描述
启用yarn.federation 真正 是否启用联盟
yarn.resourcemanager.cluster-id <unique-subcluster-id> 此RM的唯一子集群标识符(与用于HA的子集群标识符相同)。

州立商店:

当前,我们支持状态存储的ZooKeeper和基于SQL的实现。

注意:必须始终使用以下其中之一覆盖State-Store实现。

ZooKeeper:必须为Hadoop设置ZooKeeper设置:

属性 描述
yarn.federation.state-store.class org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore 要使用的状态存储的类型。
hadoop.zk.address 主机:端口 ZooKeeper合奏的地址。

SQL:必须设置以下参数:

属性 描述
yarn.federation.state-store.class org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore 要使用的状态存储的类型。
yarn.federation.state-store.sql.url jdbc:mysql:// <主机>:<端口> / FederationStateStore 对于SQLFederationStateStore,存储状态的数据库的名称。
yarn.federation.state-store.sql.jdbc-class com.mysql.jdbc.jdbc2.optional.MysqlDataSource 对于SQLFederationStateStore,可以使用jdbc类。
yarn.federation.state-store.sql.username <dbuser> 对于SQLFederationStateStore,用于数据库连接的用户名。
yarn.federation.state-store.sql.password <dbpass> 对于SQLFederationStateStore,用于数据库连接的密码。

我们提供用于MySQL和Microsoft SQL Server的脚本。

对于MySQL,必须从MVN Repository下载最新的jar版本5.x ,并将其添加到CLASSPATH。然后,通过在数据库中执行以下SQL脚本来创建数据库模式:

  1. sbin / FederationStateStore / MySQL / FederationStateStoreDatabase.sql
  2. sbin / FederationStateStore / MySQL / FederationStateStoreUser.sql
  3. sbin / FederationStateStore / MySQL / FederationStateStoreTables.sql
  4. sbin / FederationStateStore / MySQL / FederationStateStoreStoredProcs.sql

在同一目录中,我们提供脚本来删除存储过程,表,用户和数据库。

注意: FederationStateStoreUser.sql为数据库定义了默认的用户名/密码,强烈建议您将其设置为正确的强密码。

对于SQL Server,该过程类似,但是已经包含了jdbc驱动程序。SQL Server脚本位于sbin / FederationStateStore / SQLServer /中

可选的:

属性 描述
yarn.federation.failover.enabled 真正 是否应该重试考虑每个子群集中的RM故障转移。
yarn.federation.blacklist-subclusters <subcluster-id> 列入黑名单的子集群列表,可用于禁用子集群
纱线联合会政策经理 org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager 策略管理器的选择确定如何通过系统路由应用程序和资源请求。
yarn.federation.policy-manager-params <二进制> 配置策略的有效负载。在我们的示例中,路由器和amrmproxy策略的一组权重。这通常是通过序列化以编程方式配置的策略管理器,或通过以.json序列化形式填充状态存储来生成的。
yarn.federation.subcluster-resolver.class org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl 用于解析节点属于哪个子集群以及机架属于哪个子集群的类。
纱线联合机械列表 <机器列表文件的路径> SubClusterResolver使用的计算机列表文件的路径。文件的每一行都是一个带有子集群和机架信息的节点。下面是示例:

node1,subcluster1,机架1
node2,subcluster2,rack1
node3,
subcluster3,rack2 node4,subcluster3,rack2

在RM上:

这些是额外的配置,应显示在每个ResourceManager 的conf / yarn-site.xml中。

属性 描述
yarn.resourcemanager.epoch <独特时代> The seed value for the epoch. This is used to guarantee uniqueness of container-IDs generate by different RMs. It must therefore be unique among sub-clusters and well-spaced to allow for failures which increment epoch. Increments of 1000 allow for a large number of sub-clusters and practically ensure near-zero chance of collisions (a clash will only happen if a container is still alive for 1000 restarts of one RM, while the next RM never restarted, and an app requests more containers).

Optional:

Property Example Description
yarn.federation.state-store.heartbeat-interval-secs 60 The rate at which RMs report their membership to the federation to the central state-store.

ON ROUTER:

These are extra configurations that should appear in the conf/yarn-site.xml at each Router.

Property Example Description
yarn.router.bind-host 0.0.0.0 Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0.
yarn.router.clientrm.interceptor-class.pipeline org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor.

Optional:

Property Example Description
yarn.router.hostname 0.0.0.0 Router host name.
yarn.router.clientrm.address 0.0.0.0:8050 Router client address.
yarn.router.webapp.address 0.0.0.0:8089 Webapp address at the router.
yarn.router.admin.address 0.0.0.0:8052 Admin address at the router.
yarn.router.webapp.https.address 0.0.0.0:8091 Secure webapp address at the router.
yarn.router.submit.retry 3 The number of retries in the router before we give up.
yarn.federation.statestore.max-connections 10 This is the maximum number of parallel connections each Router makes to the state-store.
yarn.federation.cache-ttl.secs 60 The Router caches informations, and this is the time to leave before the cache is invalidated.
yarn.router.webapp.interceptor-class.pipeline org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST A comma-seperated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST.

ON NMs:

These are extra configurations that should appear in the conf/yarn-site.xml at each NodeManager.

Property Example Description
yarn.nodemanager.amrmproxy.enabled true Whether or not the AMRMProxy is enabled.
yarn.nodemanager.amrmproxy.interceptor-class.pipeline org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor.

Optional:

Property Example Description
yarn.nodemanager.amrmproxy.ha.enable true Whether or not the AMRMProxy HA is enabled for multiple application attempt support.
yarn.federation.statestore.max-connections 1 The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly.
yarn.federation.cache-ttl.secs 300 The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store.

Running a Sample Job

In order to submit jobs to a Federation cluster one must create a separate set of configs for the client from which jobs will be submitted. In these, the conf/yarn-site.xml should have the following additional configurations:

Property Example Description
yarn.resourcemanager.address <router_host>:8050 Redirects jobs launched at the client to the router’s client RM port.
yarn.resourcemanager.scheduler.address localhost:8049 Redirects jobs to the federation AMRMProxy port.

Any YARN jobs for the cluster can be submitted from the client configurations described above. In order to launch a job through federation, first start up all the clusters involved in the federation as described here. Next, start up the router on the router machine with the following command:

  $HADOOP_HOME/bin/yarn --daemon start router

Now with $HADOOP_CONF_DIR pointing to the client configurations folder that is described above, run your job the usual way. The configurations in the client configurations folder described above will direct the job to the router’s client RM port where the router should be listening after being started. Here is an example run of a Pi job on a federation cluster from the client:

  $HADOOP_HOME/bin/yarn jar hadoop-mapreduce-examples-3.0.0.jar pi 16 1000

This job is submitted to the router which as described above, uses a generated policy from the GPG to pick a home RM for the job to which it is submitted.

The output from this particular example job should be something like:

  2017-07-13 16:29:25,055 INFO mapreduce.Job: Job job_1499988226739_0001 running in uber mode : false
  2017-07-13 16:29:25,056 INFO mapreduce.Job:  map 0% reduce 0%
  2017-07-13 16:29:33,131 INFO mapreduce.Job:  map 38% reduce 0%
  2017-07-13 16:29:39,176 INFO mapreduce.Job:  map 75% reduce 0%
  2017-07-13 16:29:45,217 INFO mapreduce.Job:  map 94% reduce 0%
  2017-07-13 16:29:46,228 INFO mapreduce.Job:  map 100% reduce 100%
  2017-07-13 16:29:46,235 INFO mapreduce.Job: Job job_1499988226739_0001 completed successfully
  .
  .
  .
  Job Finished in 30.586 seconds
  Estimated value of Pi is 3.14250000......

也可以在RouterWeb UI上的routerhost:8089上跟踪作业的状态。请注意,使用联合身份验证无需更改代码或重新编译输入jar。同样,此作业的输出与不进行联合运行时的输出完全相同。另外,为了获得联合的全部好处,请使用足够数量的映射器,以便需要多个群集。在上面的示例中,该数字恰好是16。