
Big Data: Spark RDD Operators (11), the Save Operations saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, and saveAsHadoopFile


Spark RDD Operators (11): Save Operations saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, saveAsHadoopFile

Contents:
- saveAsTextFile
  - Save to a file system
  - Save with a specified compression codec
- saveAsSequenceFile
- saveAsObjectFile
- saveAsHadoopFile
- saveAsHadoopDataset
  - Save an RDD to HDFS
  - Save data to HBase
- saveAsNewAPIHadoopFile
- saveAsNewAPIHadoopDataset

saveAsTextFile

```scala
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

saveAsTextFile writes the RDD to a file system as text files; the codec parameter can specify a compression codec class.

Save to a file system:

```scala
package nj.zb.sparkstu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsTextFile {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsTextFile")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(1 to 10)
    // Save to HDFS
    rdd1.saveAsTextFile("hdfs://hadoop100:9000/usr/scala")
    // Save to the local file system
    rdd1.saveAsTextFile("file:///Scala2")
  }
}
```

Note: if the output path is on the local file system, the files are only written to the local disks of the machines where the Executors run.
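If the data is small and what you actually want is a single file on the driver's local disk, one workaround is to collect the RDD to the driver and write it there yourself. This is only a minimal sketch under that assumption; the output path below is illustrative:

```scala
import java.io.PrintWriter

// Collect to the driver (only safe for small RDDs) and write one local file.
// "/tmp/rdd1.txt" is an illustrative driver-local path.
val pw = new PrintWriter("/tmp/rdd1.txt")
rdd1.collect().foreach(pw.println)
pw.close()
```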

Save with a specified compression codec:

```scala
package nj.zb.sparkstu

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsTextFile {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsTextFile")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(1 to 10)
    // Pass the codec class to compress the output; the output path here is illustrative
    rdd1.saveAsTextFile("hdfs://hadoop100:9000/usr/scala2", classOf[GzipCodec])
  }
}
```
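As a quick check, gzip-compressed text written this way can be read back with sc.textFile, which decompresses .gz part files transparently via the Hadoop codecs. A minimal sketch, reusing the illustrative path above:

```scala
// sc.textFile handles .gz part files transparently
val back = sc.textFile("hdfs://hadoop100:9000/usr/scala2")
back.collect().foreach(println)
```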


saveAsSequenceFile

saveAsSequenceFile saves a key-value RDD to HDFS in the SequenceFile format; usage is the same as saveAsTextFile.

saveAsObjectFile

```scala
def saveAsObjectFile(path: String): Unit
```

saveAsObjectFile serializes the elements of the RDD into objects and stores them in files. On HDFS it uses the SequenceFile format by default.

```scala
package nj.zb.sparkstu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsObjectFile {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsObjectFileScala")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(1 to 10)
    // Save to HDFS
    rdd1.saveAsObjectFile("hdfs://hadoop100:9000/usr/scala3")
    // Save to a local directory
    rdd1.saveAsObjectFile("file:///scala3")
  }
}
```
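For completeness, a minimal sketch of saveAsSequenceFile, which is available on key-value RDDs whose key and value types can be converted to Hadoop Writables, together with reading an object file back via sc.objectFile; the paths are illustrative:

```scala
// saveAsSequenceFile needs a pair RDD; String and Int convert to Writables
val pairs = sc.parallelize(Array(("A", 2), ("B", 6), ("C", 7)))
pairs.saveAsSequenceFile("hdfs://hadoop100:9000/usr/scala_seq")

// Read an object file back; the element type must be given explicitly
val restored = sc.objectFile[Int]("hdfs://hadoop100:9000/usr/scala3")
restored.collect().foreach(println)
```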


saveAsHadoopFile

```scala
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = ..., codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

saveAsHadoopFile stores the RDD in files on HDFS using the old (mapred) Hadoop API; you can specify the outputKeyClass, the outputValueClass, and a compression codec.

```scala
package nj.zb.sparkstu

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsHadoopFile {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsHadoopFile")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
    rdd1.saveAsHadoopFile(
      "hdfs://hadoop100:9000/usr/scala4",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]]
    )
  }
}
```
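The codec parameter from the first signature can be exercised in the same way. A hedged sketch that writes the same pair RDD gzip-compressed; the extra output path is illustrative:

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Same call as above plus a compression codec class
rdd1.saveAsHadoopFile(
  "hdfs://hadoop100:9000/usr/scala4_gz",
  classOf[Text],
  classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]],
  classOf[GzipCodec]
)
```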


saveAsHadoopDataset

```scala
def saveAsHadoopDataset(conf: JobConf): Unit
```

saveAsHadoopDataset saves an RDD to storage systems other than HDFS, such as HBase.

When building the JobConf, five settings usually need attention:

- the output path
- the key class
- the value class
- the RDD's output format (OutputFormat)
- compression-related settings

Save an RDD to HDFS:

```scala
package nj.zb.sparkstu

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsHadoopDataset {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsHadoopDataset")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))

    val jobconf: JobConf = new JobConf()
    jobconf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    jobconf.setOutputKeyClass(classOf[Text])
    jobconf.setOutputValueClass(classOf[IntWritable])
    jobconf.set("mapred.output.dir", "hdfs://hadoop100:9000/usr/scala5")

    rdd1.saveAsHadoopDataset(jobconf)
  }
}
```
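A quick way to verify the output is to read the directory back as plain text, since TextOutputFormat writes tab-separated key/value lines. A minimal sketch, reusing the path set in the JobConf above:

```scala
// Each line looks like "A\t2"
val saved = sc.textFile("hdfs://hadoop100:9000/usr/scala5")
saved.collect().foreach(println)
```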


Save data to HBase

Create the HBase table:

```
create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}
```

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

// Runs in spark-shell, where sc is already defined
var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent", "/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")
jobConf.setOutputFormat(classOf[TableOutputFormat])

var rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))
rdd1.map(x => {
  var put = new Put(Bytes.toBytes(x._1))
  put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
```

Result:

```
hbase(main):005:0> scan 'lxw1234'
ROW    COLUMN+CELL
 A     column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
 B     column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
 C     column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
```

Note: to save to HBase, the HBase-related jars must be added to SPARK_CLASSPATH at runtime.

saveAsNewAPIHadoopFile

```scala
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
```

saveAsNewAPIHadoopFile saves the RDD to HDFS using the new (mapreduce) Hadoop API; usage is essentially the same as saveAsHadoopFile.

```scala
package nj.zb.sparkstu

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object saveAsNewAPIHadoopFile {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("saveAsNewAPIHadoopFile")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
    rdd1.saveAsNewAPIHadoopFile(
      "hdfs://hadoop100:9000/usr/scala6",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]]
    )
  }
}
```
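The first signature above takes the OutputFormat as a type parameter and derives the key and value classes from the RDD's own types. A hedged sketch of that form, continuing from the example above with an illustrative output path:

```scala
// The OutputFormat's type arguments must match the RDD's key/value types
rdd1.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]](
  "hdfs://hadoop100:9000/usr/scala6_b"
)
```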


saveAsNewAPIHadoopDataset

```scala
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
```

It works like saveAsHadoopDataset, but uses the new Hadoop API.

Create the HBase table:

```
create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}
```

```scala
package com.lxw1234.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))

    // HBase connection settings
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
    sc.hadoopConfiguration.set("zookeeper.znode.parent", "/hbase")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")

    // Use a Job only to populate the output configuration
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    rdd1.map(x => {
      val put = new Put(Bytes.toBytes(x._1))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
      (new ImmutableBytesWritable, put)
    }).saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
```

Note: to save to HBase, the HBase-related jars must be added to SPARK_CLASSPATH at runtime.


