irpas技术客

Window Apply,Process Function,ReduceFunction,AggregateFunction分析_zhaoweiwei369

irpas 4533

本文全部来源于官方文档解释。

Window Apply?#

WindowedStream → DataStream?#

AllWindowedStream → DataStream?#

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

解释:将一个通用的函数应用到整个窗口。可见apply用于窗口函数之后。

举例:

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });

源码的角度分析:

接口函数windowFuntion中 定义了apply方法。

源码的注释为:计算窗口的值,并输出none或多个元素。 ?

总结:apply 是一个通用的函数应用于窗口中数据的计算,可以紧挨window? Funtion之后使用,另外在接口方法 WindowFunction中定义了apply方法,用户可以自定义apply对窗口中数据的处理规则。

Process Function?# The ProcessFunction?#

ProcessFunction是一种底层流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

事件(流元素)状态(容错,一致,仅在键控流上)计时器(事件时间和处理时间,仅在键控流上)

可以将其ProcessFunction视为可以FlatMapFunction访问键控状态和计时器。它通过为输入流中接收到的每个事件调用来处理事件。

对于容错状态,ProcessFunction可以访问 Flink 的keyed state,可以通过 访问?RuntimeContext,类似于其他有状态函数访问 keyed state 的方式。

计时器允许应用程序对处理时间和事件时间的变化做出反应。对该函数的每次调用processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间时间戳和TimerService。可TimerService用于为将来的事件/处理时间瞬间注册回调。对于事件时间计时器,onTimer(...)当当前水印超过或超过计时器的时间戳时调用该方法,而对于处理时间计时器,onTimer(...)当挂钟时间达到指定时间时调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操作键控状态。

如果要访问键控状态和计时器,则必须 ProcessFunction在键控流上应用:

他的具体介绍可以参照官网:Process Function | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/process_function/

?总结:processFunction 是一个底层的处理流的函数:它能为用户提供三方面东西,1.元素2.状态3.计时器 可以让用户很方便的定义流数据的规则。一般我们通常使用ProcessFunction较多。

ReduceFunction?#

ReduceFunction指定输入中的两个元素如何组合生成相同类型的输出元素。?Flink使用ReduceFunction递增地聚合窗口的元素。 ?

val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) .window(<window assigner>) .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) } AggregateFunction

AggregateFunction是ReduceFunction的一般化版本,

它有三种类型:

输入类型(IN)、

累加类型(ACC)

输出类型(OUT)。

?输入类型是输入流中元素的类型,AggregateFunction有一个将一个输入元素添加到累加器的方法。?该接口还提供了创建初始累加器、将两个累加器合并到一个累加器以及从累加器提取输出(类型为OUT)的方法。?我们将在下面的例子中看到它是如何工作的。 输入类型和输出类型可以不相同?

与ReduceFunction一样,Flink会在一个窗口的输入元素到达时递增地聚合它们。 ?

/** * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator._1 + value._2, accumulator._2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2) } val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate)

?

?

?

?

?

?

?

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Window #apply #process #function #ReduceFunction #本文全部来源于官方文档解释