irpas技术客

完美解决keyby造成的数据倾斜导致吞吐量降低的问题_第一片心意

大大的周 7894

1. 问题现象

????????最近在做一个类似页面pv的累加统计,根据页面id维度来统计一段时间内收到了数据。

????????下面模拟的是处理数据的原始程序。

2. 原始处理 2.1.模拟kafka源 import org.apache.flink.streaming.api.functions.source.SourceFunction import scala.util.Random /** * 每1毫秒发出一个二元组,第一个元素为随机获取到numbers中的一个数字,第二个元素为1,表示统计字段,本示例只进行累加,模拟接收到的数据的条数 * * @author ziqiang.wang * @date 2022-01-02 17:33 * */ class Tuple2Source extends SourceFunction[(String, Int)] { private val random: Random = new Random() private var flag: Boolean = true; /** * 数字数组,数据分布不均匀 */ private val numbers: Array[Int] = Array[Int](1, 1, 1, 1, 1, 2, 2, 3, 4, 5) override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = { while (flag) { val index: Int = random.nextInt(10) ctx.collect((numbers(index).toString), 1) Thread.sleep(1) } } override def cancel(): Unit = { flag = false } }

????????原始需求中,接收到的是kafka数据,为字符串类型,然后通过map转换,提取出页面的id,然后转化为二元组。上面自定义的source就是模拟的原始数据转化为二元组之后的数据。注意里面的numbers,数字1出现了5次,数字2出现了2次,剩下的数字出现了1次,以此来模拟数据分布不均匀的生产数据。

2.2.?对数据进行窗口累加 object NoMiniBatchDemo extends Serializable { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(10) val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2") source .keyBy(_._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg") .print("结果数据:") env.execute("NoMiniBatch") } }

????????具体计算时,根据二元组第一个字段进行keyby,然后reduce,对第二个元素进行累加即可。但是由于页面很少,而且每个页面的数据量分布很不均匀,所以造成了很大的数据倾斜。

????????从上图可以看出,运行一段时间之后发现,数据倾斜非常严重,而且由于key数量比较少,又很多并行度一直是收不到数据的。在这种情况下,即使是增加算子的并行度,也是无法解决根本问题的。

????????网上有种方案是,在key后面添加一个固定的随机数,亦或是对key取hash,然后和key一起拼接起来作为key,虽然能在一定程度上改善数据倾斜的问题,但还是无法彻底决绝数据倾斜造成的低吞吐量问题。

?3.?两阶段聚合解决方案

????????上图是flink观望中对于flink sql的一种优化,优化方案为:Local-Global Aggregation,意思就是,现在本地进行一次聚合,然后将数据发送到下游的Agg聚合算子,然后再进行全局聚合。这种方案类似于MapReduce中的combiner,先在上游对数据进行一次聚合,以减少发送到下游的数据量,从而使降低下游的数据处理量,以增加总体的吞吐量。

3.1.?本地聚合 /** * 不断聚合接收到的数据,当算子子任务处理的数据量或者是时间达到要求,则输出所有累计结果,以减少发往下游的数据量 * * @author ziqiang.wang * @date 2022-01-02 17:57 * */ class LocalAggProcess extends ProcessFunction[(String, Int), (String, Int)] with CheckpointedFunction { // 状态,保存中间结果middleResult表的所有元素 var listState: ListState[(String, Int)] = _ // 当前并行度已处理的数据量 var count: Long = 0 // 处理数据量达到该阈值,输出所有中间结果 val outThreshold: Long = 100 // 上次输出的时间 var lastOutTimeMillis: Long = System.currentTimeMillis() // 当前时间 var currentTimeMillis: Long = System.currentTimeMillis() // 输出时间间隔 val outDuration: Long = 1000 // 保存中间聚合结果 var middleResult: mutable.Map[String, Int] = mutable.Map[String, Int]() override def open(parameters: Configuration): Unit = { // 每秒更新一次当前时间,而不是每处理一条数据就更新一次当前时间,以减少获取系统时间造成的资源消耗 new Timer("更新当前时间", true).scheduleAtFixedRate(new TimerTask { override def run(): Unit = currentTimeMillis = System.currentTimeMillis() }, 1000, 1000) } override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = { if (middleResult.contains(value._1)) { middleResult.put(value._1, middleResult(value._1) + value._2) } else { middleResult.put(value._1, value._2) } count += 1 // 如果该并行度处理数据量达到输出阈值,或者是当前时间达到输出时间间隔,则输出所有中间结果 if (count % outThreshold == 0 || currentTimeMillis - lastOutTimeMillis >= outDuration) { middleResult.foreach(entry => out.collect((entry._1, entry._2))) lastOutTimeMillis = currentTimeMillis middleResult.clear() } } /** * 做checkpoint快照时执行,将中间结果保存到listState状态中 */ override def snapshotState(context: FunctionSnapshotContext): Unit = { listState.clear() middleResult.foreach(entry => listState.add(entry._1, entry._2)) } /** * 初始化和从checkpoint恢复时调用 */ override def initializeState(context: FunctionInitializationContext): Unit = { val descriptor: ListStateDescriptor[(String, Int)] = new ListStateDescriptor[(String, Int)]("中间结果", TypeInformation.of(new TypeHint[(String, Int)] {})) listState = context.getOperatorStateStore.getListState(descriptor) if (context.isRestored) { middleResult.clear() listState.get().forEach(entry => middleResult.put(entry._1, entry._2)) } } } 3.2.?两阶段聚合实现 object LocalAggDemo extends Serializable { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(10) val source: DataStream[(String, Int)] = env.addSource(new Tuple2Source).name("tuple2") source .process(new LocalAggProcess).name("miniBatch") .keyBy(_._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce((x, y) => (x._1, x._2 + y._2)).name("windowAgg") .print("结果数据:") env.execute("NoMiniBatch") } }

????????从上图可以看出,resuce算子接收到的数据明显比不使用两阶段聚合时少的多。在本地聚合算自重,我设置的是满足100条或5秒中任何一个条件时输出数据,在生产环境中,可以根据实际接收到的数据量来对这两个条件进行调整。?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #1 #下面模拟的是处理数据的原始程序 #2 #scalautilRandom