irpas技术客

Spark使用foreach与map算子的区别_大数据面壁者_spark中map和foreach

irpas 3744

spark使用map算子不产生数据的问题 如下代码,使用累加器对数据进行求和的过程中,发现代码没有报错,运行也无异常,但是累加器结果总是为空,经过多番查证, 发现问题出现在下面代码的第六步。 开始时使用的是map算子对RDD中数据遍历,通过累加器进行计算,结果没有值。 后来发现map底层使用的是迭代器,循环遍历一遍之后,迭代器的指针已经跳到了最末尾,当进行累加器计算的时候,已经没有数据 进行计算了,因此迭代器结果没有值。 修正后结果:使用foreach替代map进行循环计算。 package com.spark.exercise import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} object $02_request1Update { def main(args: Array[String]): Unit = { //1.创建SparkContext val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test")) //2.读取数据 val datas = sc.textFile("datas/user_visit_action.txt") //3.创建累加器对象 val acc = new SumAcc //4.注册累加器 sc.register(acc) //5、切割数据,选择点击品类id,下单品类ids,支付品类ids,转换 val source = datas.flatMap(line=>{ val arr = line.split("_") //创建容器,封装数据 var result:List[(String,Int,Int,Int)] = Nil val clickid = arr(6) if(clickid!="-1"){ result = (clickid,1,0,0) :: result }else if( arr(8)!="null" ){ arr(8).split(",").foreach(x=>{ result = (x,0,1,0) :: result }) }else if(arr(10)!="null"){ arr(10).split(",").foreach(x=>{ result = (x,0,0,1) :: result }) } result } ) //6.通过累加器累加数据 source.foreach(x => acc.add(x)) // println(acc.value.toBuffer) //7.获取累加结果,并排序取前十 val result = acc.value.toList.sortBy(_._2).reverse.take(10) println(result) Thread.sleep(100000) } } //定义累加器 class SumAcc extends AccumulatorV2[(String,Int,Int,Int),Map[String,(Int,Int,Int)]] { //创建一个容器,累加每个品类的点击次数、下单次数、支付次数 var result = Map[String, (Int, Int, Int)]() //判断是否为空 override def isZero: Boolean = result.isEmpty //赋值副本 override def copy(): AccumulatorV2[(String, Int, Int, Int), Map[String, (Int, Int, Int)]] = new SumAcc //重置累加器 override def reset(): Unit = Map[String, (Int, Int, Int)]() //累加元素 override def add(v: (String, Int, Int, Int)): Unit = { //判断品类在容器中是否存在 if(result.contains(v._1)){ //取出之前的累加结果 val value = result.getOrElse(v._1,(0,0,0)) result = result.+((v._1,(value._1+v._2,value._2+v._3,value._3+v._4))) }else{ result = result.+((v._1,(v._2,v._3,v._4))) } } //合并多个累加器的结果 override def merge(other: AccumulatorV2[(String, Int, Int, Int), Map[String, (Int, Int, Int)]]): Unit = { //取出累加器的值 val values = other.value values.foreach{ case (id,(clicknum,ordernum,paynum)) => //判断品类在容器中是否存在 if(result.contains(id)){ //取出之前的累加结果 val value = result.getOrElse(id,(0,0,0)) result = result.+((id,(value._1+clicknum,value._2+ordernum,value._3+paynum))) }else{ result = result.+((id,(clicknum,ordernum,paynum))) } } } //返回值 override def value: Map[String, (Int, Int, Int)] = this.result }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #package #C