irpas技术客

flink table & sql时间属性与窗口_老鼠扛刀满街找猫@_flink sql 时间窗口函数

大大的周 3071

文章目录 flink table & sql时间属性与窗口1 maven 依赖引用2 时间属性2.1 事件时间2.1 处理时间 3 窗口(window)3.1 分组窗口3.1.1 老版本3.1.2 新版本(窗口表值函数 Windowing TVFs) 3 聚合(Aggregation)查询3.1 TTL3.2 窗口聚合3.3 开窗(Over)聚合3.3.1 语法 4 topN example

flink table & sql时间属性与窗口

flink版本:1.13.1 scala版本:2.12

1 maven 依赖引用 <properties> <flink.version>1.13.1</flink.version> <scala.version>2.12</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 实现自定义的数据格式来做序列化,可以引入下面的依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> </dependencies> 2 时间属性 2.1 事件时间 在DDL连接表中创建 CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), // 传入得值是bigint,自动转换 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( ... );

说明:

这里我们把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的,格式是 INTERVAL <数值> <时间单位>:INTERVAL ‘5’ SECOND,这里的数值必须用单引号引起来,而单位用 SECOND 和 SECONDS 是等效的。TIMESTAMP会自动转为国际UTF时间,使用当地时间需要使用TIMESTAMP_LTZ CREATE TABLE events ( user STRING, url STRING, ts BIGINT, ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND ) WITH ( ... );

说明:

Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ。这里 TIMESTAMP_LTZ 是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式,那就是不带时区信 息的,可以将事件时间属性定义为 TIMESTAMP 类型。而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件 时间属性,类型定义为 TIMESTAMP_LTZ 会更方便。 在数据流转换为表时定义 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); SingleOutputStreamOperator<Event> dataStream = env.addSource(new ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.getTimestamp(); } })); Table table = tableEnv.fromDataStream(dataStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime(), $("ps").proctime());

说明

设置水位线(assignTimestampsAndWatermarks),$(“et”).rowtime()自动获取,et为虚拟表别名$(“ps”).proctime()表示系统处理时间,ps为虚拟表别名.rowtime(),.proctime()值均是国际UTF时间 2.1 处理时间 在创建表的 DDL 中定义 CREATE TABLE EventTable( user STRING, url STRING, ts AS PROCTIME() ) WITH ( ... ); 在数据流转换为表时定义 DataStream<Tuple2<String, String>> stream = ...; // 声明一个额外的字段作为处理时间属性字段,$("ts").proctime()系统处理时间 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime()); 3 窗口(window) 3.1 分组窗口

在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。

3.1.1 老版本 Table result = tableEnv.sqlQuery( "SELECT " + "user, " + "TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " + "COUNT(url) AS cnt " + "FROM EventTable " + "GROUP BY " + // 使用窗口和用户名进行分组 "user, " + "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口 );

这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为 endT 提取出来。

3.1.2 新版本(窗口表值函数 Windowing TVFs)

从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数

案例 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); TableConfig config = tableEnv.getConfig(); config.setIdleStateRetention(Duration.ofMillis(60)); SingleOutputStreamOperator<Event> dataStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.getTimestamp(); } })); // 1. 注册虚拟表 tableEnv.createTemporaryView("table_click", dataStream, $("user"), $("ts").rowtime()); // 2. 窗口聚合查询 老版本-滚动窗口 Table agg = tableEnv.sqlQuery("select " + " user," + " count(1) AS ct," + " TUMBLE_END(ts,INTERVAL '10' SECOND) AS endTime" + " from table_click " + " group by user, TUMBLE(ts, INTERVAL '10' SECOND)"); // 滚动窗口 布长10 // 3. 窗口聚合查询 TVF-滚动窗口 Table tvfTumbleAgg = tableEnv.sqlQuery("select " + " user," + " count(1) AS ct," + " window_start," + " window_end" + " from TABLE(" + // 创建一个滚动窗口,参数1:数据来源的虚拟表,参数2:滚动日期参数,参数3:窗口布长 " TUMBLE(table table_click, DESCRIPTOR(ts), INTERVAL '10' SECOND))" + " GROUP BY user, window_start, window_end"); // group by window_start, window_end 固定写法 // 4. 窗口聚合查询 TVF-滑动窗口 Table tvfHopAgg = tableEnv.sqlQuery("select " + " user," + " count(1) AS ct," + " window_start," + " window_end" + " from TABLE(" + // 创建一个滚动窗口,参数1:数据来源的虚拟表,参数2:滚动日期参数,参数3:滑动补布长 参数4:窗口布长 " HOP(table table_click, DESCRIPTOR(ts), INTERVAL '5' SECOND,INTERVAL '10' SECOND))" + " GROUP BY user, window_start, window_end"); // group by window_start, window_end 固定写法 // 5. 累计窗口 Table tvfCumulateAgg = tableEnv.sqlQuery("select " + " CURRENT_TIME as cutTime," + " user," + " count(1) AS ct," + " window_start," + " window_end" + " from TABLE(" + // 创建一个滚动窗口,参数1:数据来源的虚拟表,参数2:滚动日期参数,参数3:每隔多久计算输出一次 参数4:全窗口布长 " CUMULATE(table table_click, DESCRIPTOR(ts), INTERVAL '5' SECOND,INTERVAL '10' SECOND))" + " GROUP BY user, window_start, window_end"); // group by window_start, window_end 固定写法 dataStream.print("source:"); //tableEnv.toChangelogStream(agg).print("agg:"); //tableEnv.toChangelogStream(tvfTumbleAgg).print("tvfTumbleAgg:"); //tableEnv.toChangelogStream(tvfHopAgg).print("tvfHotAgg:"); tableEnv.toChangelogStream(tvfCumulateAgg).print("tvfCumulateAgg:"); 3 聚合(Aggregation)查询 3.1 TTL

