
FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 6: The Many FlinkCDC-Hudi Pitfalls Exposed by Extreme Stress Testing

By 大大的周 (大数据点灯人)

Previous installments in the FlinkCDC-Hudi series:

- FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 1: First Foray
- FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 2: Exceptions and Solutions When Integrating Hudi with Spark
- FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 3: Exploring High Availability for FlinkCDC MySQL Master/Replica Synchronization
- FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 4: Characteristics and Uses of the Two FlinkSQL Kafka Connectors
- FlinkCDC-Hudi: Real-Time MySQL Data Lake Ingestion, Part 5: Several Ways for FlinkSQL to Write to Both Kafka and Hudi

1. Background

After exploring the features and basic usage of FlinkCDC-Hudi, we put the FlinkCDC-Hudi ingestion job through extreme stress tests, which flushed out plenty of pitfalls: some caused by insufficient memory, others by genuine bugs. The pitfalls and their fixes are recorded below.

2. Pitfalls Caused by Insufficient Memory

When memory runs short, the symptoms vary widely: assorted timeouts, GC overhead errors, OOMs, and more. If memory pressure forces a TaskManager restart, data loss is likely, because the state after the restart may not be fully consistent with the state at the moment of failure; the only remedy is then to re-ingest the data. A FlinkCDC-Hudi job must therefore be provisioned with enough memory, or these failures can compromise data availability. This class of problem is fixed simply by allocating more memory.

The main source of the memory pressure is that compaction of a Hudi merge-on-read table needs a large amount of extra memory.
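For reference, a minimal sketch of where that headroom can be granted. This is illustrative, not a tuning recommendation: the table schema is made up and the sizes should be fitted to the actual workload, while 'write.task.max.size' and 'compaction.max_memory' are standard hudi-flink connector options (in MB) and taskmanager.memory.process.size is the standard Flink memory knob.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiMemorySketch {
    public static void main(String[] args) {
        // Cluster side: raise TaskManager memory first, e.g. in flink-conf.yaml:
        //   taskmanager.memory.process.size: 8192m
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Connector side: enlarge the hudi-flink write/compaction budgets (MB).
        tEnv.executeSql(
            "CREATE TABLE user_cdc_hudi (" +
            "  id BIGINT," +
            "  name STRING," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'hdfs:///tmp/flink/cdcata/flinkcdc_user_cdc'," +
            "  'table.type' = 'MERGE_ON_READ'," +
            "  'write.task.max.size' = '2048'," +   // buffer per write task
            "  'compaction.max_memory' = '1024'" +  // merge/compaction budget
            ")");
    }
}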

2.1 JVM GC status

Timeouts under continuous Full GC. The jstat snapshot below, taken on an affected TaskManager, shows the old generation (O) pinned at 99.92% while 126 Full GCs reclaim essentially nothing, so the JVM stalls in garbage collection until heartbeats expire:

  S0     S1     E       O      M      CCS    YGC    YGCT    FGC   FGCT     GCT
  0.00   0.00   100.00  99.92  95.40  92.78  2156   34.635  126   85.840   120.475
  0.00   0.00   100.00  99.92  95.40  92.78  2156   34.635  126   85.840   120.475
  0.00   0.00   100.00  99.92  95.40  92.78  2156   34.635  126   85.840   120.475
  0.00   0.00   100.00  99.92  95.40  92.78  2156   34.635  126   85.840   120.475

2.2 Timeout exceptions

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e09_1636707402790_0448_01_000030 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2.3 Checkpoint failures caused by OOM

java.io.IOException: Could not perform checkpoint 17 for operator hoodie_stream_write (1/1)#13.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1048)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 17 for operator hoodie_stream_write (1/1)#13. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1076)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1032)
    ... 19 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upsetting bucketType UPDATE for partition :
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:199)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:108)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
    at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
    at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:145)
    at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:183)
    at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:460)
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:453)
    at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:130)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:150)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
    ... 29 more
Caused by: java.lang.OutOfMemoryError: Java heap space

2.4 GC overhead limit exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:362)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:56)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:134)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:124)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)

3. Hudi Bugs

3.1 Rollback after a job failure fails: Cannot use marker based rollback strategy on completed instant

When the job hits an exception and tries to roll back, the rollback itself fails. The exception arises because the instant has already completed yet is still selected for rollback. The problem dates back to 0.9; the 0.10.0 fix covered only one scenario, and it was fixed again in 0.10.1. The latest master code has since refactored the 0.10.1 implementation. After upgrading we have not reproduced the problem. A small re-enactment of the failing precondition is sketched below.
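The sketch is a standalone toy, not the Hudi source: it condenses the ValidationUtils.checkArgument precondition visible at the bottom of the trace below. Marker files exist only for in-flight writes, so a marker-based rollback of an instant that has already transitioned to COMPLETED is rejected outright.

public class MarkerRollbackPrecondition {
    enum InstantState { REQUESTED, INFLIGHT, COMPLETED }

    // Simplified stand-in for the check in BaseRollbackActionExecutor:
    // markers only track in-flight writes, so a completed instant cannot
    // be rolled back with the marker-based strategy.
    static void rollbackWithMarkers(String instantTime, InstantState state) {
        if (state == InstantState.COMPLETED) {
            throw new IllegalArgumentException(
                "Cannot use marker based rollback strategy on completed instant:["
                    + instantTime + "__commit__" + state + "]");
        }
        System.out.println("marker-based rollback of " + instantTime);
    }

    public static void main(String[] args) {
        rollbackWithMarkers("20220127163259397", InstantState.INFLIGHT);  // fine
        rollbackWithMarkers("20220127163259397", InstantState.COMPLETED); // the bug path
    }
}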

[BUG] ROLLBACK meet Cannot use marker based rollback strategy on completed error #4439

2022-01-28 14:30:27
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_stream_write' (operator 3e96073598340713e5fe75c259385e9e).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:170)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [initialize instant ] error
    ... 5 more
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs:///tmp/flink/cdcata/flinkcdc_user_cdc commits 20220127163259397
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:661)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:957)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:940)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:928)
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:816)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:815)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:808)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: Cannot use marker based rollback strategy on completed instant:[20220127163259397__commit__COMPLETED]
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.<init>(BaseRollbackActionExecutor.java:90)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.<init>(BaseRollbackActionExecutor.java:71)
    at org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor.<init>(MergeOnReadRollbackActionExecutor.java:48)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.rollback(HoodieFlinkMergeOnReadTable.java:131)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:646)
    ... 13 more

3.2 Duplicate key in Map

AbstractHoodieWriteClient.getPendingRollbackInfos collects a stream into a map, and the collection blows up when the same key appears twice; newer releases have fixed this. See [SUPPORT] java.lang.IllegalStateException: Duplicate key Option #4227.
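The mechanism is easy to reproduce in isolation: Collectors.toMap without a merge function throws IllegalStateException on the first duplicate key. A minimal sketch with made-up data; the tolerant variant at the end resolves the collision with a merge function (shown for illustration; the actual upstream patch may differ).

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {
    public static void main(String[] args) {
        // Two pending rollback entries that resolve to the same instant time.
        List<String[]> pending = Arrays.asList(
                new String[] {"20220127163259397", "rollbackPlanA"},
                new String[] {"20220127163259397", "rollbackPlanB"});

        try {
            // No merge function: throws IllegalStateException: Duplicate key ...
            pending.stream().collect(Collectors.toMap(e -> e[0], e -> e[1]));
        } catch (IllegalStateException e) {
            System.out.println(e); // the failure seen in getPendingRollbackInfos
        }

        // With a merge function the collision is resolved instead of fatal.
        Map<String, String> merged = pending.stream()
                .collect(Collectors.toMap(e -> e[0], e -> e[1], (a, b) -> b));
        System.out.println(merged); // {20220127163259397=rollbackPlanB}
    }
}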

2022-01-27 17:08:52,198 ERROR org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [initialize instant ] error
java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@14a67ea4}
    at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_202]
    at java.util.HashMap.merge(HashMap.java:1254) ~[?:1.8.0_202]
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_202]
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_202]
    at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:910) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:927) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:917) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:810) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:809) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:802) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2022-01-27 17:08:52,221 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_stream_write' (operator 3e96073598340713e5fe75c259385e9e).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:170) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_202]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [initialize instant ] error
    ... 5 more
Caused by: java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@14a67ea4}
    at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_202]
    at java.util.HashMap.merge(HashMap.java:1254) ~[?:1.8.0_202]
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_202]
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_202]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_202]
    at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:910) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:927) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:917) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:810) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:809) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:802) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    ... 3 more

4. FlinkCDC Bugs

4.1 ArrayIndexOutOfBoundsException while reading the binlog

When FlinkCDC reads the binlog under a high-frequency stream of multiple update statements, an ArrayIndexOutOfBoundsException occurs intermittently. The exception triggers an abnormal job restart, and the restart can itself fail or lose data. The issue has been reported in the community; the bug is not yet fixed.

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1193)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    ... 1 more
Caused by: io.debezium.DebeziumException
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
    ... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
    at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:65)
    at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:262)
    at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:241)
    at java.io.InputStream.skip(InputStream.java:224)
    at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:280)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:305)
    at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
    ... 3 more

The above are the pitfalls our stress tests have blown open so far. As new ones surface, we will document them in follow-ups.

