irpas技术客

Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)_奇迹虎虎_spark的dataframe

irpas 5436

1、Spark中的DataFrame是什么? 官方解释:

DataFrame = RDD[Person] - 泛型 + Schema + SQL操作 + 优化

官方原文:A DataFrame is a DataSet organized into named columns.

中文翻译:以列(列名,列类型,列值)的形式构成的分布式的数据集。

用大白话讲:

在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,是一种特殊的RDD,是一个分布式的表,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。


2、DataFrame中的Schema是什么?

解释:其实就是结构表的列名与列类型。

Schema 的两种定义方式:

使用 StructType 定义,是一个样例类,属性为 StructField 的数组使用 StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填 from pyspark.sql.types import * # 构建 Schema 对象 schema=StructType([ StructField('name',StringType()), StructField('age',IntegerType()) ])
3、DataFrame中的Row是什么?

解释:DataFrame中每条数据封装在Row中,Row表示每行数据。

如何构建Row对象?

from pyspark.sql import Row # 构建 Row 对象 Row(value1,?value2,?value3,?...)
4、构建DataFrame的几种方式: 4.1 通过 RDD 转换 DataFrame 下方4种方式的结果均是上图,不再逐一阐述 4.1.1?通过 Row 构建 DataFrame 这种方法为使用反射方法Schema模式,Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。 from pyspark.sql import SparkSession from pyspark import Row if __name__ == '__main__': # 创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 创建RDD rdd1 = sc.parallelize(['张三,30','李四,20','王五,50']) # 将RDD的每个元素从String转成Row rdd2 = rdd1.map(lambda x:Row(name=x.split(',')[0],age=int(x.split(',')[1]))) # 直接通过RDD的Row创建DataFrame df = spark.createDataFrame(rdd2) # 打印DataFrame df.printSchema() df.show() # 关闭退出 spark.stop() 4.1.2 通过 StructedType 构建 DataFrame 从原始 RDD 创建元组或列表的 RDD。StructType 在步骤 1 中创建的 RDD 中创建由匹配的元组或列表结构表示的模式。通过 createDataFrame 提供的方法将模式应用到 RDD?SparkSession。 from pyspark.sql import SparkSession from pyspark.sql.types import * if __name__ == '__main__': # 创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 创建RDD rdd1 = sc.parallelize(['张三,30','李四,20','王五,50']) # 将RDD的每个元素从String转成Tuple rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1]))) # 为上述tuple量身定义schema schema = StructType([ StructField('name',StringType()), StructField('age',IntegerType()) ]) # 通过RDD和Schema创建DataFrame df = spark.createDataFrame(rdd2,schema) # 打印DataFrame df.printSchema() df.show() # 关闭退出 spark.stop() 4.1.3 通过 toDF?构建 DataFrame from pyspark.sql import SparkSession if __name__ == '__main__': # 创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 创建RDD rdd1 = sc.parallelize(['张三,30','李四,20','王五,50']) # 将RDD的每个元素从String转成Tuple rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1]))) # 调用toDF传输字段名称,直接创建DataFrame df = rdd2.toDF(['name','age']) # 打印DataFrame df.printSchema() df.show() # 关闭退出 spark.stop() 4.1.4 通过 Pandas 构建 DataFrame from pyspark.sql import SparkSession import pandas as pd from datetime import * if __name__ == '__main__': # 创建上下文对象 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() pdf=pd.DataFrame({ 'a': [1, 2, 3], 'b': [2.9, 3.9, 4.9], 'c': ['string1', 'string2', 'string3'], 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)], 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)] }) print(pdf) # spark.createDataFrame(pd.DataFrame) df=spark.createDataFrame(pdf) # 打印df的schema信息 df.printSchema() # 打印df的行数据 df.show() # 关闭退出 spark.stop() 4.2 读取外部数据 转化为 DataFrame 4.2.1?读取 Json 文件创建 DataFrame

from pyspark.sql import SparkSession if __name__ == '__main__': # 创建SparkSession入口 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() # spark.read读取json文件,并打印schema,和数据 df1=spark.read.json('file:///root/test.json') df1.printSchema() df1.show() 4.2.2 读取 parquet 列式存储格式文件创建 DataFrame

from pyspark.sql import SparkSession if __name__ == '__main__': # 创建SparkSession入口 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() # spark.read读取parquet文件,并打印schema,和数据 df2=spark.read.parquet('file:///root/test.parquet') df2.printSchema() df2.show() 4.2.3 读取 csv 文件创建 DataFrame

from pyspark.sql import SparkSession if __name__ == '__main__': # 1-创建SparkSession入口 spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate() # 2-spark.read读取csv文件,并打印schema,和数据 df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///root/test.csv') df3.printSchema() df3.show() spark.stop() ?4.3 加载文件时,什么时候用textFile?什么时候用read? 如果加载的数据结构化程度不高,则用 textFile 返回 RDD 再处理 from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 读取文件生成 RDD rdd1 = sc.textFile('file:///root/1.txt') 如果加载的数据结构化程度很高,比如 mysql 或 半结构化数据 json、csv,则用 read 返回 DataFrame 再处理 from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 读取文件生成 DataFrame(特殊RDD) rdd1 = spark.read.text('file:///root/1.txt')


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #spark的dataframe # #spark #中DataFrame #是一种以 #RDD #DataFrame