
SparkSQL: DSL-style API (Table API) syntax


The DSL (Domain-Specific Language) style API expresses SQL semantics through programmatic API calls rather than SQL strings.

The Dataset Table API has one notable trait: after a relational operation, the return value falls back to DataFrame.

This is because after a select, Spark cannot infer a concrete type for the resulting structure, so each record can only be wrapped in the generic Row type (a DataFrame is just Dataset[Row]).
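A minimal sketch of this fallback (the Stu case class and the tuple conversion are illustrative assumptions, not from the original post):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReturnTypeDemo {
  case class Stu(id: Int, name: String, age: Int)  // hypothetical record type

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    import spark.implicits._

    val ds: Dataset[Stu] = Seq(Stu(1, "zs", 20), Stu(2, "ls", 23)).toDS()

    // select cannot know a concrete element type for its result,
    // so it returns DataFrame, i.e. Dataset[Row]
    val df: DataFrame = ds.select($"id", $"name")

    // Getting a typed Dataset back requires an explicit as[T]
    val typed: Dataset[(Int, String)] = df.as[(Int, String)]
    typed.show()
  }
}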

Table API basic operations

Data preparation (stu.csv):

id,name,age,city,score
1,张三,21,BJ,80.0
2,李四,23,BJ,82.0
3,王五,20,SH,88.6
4,赵六,26,SH,83.0
5,田七,30,SZ,90.0

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI01 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Read the data and create a DataFrame
    val df: DataFrame = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv("SQLData/csv/stu.csv")

    // Import Spark's implicits and built-in functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    println("-----------------------select and expressions----------------------------")
    // Refer to columns with plain name strings
    df.select("id", "name").show()
    // The plain string form cannot apply operations to values:
    // df.select("id + 1", "name").show()   // fails: "id + 1" is taken as one column name
    // To write SQL expressions as strings, use selectExpr
    df.selectExpr("id+1", "upper(name)").show()
    // Build Column objects with the $ interpolator
    df.select($"id", upper($"name"), $"age" + 10).show()
    // Build Column objects with a leading single quote (Symbol syntax)
    df.select('id, upper('name), 'age + 10).show()
    // Build Column objects with the col function
    df.select(col("id"), upper(col("name")), col("age") + 10).show()

    println("-----------------------aliases----------------------------")
    // selectExpr takes SQL-style aliases directly
    df.selectExpr("id+1 as new_id", "upper(name) as new_name").show()
    // The Column-based forms alias with the `as` method
    df.select($"id" as "new_id", upper($"name") as "new_name", $"age" + 10 as "new_age").show()
    df.select('id as "new_id", upper('name) as "new_name", 'age + 10 as "new_age").show()
    df.select(col("id") as "new_id", upper(col("name")) as "new_name", col("age") + 10 as "new_age").show()

    println("-----------------------filtering----------------------------")
    df.where("id > 1 and city='BJ'").show()
    df.where('id > 1 and 'score > 80).show()

    println("-----------------------order by----------------------------")
    df.orderBy($"id".desc).show()  // id descending
    df.orderBy("age").show()       // age ascending

    println("-----------------------group by and aggregates----------------------------")
    df.groupBy("city").count().show()       // head count per city
    df.groupBy("city").avg("score").show()  // average score per city
    // Common pattern: agg(sum(...) as "alias", ...)
    df.groupBy("city").agg(
      avg("score") as "avg_score",
      sum("score") as "sum_score",
      count(lit(1)) as "cnt",               // lit(v) turns a value into a constant column
      collect_list("name") as "names"
    ).show()

    println("-----------------------subquery---------------------------")
    /**
     * Equivalent to:
     * select *
     * from (
     *   select city, sum(score) as sum_score
     *   from stu
     *   group by city) t
     * where sum_score > 150
     */
    df.groupBy("city").agg(sum("score") as "sum_score")
      .where($"sum_score" > 150).show()
  }
}
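Besides the four column notations shown above, functions.expr also parses a SQL fragment into a Column, which (unlike the plain string form of select) composes with other Column expressions and aliases. A small sketch, reusing the df and imports from the block above:

// expr parses a SQL expression string into a Column object,
// so it can be mixed with upper(), col(), and `as`
df.select(expr("id + 1") as "new_id", upper(col("name")) as "new_name").show()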

Window function example

Data preparation (shop.csv):

shop1,2022-01-01,500
shop1,2022-01-02,500
shop1,2022-02-01,500
shop1,2022-04-01,500
shop1,2022-03-01,500
shop1,2022-06-01,500
shop2,2022-01-01,500
shop2,2022-05-01,500
shop2,2022-02-01,500
shop2,2022-03-01,500

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI02 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Define the schema by hand (the file has no header row)
    val schema: StructType = StructType(Seq(
      StructField("name", DataTypes.StringType),
      StructField("date", DataTypes.StringType),
      StructField("amount", DataTypes.IntegerType)
    ))
    val df: DataFrame = spark.read.schema(schema).csv("SQLData/shop/shop.csv")

    // Import Spark's implicits and built-in functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Goal: per shop, the monthly total and the running total
    // 1. Monthly total per shop
    val df2: DataFrame = df.groupBy($"name", substring($"date", 0, 7) as "month")
      .agg(sum("amount") as "m_sum_amount")

    // 2. Running total via a window function
    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount") over(Window.partitionBy("name").orderBy("month")) as "total_money"
    ).show()

    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount") over(Window.partitionBy("name").orderBy("month")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)) as "total_money"  // explicit row frame
    ).show()
  }
}
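One detail worth spelling out about the two window queries above: when orderBy is given and no frame is specified, Spark defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes all ties on the order key, whereas rowsBetween counts physical rows. The months here are unique per shop, so both queries agree. A sketch of the default frame written out explicitly, reusing df2 from above:

// Explicit form of the frame Spark uses by default when only orderBy is given
df2.select(col("name"), col("month"), col("m_sum_amount"),
  sum("m_sum_amount") over(Window.partitionBy("name").orderBy("month")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)) as "total_money"
).show()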

join queries and union

Data preparation: user (user.csv)

uid,name,age,gender,city
1,zss,18,M,BJ
2,ls,20,F,BJ
3,wx,30,M,SH

Data preparation: order (order.csv)

oid,money,uid,id
1001,100,1,1
1002,100,2,2
1003,100,3,3
1004,100,1,1
1005,100,2,2
1006,100,3,3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI03 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Read the data and create the DataFrames
    val userDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/user.csv")
    val orderDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/order.csv")

    // Import Spark's implicits and built-in functions
    import org.apache.spark.sql.functions._
    import spark.implicits._

    println("-----------------------join queries---------------------------")
    userDF.crossJoin(orderDF).show()  // Cartesian product
    userDF.join(orderDF).show()       // no join condition: also a Cartesian product
    userDF.join(orderDF, "uid").show()
    userDF.join(orderDF, Seq("uid")).show()
    // Use an explicit Column condition when the join keys have different names
    userDF.join(orderDF, userDF("uid") === orderDF("id")).show()
    // Outer joins
    userDF.join(orderDF, Seq("uid"), "left").show()
    userDF.join(orderDF, Seq("uid"), "right").show()
    // userDF.join(orderDF, "uid", "left")  // compile error: no such overload
    userDF.join(orderDF, userDF("uid") === orderDF("id"), "left").show()

    println("-----------------------union---------------------------")
    // Note: unlike SQL UNION, DataFrame.union keeps duplicates (UNION ALL semantics),
    // and unionAll is merely a deprecated alias for union; call .distinct() to deduplicate
    userDF.union(orderDF).show()
    userDF.unionAll(orderDF).show()
  }
}
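A caveat on the union calls above: union matches columns by position and requires both sides to have the same number of columns, so unioning the five-column user table with the four-column order table as written would fail at runtime. A minimal sketch of the usual pattern, projecting both sides to a common shape first (the column choices and the "label" name are illustrative assumptions):

// Project both sides to the same positional schema before unioning
val left  = userDF.selectExpr("uid", "name as label")
val right = orderDF.selectExpr("uid", "cast(oid as string) as label")

left.union(right).show()             // UNION ALL semantics: duplicates kept
left.union(right).distinct().show()  // add distinct() for SQL UNION semantics

On Spark 2.3+ there is also unionByName, which matches columns by name rather than position.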


