Spark入门到精通:第四节 Spark编程模型(一)

作者:周志湖 网名:摇摆少年梦 微信号:zhouzhihubeyond

本节主要内容Spark重要概念弹性分布式数据集(RDD)基础1. Spark重要概念

本节部分内容源自官方文档:

(1)Spark运行模式

目前最为常用的Spark运行模式有: – local:本地线程方式运行,主要用于开发调试Spark应用程序 – Standalone:利用Spark自带的资源管理与调度器运行Spark集群,采用Master/Slave结构,为解决单点故障,可以采用ZooKeeper实现高可靠(High Availability,HA) – Apache Mesos :运行在著名的Mesos资源管理框架基础之上,该集群运行模式将资源管理交给Mesos,Spark只负责进行任务调度和计算 – Hadoop YARN : 集群运行在Yarn资源管理器上,资源管理交给Yarn,Spark只负责进行任务调度和计算 Spark运行模式中Hadoop YARN的集群运行方式最为常用,本课程中的第一节便是采用Hadoop YARN的方式进行Spark集群搭建。如此Spark便与Hadoop生态圈完美搭配,组成强大的集群,可谓无所不能。

(2)Spark组件(Components)

一个完整的Spark应用程序,如前一节当中SparkWordCount程序,在提交集群运行时,它涉及到如下图所示的组件:

各Spark应用程序以相互独立的进程集合运行于集群之上,由SparkContext对象进行协调,SparkContext对象可以视为Spark应用程序的入口,被称为driver program,SparkContext可以与不同种类的集群资源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 进行通信,从而分配到程序运行所需的资源,获取到集群运行所需的资源后,SparkContext将得到集群中其它工作节点(Worker Node) 上对应的Executors (不同的Spark应用程序有不同的Executor,它们之间也是独立的进程,Executor为应用程序提供分布式计算及数据存储功能),之后SparkContext将应用程序代码分发到各Executors,最后将任务(Task)分配给executors执行。

Term(术语) Meaning(解释)

Application(Spark应用程序) 运行于Spark上的用户程序,由集群上的一个driver program(包含SparkContext对象)和多个executor线程组成

Application jar(Spark应用程序JAR包) Jar包中包含了用户Spark应用程序,如果Jar包要提交到集群中运行,不需要将其它的Spark依赖包打包进行,在运行时

Driver program 包含main方法的程序,负责创建SparkContext对象

Cluster manager 集群资源管理器,例如Mesos,Hadoop Yarn

Deploy mode 部署模式,用于区别driver program的运行方式:集群模式(cluter mode),driver在集群内部启动;客户端模式(client mode),driver进程从集群外部启动

Worker node 工作节点, 集群中可以运行Spark应用程序的节点

Executor Worker node上的进程,该进程用于执行具体的Spark应用程序任务,负责任务间的数据维护(数据在内存中或磁盘上)。不同的Spark应用程序有不同的Executor

Task 运行于Executor中的任务单元,Spark应用程序最终被划分为经过优化后的多个任务的集合(在下一节中将详细阐述)

Job 由多个任务构建的并行计算任务,具体为Spark中的action操作,如collect,save等)

Stage 每个job将被拆分为更小的task集合,这些任务集合被称为stage,各stage相互独立(类似于MapReduce中的map stage和reduce stage),由于它由多个task集合构成,因此也称为TaskSet

2. 弹性分布式数据集(RDD)基础

弹性分布式数据集(RDD,Resilient Distributed Datasets),由Berkeley实验室于2011年提出,原始论文名字:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 原始论文非常值得一读,是研究RDD的一手资料,本节内容大部分将基于该论文。

(1)RDD设计目标

RDD用于支持在并行计算时能够高效地利用中间结果,支持更简单的编程模型,同时也具有像MapReduce等并行计算框架的高容错性、能够高效地进行调度及可扩展性。RDD的容错通过记录RDD转换操作的lineage关系来进行,lineage记录了RDD的家族关系,当出现错误的时候,直接通过lineage进行恢复。RDD最合数据挖掘, 机器学习及图计算,因此这些应用涉及到大家的迭代计算,基于内存能够极大地提升其在分布式环境下的执行效率;RDD不适用于诸如分布式爬虫等需要频繁更新共享状态的任务。

下面给出的是在spark-shell中如何查看RDD的Lineage

//textFile读取hdfs根目录下的README.md文件,然后筛选出所有包括Spark的行scala> val rdd2=sc.textFile(“/README.md”).filter(line => line.contains(“Spark”))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[<console>:21//toDebugString方法会打印出RDD的家族关系//可以看到textFile方法会生成两个RDD,分别是HadoopRDD//MapPartitionsRDD,而filter同时也会生成新的MapPartitionsRDDscala> rdd2.toDebugString: 1res0: String = (<console>:21 [] | MapPartitionsRDD[1] at textFile at <console>:21 [] | /README.md HadoopRDD[0] at textFile at <console>:21 [](2)RDD抽象

RDD在Spark中是一个只读的(val类型)、经过分区的记录集合。RDD在Spark中只有两种创建方式:(1)从存储系统中创建;(2)从其它RDD中创建。从存储中创建有多种方式,可以是本地文件系统,也可以是分布式文件系统,还可以是内存中的数据。 下面的代码演示的是从HDFS中创建RDD

scala> sc.textFile(“/README.md”)[String] = MapPartitionsRDD[4] at textFile at <console>:22

下面的代码演示的是从内存中创建RDD

//内存中定义了一个数组: Array[Int] = Array(1, 2, 3, 4, 5)//通过parallelize方法创建: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23寂寞时,想想我的影子,我会在远方给你一个微笑;难过时,

Spark入门到精通:第四节 Spark编程模型(一)

相关文章:

你感兴趣的文章:

标签云: