
org.apache.spark.SparkException: Task not serializable (by Shockang)


Preface

This article belongs to the column "Spark异常问题汇总" (Spark Exception Roundup), which is the author's original work. Please credit the source when quoting, and point out any shortcomings or mistakes in the comments. Thanks!

For the column's table of contents and references, see "Spark异常问题汇总" (Spark Exception Roundup).

Main Text

Cause Analysis

If "org.apache.spark.SparkException: Task not serializable" appears, it is usually because the argument passed to map, filter, etc. references an external variable that cannot be serialized. (Referencing external variables is allowed; they just have to be made serializable.)

The most common situation is this: when a member function or member variable of some class (often the current class) is referenced, every member of that class (the whole class) must support serialization.

In many cases the current class even declares "extends Serializable", yet because some of its fields are not serializable, serializing the whole class still fails, which ultimately surfaces as the Task not serializable problem.
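Spark's default closure serializer is plain Java serialization, so this capture effect can be reproduced without Spark at all. The sketch below uses invented names (Unpicklable, Owner, canSerialize) purely for illustration: a function value that reads a member variable captures the whole enclosing instance, and a single non-serializable field is enough to break it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable field such as SparkContext.
class Unpicklable

class Owner extends Serializable {
  val heavy = new Unpicklable // not serializable
  val rootDomain = "com"

  // Reading the member `rootDomain` makes the lambda capture `this`,
  // which drags `heavy` into the serialization graph.
  def badFilter: String => Boolean = item => item.contains(rootDomain)
}

object CaptureDemo {
  // Try to Java-serialize an object, the way Spark serializes task closures.
  def canSerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(canSerialize(new Owner().badFilter)) // false: Unpicklable is reached via `this`
}
```

Even though the closure only needs the String rootDomain, serialization fails because the captured Owner instance carries the Unpicklable field along.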

Practice

1 Requirement Description

Because operators such as map and filter in a Spark program reference a class's member functions or variables, every member of that class must be serializable; and because some member variables of the class are not serializable, Task serialization ultimately fails.

To verify this cause, we wrote the sample program shown below.

The class filters a list of domain names (an RDD) down to those under a specific top-level domain (rootDomain, e.g. .com, .cn, .org); the top-level domain is specified when the function is called.

Code 1

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest1 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest1")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest1 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest1("com")
    test.getResult()
    test.stop()
  }
}
```

Log 1

Following the analysis above, because the closure depends on a member variable of the current class, the whole class has to be serialized.

Some fields of the current class are not properly serializable, which causes the error.

The actual behavior matches the analysis; the error produced at runtime is shown below.

From the log below, the error is caused by sc (the SparkContext).

```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:50:42 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO SparkContext: Submitted application: MyTest1
21/07/25 15:50:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:50:43 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:50:43 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:50:43 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing view acls groups to:
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls groups to:
21/07/25 15:50:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shockang); groups with view permissions: Set(); users with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:50:43 INFO Utils: Successfully started service 'sparkDriver' on port 63559.
21/07/25 15:50:43 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:50:43 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-862d818a-ff7d-473b-a321-84ca60962ada
21/07/25 15:50:43 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:50:43 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:50:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:50:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:50:44 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:50:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63560.
21/07/25 15:50:44 INFO NettyBlockTransferService: Server created on 192.168.0.105:63560
21/07/25 15:50:44 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:50:44 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63560 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63560, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.com$shockang$study$bigdata$spark$errors$serializable$MyTest1$$getResult(MyTest1.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1$.main(MyTest1.scala:25)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.main(MyTest1.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@23811a09)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest1, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, com.shockang.study.bigdata.spark.errors.serializable.MyTest1@256aa5f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest1.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest1;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955@66bacdbc)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:50:44 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:50:44 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:50:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:50:45 INFO MemoryStore: MemoryStore cleared
21/07/25 15:50:45 INFO BlockManager: BlockManager stopped
21/07/25 15:50:45 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:50:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:50:45 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:50:45 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:50:45 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-f084a4b0-2f36-4e9d-a1f6-fb2bbbbd99d9
```

2 Code 2

To verify the conclusion above, the member variables that do not need to be serialized are annotated with "@transient", telling the compiler not to serialize these two member variables of the class.
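What @transient buys us can likewise be shown with plain Java serialization (the class names below are invented for this sketch): the annotated field is simply skipped when the object is written out, so a non-serializable field type no longer poisons the enclosing class.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable field such as SparkConf / SparkContext.
class Unpicklable

class WithTransient extends Serializable {
  @transient val heavy = new Unpicklable // skipped during serialization
  val rootDomain = "com"
}

class WithoutTransient extends Serializable {
  val heavy = new Unpicklable // serialized along with the instance
  val rootDomain = "com"
}

object TransientDemo {
  def canSerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new WithTransient))    // true
    println(canSerialize(new WithoutTransient)) // false
  }
}
```

Keep in mind that a @transient field is null after deserialization, so it must only ever be used on the driver side, never inside the closure itself.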

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest2 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest2")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest2 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest2("com")
    test.getResult()
    test.stop()
  }
}
```

Log 2

Running the program again, it completes normally.

```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:51:17 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:51:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO SparkContext: Submitted application: MyTest2
21/07/25 15:51:18 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:51:18 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:51:18 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:51:18 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing view acls groups to:
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls groups to:
21/07/25 15:51:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shockang); groups with view permissions: Set(); users with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:51:18 INFO Utils: Successfully started service 'sparkDriver' on port 63584.
21/07/25 15:51:18 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:51:18 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-292dce43-a0c2-4ba7-aed2-7786b9c34b6d
21/07/25 15:51:18 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:51:18 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:51:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:51:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:51:19 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:51:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63585.
21/07/25 15:51:19 INFO NettyBlockTransferService: Server created on 192.168.0.105:63585
21/07/25 15:51:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:51:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63585 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO SparkContext: Starting job: foreach at MyTest2.scala:14
21/07/25 15:51:19 INFO DAGScheduler: Got job 0 (foreach at MyTest2.scala:14) with 12 output partitions
21/07/25 15:51:19 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest2.scala:14)
21/07/25 15:51:19 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:51:19 INFO DAGScheduler: Missing parents: List()
21/07/25 15:51:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13), which has no missing parents
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1922.0 B, free 2004.6 MiB)
21/07/25 15:51:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63585 (size: 1922.0 B, free: 2004.6 MiB)
21/07/25 15:51:19 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:51:19 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:51:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:51:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:51:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:51:19 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:51:19 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:51:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:51:19 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:51:19 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:51:19 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:51:19 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:51:19 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:51:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:51:19 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
·
21/07/25 15:51:20 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:51:20 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 595 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 600 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 601 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 599 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 603 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 602 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 606 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 603 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 607 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 606 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 609 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 637 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:51:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/07/25 15:51:20 INFO DAGScheduler: ResultStage 0 (foreach at MyTest2.scala:14) finished in 0.763 s
21/07/25 15:51:20 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:51:20 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:51:20 INFO DAGScheduler: Job 0 finished: foreach at MyTest2.scala:14, took 0.807256 s
21/07/25 15:51:20 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:51:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:51:20 INFO MemoryStore: MemoryStore cleared
21/07/25 15:51:20 INFO BlockManager: BlockManager stopped
21/07/25 15:51:20 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:51:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:51:20 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:51:20 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:51:20 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-b6b9cd75-c3cc-4701-8b63-93e25180bd56
```

Preliminary Conclusion

So, from the example above, we can draw the following conclusions:

Because operators such as map and filter in a Spark program reference a class's member functions or variables, every member of that class must support serialization; and because some member variables of the class are not serializable, Task serialization ultimately fails.

Conversely, once the non-serializable member variables of the class are annotated, the whole class serializes normally and the Task not serializable problem disappears.

3 Example Analysis: Referencing a Member Function

Member variables and member functions affect serialization in the same way: referencing a member function of some class requires all members of that class to support serialization.
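The mechanism is the same this-capture as with member variables, and it can again be sketched without Spark (all names below are illustrative): a lambda that calls a member method serializes the whole instance, while a lambda that calls a method on a top-level object captures nothing from the instance.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable field such as SparkContext.
class Unpicklable

class DomainOps extends Serializable {
  val heavy = new Unpicklable

  def addWWW(s: String): String = if (s.startsWith("www")) s else "www." + s

  // Calling the member method is really `this.addWWW(item)`, so `this` is captured.
  def memberMap: String => String = item => addWWW(item)

  // Calling the object's method captures nothing from the instance.
  def objectMap: String => String = item => DomainUtil.addWWW(item)
}

object DomainUtil { // analogue of a Java static utility class
  def addWWW(s: String): String = if (s.startsWith("www")) s else "www." + s
}

object MethodCaptureDemo {
  def canSerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new DomainOps().memberMap)) // false
    println(canSerialize(new DomainOps().objectMap)) // true
  }
}
```

The two function values compute exactly the same thing; only what they capture differs.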

To verify this hypothesis, we call a member function of the current class inside map; its job is to prepend the prefix "www." to any domain name that does not already start with "www".

Code 3

Code 3 keeps addWWW as a member function. Even though rootDomain is copied into a local variable inside getResult, calling the member function addWWW inside map is really calling this.addWWW, so the whole instance, including the non-serializable sc, is pulled into the closure and the Task not serializable error appears again:

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest3 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest3")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => addWWW(item))
    result.foreach(println)
  }

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest3 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest3("com")
    test.getResult()
    test.stop()
  }
}
```

Code 4

Code 4 moves addWWW into the companion object (the analogue of a Java static method). Together with the local copy of rootDomain, the closures now reference neither a member variable nor a member function, so nothing forces the instance to be serialized:

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest4 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest4")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest4.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest4 {
  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest4("com")
    test.getResult()
    test.stop()
  }
}
```

Code 5

Code 5 instead combines the @transient annotations of Code 2 with the object-level addWWW. Here filter references the member variable rootDomain directly, so the instance is serialized, but with sparkConf and sc marked @transient that now succeeds:

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest5 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest5")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest5.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest5 {
  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest5("com")
    test.getResult()
    test.stop()
  }
}
```

Code 6

Code 6 applies both measures at once: @transient on the non-serializable fields, a local copy of rootDomain, and the object-level addWWW:

```scala
package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest6 private(conf: String) extends Serializable {
  private val list = List("a.com", "·", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest6")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest6.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest6 {
  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest6("com")
    test.getResult()
    test.stop()
  }
}
```

Log 6

Running Code 6 completes normally, as its log shows:

```
21/07/25 16:16:39 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 16:16:39 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 706 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 665 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 663 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 664 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 667 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 666 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 672 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 667 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 671 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 670 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 672 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 667 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 16:16:39 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/07/25 16:16:39 INFO DAGScheduler: ResultStage 0 (foreach at MyTest6.scala:14) finished in 0.863 s
21/07/25 16:16:39 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 16:16:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 16:16:39 INFO DAGScheduler: Job 0 finished: foreach at MyTest6.scala:14, took 0.910534 s
21/07/25 16:16:39 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 16:16:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 16:16:39 INFO MemoryStore: MemoryStore cleared
21/07/25 16:16:39 INFO BlockManager: BlockManager stopped
21/07/25 16:16:39 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 16:16:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 16:16:39 INFO SparkContext: Successfully stopped SparkContext
21/07/25 16:16:39 INFO ShutdownHookManager: Shutdown hook called
21/07/25 16:16:39 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-6e765980-e5b6-410a-b0e7-5635b476cb11
```

Summary

As described above, this class of problem is mainly caused by referencing a member variable or member function of some class when that class has not been made properly serializable.

There are two ways to solve this problem:

1. Do not reference (or do not directly reference) a class's member functions or member variables inside map and other closures.

For cases that depend on a member variable:

- If the value the program depends on is relatively fixed, use a constant, define it inside the map / filter operation itself, or define it in a Scala object (analogous to a static variable in Java).
- If the value must be supplied dynamically when the program is called (as a function argument), then instead of referencing the member variable inside map / filter, define a local variable from the member variable's value, as in the getResult function of the examples above; the operators then no longer reference any class member.

For cases that depend on a member function:

If the function is self-contained, define it in a Scala object (analogous to a static method in Java), so that no particular class is required.
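The local-variable trick can be sketched the same Spark-free way (again with illustrative names): copying the member into a local val means the closure captures only a String, so the enclosing instance, serializable or not, stays out of the picture.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable field such as SparkContext.
class Unpicklable

// Note: the class no longer needs to extend Serializable at all.
class DomainFilter(conf: String) {
  val heavy = new Unpicklable
  val rootDomain = conf

  // Bad: reads the member directly, so the lambda captures `this`.
  def direct: String => Boolean = item => item.contains(rootDomain)

  // Good: copies the member into a local val first.
  def viaLocal: String => Boolean = {
    val local = rootDomain       // a plain String, serializable
    item => item.contains(local) // captures only `local`
  }
}

object LocalCopyDemo {
  def canSerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new DomainFilter("com").direct))   // false
    println(canSerialize(new DomainFilter("com").viaLocal)) // true
  }
}
```

This is exactly the difference between MyTest1 and the getResult variants that introduce `val rootDomain = conf` as a local.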

2. If a member function or variable of some class is referenced, make the class properly serializable.

In this case, first have the class extend Serializable, then annotate the member variables that cannot be serialized with "@transient" to tell the compiler not to serialize them.

In addition, where possible, move the variables the closure depends on into a small separate class and make that class serializable; this also reduces the amount of data shipped over the network and improves efficiency.
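The small-class suggestion can be sketched like this (FilterParams and the other names are hypothetical, and plain Java serialization again stands in for Spark's closure serializer): gather just the values the closure needs into a tiny serializable holder and capture that, instead of the heavyweight outer object.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A small holder with only what the closure needs.
// Scala case classes implement Serializable automatically.
case class FilterParams(rootDomain: String, prefix: String)

class HeavyJob {
  val heavy = new Object // lots of driver-side state the closure never uses
  val params = FilterParams("com", "www.")

  def filterFn: String => Boolean = {
    val p = params // capture only the small holder, not `this`
    item => item.contains(p.rootDomain)
  }
}

object HolderDemo {
  def canSerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(canSerialize(new HeavyJob().filterFn)) // true: only FilterParams is shipped
}
```

Because only the few fields of FilterParams travel with each task, this also keeps the serialized closure small, which is the efficiency benefit mentioned above.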