在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要 维护的状态也会持续增长。为了防止状态无限增长耗尽资源

# 方式1 TableEnvironment tableEnv = ... // 获取表环境的配置 TableConfig tableConfig = tableEnv.getConfig(); // 配置状态保持时间 tableConfig.setIdleStateRetention(Duration.ofMinutes(60)); # 方式2 TableEnvironment tableEnv = ... Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.state.ttl", "60 min"); 3.2 窗口聚合 TVF实现 Table result = tableEnv.sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + "COUNT(url) AS cnt " + "FROM TABLE( " + "TUMBLE( TABLE EventTable, " + "DESCRIPTOR(ts), " + "INTERVAL '1' HOUR)) " + "GROUP BY user, window_start, window_end " );

注意:GROUP BY window_start, window_end 是固定写法

3.3 开窗(Over)聚合

Flink SQL 中的开窗函数也是通过 OVER 子句来实现的

3.3.1 语法 <聚合函数> OVER ( [PARTITION BY <字段 1>[, <字段 2>, ...]] ORDER BY <时间属性字段> <开窗范围>), ... FROM ... PARTITION BY(可选) 用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的ORDER BY OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中,目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性。开窗范围 对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为: # PRECEDING 指前面几个/前一段时间;CURRENT ROW:指到当前最新得行数 BETWEEN ... PRECEDING AND CURRENT ROW 范围间隔 范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据: RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW 行间隔 行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。 例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据) ROWS BETWEEN 5 PRECEDING AND CURRENT ROW SELECT user, COUNT(url) OVER ( PARTITION BY user ORDER BY ts RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ) AS cnt FROM EventTable 4 topN example import com.flink.dto.Event; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class WindowTopNExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源,并分配时间戳、生成水位线 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 25 * 60 * 1000L), new Event("Alice", "./prod?id=4", 55 * 60 * 1000L), new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.getTimestamp(); } })); // 创建表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表,并指定时间属性 Table eventTable = tableEnv.fromDataStream( eventStream, $("user"), $("url"), $("timestamp").rowtime().as("ts") // 将 timestamp 指定为事件时间,并命名为 ts ); // 为方便在 SQL 中引用,在环境中注册表 EventTable tableEnv.createTemporaryView("EventTable", eventTable); // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表 String subQuery = "SELECT window_start, window_end, user, COUNT(url) as cnt " + "FROM TABLE ( " + "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " + // 滚动窗口 布长1小时 "GROUP BY window_start, window_end, user "; // 定义 Top N 的外层查询 String topNQuery = "SELECT * " + "FROM (" + "SELECT *, " + "ROW_NUMBER() OVER ( " + "PARTITION BY window_start, window_end " + "ORDER BY cnt desc " + ") AS row_num " + "FROM (" + subQuery + ")) " + "WHERE row_num <= 2"; // 执行 SQL 得到结果表 Table result = tableEnv.sqlQuery(topNQuery); tableEnv.toDataStream(result).print(); env.execute(); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #SQL #时间窗口函数 #文章目录flink #TABLE #ampamp #基本API使用1