Spark on Yarn: Cluster模式Scheduler实现

背景

Spark on Yarn分yarn-cluster和yarn-client两种模式。 本文通过Cluster模式的TaskScheduler实现入手,梳理一遍spark on yarn的大致实现逻辑。

前提我对两种模式以及yarn任务的整体运行逻辑不是很清楚。

主体逻辑

cluster模式中,使用的TaskScheduler是YarnClusterScheduler。 它继承了默认使用的TaskSchedulerImpl类,额外在postStartHook方法里,唤醒了ApplicationMaster类的设置sparkcontext的方法。 ApplicationMaster相当于是spark在yarn上的AM,内部的YarnRMClient类,负责向RM注册和注销AM,以及拿到attemptId。注册AM之后,得到一个可以申请/释放资源的YarnAllocationHandler类,从而可以维护container与executor之间的关系。

下节具体介绍几个主要类的实现逻辑。

具体实现AM

ApplicationMaster,通过YarnRMClient来完成自己的注册和注销。

AM的启动方式

/** * This object does not provide any special functionality. It exists so that it’s easy to tell * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. */object ExecutorLauncher { def main(args: Array[String]) = {ApplicationMaster.main(args) }}

main里面调用AM的run方法:

def main(args: Array[String]) = {SignalLogger.register(log)val amArgs = new ApplicationMasterArguments(args)SparkHadoopUtil.get.runAsSparkUser { () =>master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))System.exit(master.run())} }

如果AM的启动参数里有用户自己定义的类,则是Driver模式,即cluster模式。用户自己定义的类里面带了spark driver,会在单独一个线程里启动。这也是cluster模式与client模式的区别,用户实现了driver vs 用户只是提交app。

run方法里 1. 如果不是Driver模式,执行runExecutorLauncher逻辑: 启动后,执行registerAM,里面new了YarnAllocator的实现,调用allocateResources,申请并执行container。同时,启动一个reporter线程,每隔一段时间调用YarnAllocator的allocateResources方法,或汇报有太多executor fail了。 2. 如果是Driver模式,执行runDriver逻辑: 也是执行registerAM,,但是之前需要反射执行jar包里用户定义的driver类。

YarnAllocator

YarnAllocator负责向yarn申请和释放containers,维护containe、executor相关关系,有一个线程池。申请到container之后,在container里执行ExecutorRunnable。需要子类实现的是申请和释放这两个方法:

protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponseprotected def releaseContainer(container: Container): Unit

YarnAllocationHandler继承了YarnAllocator。

allocateContainers方法: Yarn api里提供ResourceRequest这个类,里面包含了一个app向RM索要不同container的信息,包括机器名/机架名,cpu和mem资源数,container数,优先级,locality是否放松。然后组成AllocateRequest类,代表AM向RM从集群里获得resource。调用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM发起资源请求**。releaseContainer方法: 每次把需要release的container记录下来。在每次allocateContainers调用的时候, 会往AllocateRequest里addAllReleases(releasedContainerList),在请求资源的时候顺便把历史资源释放掉。

ExecutorRunnable与Yarn的关系: 1. 向ContainerManager建立连接,让cm来startContainer。 2. ContainerLaunchContext包含了yarn的NodeManager启动一个container需要的所有信息。ExecutorRunnable会构建这个container申请信息。 可以参考这段启动逻辑:

def startContainer = {logInfo(“Setting up ContainerLaunchContext”)val ctx = Records.newRecord(classOf[ContainerLaunchContext]).asInstanceOf[ContainerLaunchContext]ctx.setContainerId(container.getId())ctx.setResource(container.getResource())val localResources = prepareLocalResourcesctx.setLocalResources(localResources)val env = prepareEnvironmentctx.setEnvironment(env)ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())val credentials = UserGroupInformation.getCurrentUser().getCredentials()val dob = new DataOutputBuffer()credentials.writeTokenStorageToStream(dob)ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,appAttemptId, localResources)logInfo(“Setting up executor with commands: ” + commands)ctx.setCommands(commands)ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))(sparkConf.getBoolean(“spark.shuffle.service.enabled”, false)) {val secretString = securityMgr.getSecretKey()val secretBytes =if (secretString != null) {// This conversion must match how the YarnShuffleService decodes our secretJavaUtils.stringToBytes(secretString)} else {// Authentication is not enabled, so just provide dummy metadataByteBuffer.allocate(0)}ctx.setServiceData(Map[String, ByteBuffer](“spark_shuffle” -> secretBytes))}// Send the start request to the ContainerManagerval startReq = Records.newRecord(classOf[StartContainerRequest]).asInstanceOf[StartContainerRequest]startReq.setContainerLaunchContext(ctx)cm.startContainer(startReq) }突然之间失去了语言。那才是真正的寂寞,

Spark on Yarn: Cluster模式Scheduler实现

相关文章:

你感兴趣的文章:

标签云: