Spark Task未序列化(Task not serializable)问题分析

问题描述及原因分析

在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题。然而,Spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换等。为了解决上述Task未序列化问题,这里对其进行了研究和总结。

  出现“org.apache.spark.SparkException: Task not serializable”这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化(不是说不可以引用外部变量,只是要做好序列化工作,具体后面详述)。其中最普遍的情形是:当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。虽然许多情形下,当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现Task未序列化问题。

引用成员变量的实例分析

  如上所述,由于Spark程序中的map、filter等算子内部引用了类成员函数或变量导致需要该类所有成员都需要支持序列化,又由于该类某些成员变量不支持序列化,最终引发Task无法序列化问题。为了验证上述原因,我们编写了一个实例程序,如下所示。该类的功能是从域名列表中(rdd)过滤得到特定顶级域名(rootDomain,如.com,.cn,.org)的域名列表,而该特定顶级域名需要函数调用时指定。

{ val list = List(“a.com”, “www.b.com”, “a.cn”, “a.com.cn”, “a.org”); private val sparkConf = new SparkConf().setAppName(“AppName”); private val sc = new SparkContext(sparkConf); val rdd = sc.parallelize(list); private val rootDomain = conf def getResult(): Array[(String)] = {val result = rdd.filter(item => item.contains(rootDomain))result.take(result.count().toInt) }}

  依据上述分析的原因,由于依赖了当前类的成员变量,所以导致当前类全部需要序列化,由于当前类某些字段未做好序列化,导致出错。实际情况与分析的原因一致,运行过程中出现错误,如下所示。分析下面的错误报告得到错误是由于sc(SparkContext)引起的。

Exception : Task not serializableat org$.ensureSerializable(ClosureCleaner.scala:166)at org$.clean(ClosureCleaner.scala:158)at org(**SparkContext**.scala:1435) ……Caused by: java- field (class “com.ntci.test.MyTest1”, name: “sc”, type: “class org.apache.spark.SparkContext”)- object (class 1@63700353)- field (class “com.ntci.test.MyTest1$$anonfun$1”, name: “$outer”, type: “class com.ntci.test.MyTest1”)

  为了验证上述结论,将不需要序列化的的成员变量使用关键字“@transent”标注,表示不序列化当前类中的这两个成员变量,再次执行函数,同样报错。

Exception : Task not serializableat org$.ensureSerializable(ClosureCleaner.scala:166)…… Caused by: java- field (class “com.ntci.test.MyTest1”, name: “sparkConf”, type: “class org.apache.spark.**SparkConf**”)- object (class 1@6107799e)

  虽然错误原因相同,但是这次导致错误的字段是sparkConf(SparkConf)。使用同样的“@transent”标注方式,将sc(SparkContext)和sparkConf(SparkConf)都标注为不需序列化,再次执行时,程序正常执行。

{ val list = List(“a.com”, “www.b.com”, “a.cn”, “a.com.cn”, “a.org”); sparkConf = new SparkConf().setAppName(“AppName”); sc = new SparkContext(sparkConf); val rdd = sc.parallelize(list); private val rootDomain = conf def getResult(): Array[(String)] = {val result = rdd.filter(item => item.contains(rootDomain))result.take(result.count().toInt) }}

  所以,通过上面的例子我们可以得到结论:由于Spark程序中的map、filter等算子内部引用了类成员函数或变量导致该类所有成员都需要支持序列化,又由于该类某些成员变量不支持序列化,最终引发Task无法序列化问题。相反地,对类中那些不支持序列化问题的成员变量标注后,使得整个类能够正常序列化,最终消除Task未序列化问题。

引用成员函数的实例分析

  成员变量与成员函数的对序列化的影响相同,即引用了某类的成员函数,会导致该类所有成员都支持序列化。为了验证这个假设,我们在map中使用了当前类的一个成员函数,作用是如果当前域名没有以“”开头,那么就在域名头添加“”前缀(注:由于rootDomain是在getResult函数内部定义的,就不存在引用类成员变量的问题,也就不存在和排除了上一个例子所讨论和引发的问题,因此这个例子主要讨论成员函数引用的影响;此外,不直接引用类成员变量也是解决这类问题的一个手段,如本例中为了消除成员变量的影响而在函数内部定义变量的这种做法,这类问题具体的规避做法此处略提,在下一节作详细阐述)。下面的代码同样会报错,同上面的例子一样,由于当前类中的sc(SparkContext)和sparkConf(SparkConf)两个成员变量没有做好序列化处理,导致当前类的序列化出现问题。

{ val list = List(“a.com”, “www.b.com”, “a.cn”, “a.com.cn”, “a.org”); private val sparkConf = new SparkConf().setAppName(“AppName”); private val sc = new SparkContext(sparkConf); val rdd = sc.parallelize(list); def getResult(): Array[(String)] = {val rootDomain = confval result = rdd.filter(item => item.contains(rootDomain)).map(item => addWWW(item))result.take(result.count().toInt) } def addWWW(str:String):String = {if(str.startsWith(“www.”))strelse”www.”+str }}当你感到悲哀痛苦时,最好是去学些什么东西。

Spark Task未序列化(Task not serializable)问题分析

相关文章:

你感兴趣的文章:

标签云: