irpas技术客

Flink 1.14.0 全新的 Kafka Connector_JasonLee实时计算

大大的周 2576

Apache Kafka Connector#

Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka Topic 读取数据和向 Kafka Topic 写入数据,并保证恰好一次次语义。

Dependency#

Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化。最近的 Kafka 客户端向后兼容 broker 版本 0.10.0 或更高版本。关于 Kafka 兼容性的详细信息,请参考 Kafka 官方 文档。

<dependency> ????<groupId>org.apache.flink</groupId> ????<artifactId>flink-connector-kafka_2.11</artifactId> ????<version>1.14.0</version> </dependency>

Flink 的流连接器目前不是二进制发行版的一部分。在此处 查看如何与它们链接以进行集群执行。

Kafka Source# ?

本部分介绍基于新 数据源 API 的 Kafka Source。

? Usage#

Kafka Source 提供了一个 builder 类来构建 KafkaSource 的实例。下面的代码片段展示了如何构建一个 KafkaSource 来消费来自主题 “input-topic” 最早偏移量的消息,消费者组是“my-group”,并且仅将消息的值反序列化为字符串。

KafkaSource<String>?source?=?KafkaSource.<String>builder() ????.setBootstrapServers(brokers) ????.setTopics("input-topic") ????.setGroupId("my-group") ????.setStartingOffsets(OffsetsInitializer.earliest()) ????.setValueOnlyDeserializer(new?SimpleStringSchema()) ????.build(); env.fromSource(source,?WatermarkStrategy.noWatermarks(),?"Kafka?Source");

构建 KafkaSource 「需要」以下属性:

Bootstrap servers,通过 setBootstrapServers(String)来配置

Topics / partitions to subscribe,请参阅以下 主题-分区订阅 以了解更多详细信息。

Deserializer to parse Kafka messages,更多详细信息请参见以下 Deserializer。

Topic-partition Subscription#

Kafka 源码提供了 3 种 topic-partition 订阅方式:

主题列表,订阅主题列表中所有分区的消息。例如:

KafkaSource.builder().setTopics("topic-a",?"topic-b")

主题模式,从名称与提供的正则表达式匹配的所有主题订阅消息。例如:

KafkaSource.builder().setTopicPattern("topic.*")

分区集,订阅提供的分区集中的分区。例如:

final?HashSet<TopicPartition>?partitionSet?=?new?HashSet<>(Arrays.asList( ????????new?TopicPartition("topic-a",?0),????//?Partition?0?of?topic?"topic-a" ????????new?TopicPartition("topic-b",?5)));??//?Partition?5?of?topic?"topic-b" KafkaSource.builder().setPartitions(partitionSet) Deserializer#

解析 Kafka 消息需要一个反序列化器。Deserializer(反序列化模式)可以通过 配置setDeserializer(KafkaRecordDeserializationSchema),其中KafkaRecordDeserializationSchema定义了如何反序列化一个 Kafka ConsumerRecord。

如果只需要 Kafka ConsumerRecord的值,可以使用 setValueOnlyDeserializer(DeserializationSchema)在 builder 中使用,其中DeserializationSchema定义了如何反序列化 Kafka 消息值的二进制文件。

你还可以使用 Kafka Deserializer 来反序列化 Kafka 消息值. 例如使用 StringDeserializer 将 Kafka 消息值反序列化为字符串:

import?org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() ????????.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class)); Starting Offset#

Kafka Source 能够通过指定 OffsetsInitializer来消费从不同偏移量开始的消息。内置的初始值设定项包括:

KafkaSource.builder() ????//?Start?from?committed?offset?of?the?consuming?group,?without?reset?strategy ????.setStartingOffsets(OffsetsInitializer.committedOffsets()) ????//?Start?from?committed?offset,?also?use?EARLIEST?as?reset?strategy?if?committed?offset?doesn't?exist ????.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) ????//?Start?from?the?first?record?whose?timestamp?is?greater?than?or?equals?a?timestamp ????.setStartingOffsets(OffsetsInitializer.timestamp(1592323200L)) ????//?Start?from?earliest?offset ????.setStartingOffsets(OffsetsInitializer.earliest()) ????//?Start?from?latest?offset ????.setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始值设定项无法满足你的要求,您还可以实现自定义偏移量初始值设定项。

如果未指定 offsets 初始值设定项,「则」默认使用 「OffsetsInitializer.earliest()」。

Boundedness#

Kafka Source 旨在支持流式和批量运行模式。默认情况下,KafkaSource 设置为以流式方式运行,因此永远不会停止,直到 Flink 作业失败或被取消。您可以使用setBounded(OffsetsInitializer)指定停止偏移量并设置以批处理模式运行的源。当所有分区都达到它们的停止偏移量时,Source 将退出。

您还可以将 KafkaSource 设置为在流模式下运行,但仍然使用setUnbounded(OffsetsInitializer). 当所有分区达到其指定的停止偏移量时,Source 将退出。

Additional Properties #

除了上述属性外,您还可以使用setProperties(Properties)和为 KafkaSource 和 KafkaConsumer 设置任意属性setProperty(String, String)。KafkaSource 有以下配置选项:

client.id.prefix 定义用于 Kafka 消费者的客户端 ID 的前缀

partition.discovery.interval.ms定义 Kafka 源发现新分区的时间间隔 im 毫秒。有关更多详细信息,请参阅下面的 动态分区发现。

register.consumer.metrics 指定是否在 Flink 指标组中注册 KafkaConsumer 的指标

commit.offsets.on.checkpoint 指定是否在检查点向 Kafka broker 提交消费偏移量

KafkaConsumer 的配置可以参考 Apache Kafka文档 了解更多。

请注意,即使配置了以下键,构建器也会覆盖它:

key.deserializer 始终设置为 ByteArrayDeserializer

value.deserializer 始终设置为 ByteArrayDeserializer

auto.offset.reset.strategy被起始偏移量覆盖OffsetsInitializer#getAutoOffsetResetStrategy()

partition.discovery.interval.ms被调用时被覆盖为 -1setBounded(OffsetsInitializer)

下面的代码片段显示了配置 KafkaConsumer 以使用“PLAIN”作为 SASL 机制并提供 JAAS 配置:

KafkaSource.builder() ????.setProperty("sasl.mechanism",?"PLAIN") ????.setProperty("sasl.jaas.config",?"org.apache.kafka.common.security.plain.PlainLoginModule?required?username=\"username\"?passw Dynamic Partition Discovery#

为了在不重启 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题-分区订阅模式下定期发现新分区。要启用分区发现,请为 property 设置一个非负值partition.discovery.interval.ms:

KafkaSource.builder() ????.setProperty("partition.discovery.interval.ms",?"10000")?//?discover?new?partitions?per?10?seconds ?

默认情况下「禁用」分区发现。您需要明确设置分区发现间隔才能启用此功能。

? Event Time and Watermarks#

默认情况下,记录将使用嵌入在 Kafka 中的时间戳ConsumerRecord作为事件时间。您可以定义自己WatermarkStrategy的从记录本身提取事件时间,并在下游发出水印:

env.fromSource(kafkaSource,?new?CustomWatermarkStrategy(),?"Kafka?Source?With?Custom?Watermark?Strategy")

本文档 描述了有关如何定义WatermarkStrategy.

Consumer Offset Committing#

Kafka source 在 checkpoint 「完成」时提交当前消费的 offset ,以保证 Flink 的 checkpoint 状态和 Kafka brokers 上的 commit offset 的一致性。

如果未启用检查点,则 Kafka 源依赖于 Kafka 消费者内部的自动定期偏移提交逻辑,由Kafka 消费者的属性配置enable.auto.commit并在其属性中配置auto.commit.interval.ms。

需要注意的是 Kafka source 不依赖提交偏移量来实现容错。提交偏移量只是为了暴露消费者和消费组的进度以供监控。

Monitoring#

Kafka Source 在各自的 范围内 公开以下指标。

Scope of Metric# ScopeMetricsUser VariablesDescriptionTypeOperatorcurrentEmitEventTimeLagn/a从记录事件时间戳到源连接器发出记录的时间跨度1: currentEmitEventTimeLag = EmitTime - EventTime.GaugeOperatorwatermarkLagn/a水印滞后于墙时钟时间的时间跨度: watermarkLag = CurrentTime - WatermarkGaugeOperatorsourceIdleTimen/a源没有处理任何记录的时间跨度: sourceIdleTime = CurrentTime - LastRecordProcessTimeGaugeOperatorpendingRecordsn/a源尚未提取的记录数。例如 Kafka 分区中消费者偏移后的可用记录。GaugeOperatorKafkaSourceReader.commitsSucceededn/a如果偏移提交被打开并且检查点被开启,那么成功的偏移提交到 Kafka 的总数。CounterOperatorKafkaSourceReader.commitsFailedn/a如果打开偏移提交并启用检查点,则向 Kafka 提交偏移提交失败的总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方式,因此提交失败不会影响 Flink 的检查点分区偏移量的完整性。CounterOperatorKafkaSourceReader.committedOffsetstopic, partition对于每个分区,最后一次成功提交到 Kafka 的偏移量。可以通过主题名称和分区 id 指定特定分区的指标。GaugeOperatorKafkaSourceReader.currentOffsetstopic, partition每个分区的消费者当前读取偏移量。可以通过主题名称和分区 id 指定特定分区的指标。Gauge

1 该指标是为最后处理的记录记录的瞬时值。提供此指标是因为延迟直方图可能很昂贵。瞬时延迟值通常足以很好地指示延迟。

Kafka Consumer Metrics#

Kafka 消费者的所有指标也都注册在 group 下KafkaSourceReader.KafkaConsumer。例如,Kafka 消费者指标“records-consumed-total”将在指标中报告:<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。

您可以通过配置 option 来配置是否注册 Kafka 消费者的指标register.consumer.metrics。默认情况下,此选项将设置为 true。

对于 Kafka 消费者的指标,您可以参考 Apache Kafka 文档 了解更多详细信息。

Behind the Scene# ?

如果您对 Kafka Source 在新数据源 API 的设计下如何工作感兴趣,您可能需要阅读此部分作为参考。有关新数据源 API 的详细信息,数据源 文档 和FLIP-27 提供了更多描述性讨论。

?

在新数据源 API 的抽象下,Kafka Source 由以下组件组成:

Source Split#

Kafka Source 中的一个源拆分代表 Kafka 主题的一个分区。Kafka Source 拆分包括:

TopicPartition 分裂代表

分区的起始偏移量

停止分区的偏移量,仅在源以有界模式运行时可用

Kafka source split 的状态也存储了partition 的当前消费 offset,当 Kafka source reader 为 snapshot 时,状态会转换为 immutable split,将当前 offset 赋值给immutable split 的起始偏移量。

您可以查看类 KafkaPartitionSplit和KafkaPartitionSplitState`了解更多详情。

Split Enumerator#

Kafka 的拆分枚举器负责在提供的主题分区订阅模式下发现新的拆分(分区),并将拆分分配给读者,以循环方式均匀分布在子任务中。请注意,Kafka Source 的拆分枚举器会急切地将拆分推送到源阅读器,因此它不需要处理来自源阅读器的拆分请求。

Source Reader#

Kafka source 的 source reader 扩展了提供的SourceReaderBase,并使用单线程多路复用线程模型,该模型读取多个分配的拆分(分区),一个 KafkaConsumer 由一个 驱动SplitReader。消息在从 Kafka 中获取后立即反序列化SplitReader。拆分的状态或消息消费的当前进度由 更新KafkaRecordEmitter,它还负责在记录向下游发出时分配事件时间。

Kafka SourceFunction # ?

FlinkKafkaConsumer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSource。

?

对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Sink#

KafkaSink 允许将记录流写入一个或多个 Kafka 主题。

Usage#

Kafka sink 提供了一个 builder 类来构造一个 KafkaSink 的实例。下面的代码片段显示了如何将字符串记录写入 Kafka 主题,并保证至少一次交付。

DataStream<String>?stream?=?... ???????? KafkaSink<String>?sink?=?KafkaSink.<String>builder() ????????.setBootstrapServers(brokers) ????????.setRecordSerializer(KafkaRecordSerializationSchema.builder() ????????????.setTopic("topic-name") ????????????.setValueSerializationSchema(new?SimpleStringSchema()) ????????????.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) ????????????.build() ????????) ????????.build(); ???????? stream.sinkTo(sink);

构建 KafkaSink 「需要」以下属性:

Bootstrap servers, setBootstrapServers(String)

Record serializer, setRecordSerializer(KafkaRecordSerializationSchema)

如果您配置交货保证 DeliveryGuarantee.EXACTLY_ONCE 你也必须设置 setTransactionalIdPrefix(String)`

Serializer#

你始终需要提供一个KafkaRecordSerializationSchema以将传入元素从数据流转换为 Kafka 生产者记录。Flink 提供了一个模式构建器来提供一些常见的构建块,即键/值序列化、主题选择、分区。您也可以自行实现接口以施加更多控制。

KafkaRecordSerializationSchema.builder() ????.setTopicSelector((element)?->?{<your-topic-selection-logic>}) ????.setValueSerializationSchema(new?SimpleStringSchema()) ????.setKeySerializationSchema(new?SimpleStringSchema()) ????.setPartitioner(new?FlinkFixedPartitioner()) ????.build();

据「需要」始终设定的值序列化方法和一个主题(选择方法)。此外,还可以通过使用setKafkaKeySerializer(Serializer)或来使用 Kafka 序列化器代替 Flink 序列化器setKafkaValueSerializer(Serializer)。

Fault Tolerance#

总的来说,KafkaSink支持三种不同的DeliveryGuarantees。ForDeliveryGuarantee.AT_LEAST_ONCE和DeliveryGuarantee.EXACTLY_ONCEFlink 的检查点必须启用。默认情况下KafkaSink使用DeliveryGuarantee.NONE. 您可以在下面找到对不同保证的解释。

DeliveryGuarantee.NONE 不提供任何保证:如果 Kafka broker 出现问题,消息可能会丢失,如果 Flink 故障,消息可能会重复。

DeliveryGuarantee.AT_LEAST_ONCE:接收器将等待 Kafka 缓冲区中所有未完成的记录在检查点上由 Kafka 生产者确认。如果 Kafka 代理出现任何问题,则不会丢失任何消息,但是当 Flink 重新启动时,消息可能会重复,因为 Flink 会重新处理旧的输入记录。

DeliveryGuarantee.EXACTLY_ONCE:在这种模式下,KafkaSink 将写入 Kafka 事务中的所有消息,这些消息将在检查点上提交给 Kafka。因此,如果消费者只读取提交的数据(参见 Kafka 消费者配置隔离级别),则在 Flink 重启的情况下不会看到重复数据。但是,这会有效地延迟记录可见性,直到写入检查点,因此相应地调整检查点持续时间。请确保在同一 Kafka 集群上运行的应用程序中使用唯一的 transactionalIdPrefix,这样多个正在运行的作业不会干扰它们的事务!此外,强烈建议调整 Kafka 事务超时(请参阅 Kafka 生产者 transaction.timeout.ms)? 最大检查点持续时间 + 最大重启持续时间或当 Kafka 未提交的事务到期时可能会发生数据丢失。

Monitoring#

Kafka sink 在各自的 scope 中 公开以下指标。

ScopeMetricsUser VariablesDescriptionTypeOperatorcurrentSendTimen/a发送最后一条记录所花费的时间。这个度量是为最后处理的记录记录的瞬时值。Gauge
Kafka Producer# ?

FlinkKafkaProducer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSink。

?

对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Connector Metrics#

Flink 的 Kafka 连接器通过 Flink 的指标系统提供了一些指标来分析连接器的行为。生产者和消费者通过 Flink 的所有支持版本的指标系统导出 Kafka 的内部指标。Kafka 文档在其文档中列出了所有导出的指标。

也可以register.consumer.metrics通过本节概述的 KafkaSource 配置或在使用 KafkaSink 时禁用 Kafka 指标的转发,您可以通过生产者属性将配置设置register.producer.metrics为 false。

Enabling Kerberos Authentication#

Flink 通过 Kafka 连接器提供一流的支持,以对为 Kerberos 配置的 Kafka 安装进行身份验证。只需配置 Flinkflink-conf.yaml即可为 Kafka 启用 Kerberos 身份验证,如下所示:

通过设置以下内容来配置 Kerberos 凭据 -

security.kerberos.login.use-ticket-cache:默认情况下,这是trueFlink 将尝试在由kinit. 请注意,在 YARN 上部署的 Flink 作业中使用 Kafka 连接器时,使用票证缓存的 Kerberos 授权将不起作用。

security.kerberos.login.keytab和security.kerberos.login.principal:要改用 Kerberos 密钥表,请为这两个属性设置值。

附加KafkaClient到security.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 凭据提供给 Kafka 登录上下文以用于 Kafka 身份验证。

启用基于 Kerberos 的 Flink 安全性后,您可以使用 Flink Kafka Consumer 或 Producer 向 Kafka 进行身份验证,只需在传递给内部 Kafka 客户端的提供的属性配置中包含以下两个设置:

设置security.protocol为SASL_PLAINTEXT(默认NONE):用于与 Kafka 代理通信的协议。使用独立的 Flink 部署时,也可以使用SASL_SSL; 请在此处查看如何为 SSL 配置 Kafka 客户端。

设置sasl.kerberos.service.name为kafka(默认kafka):此值应与用于 Kafka 代理配置的值相匹配sasl.kerberos.service.name。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。

有关 Kerberos 安全性的 Flink 配置的更多信息,请参阅此处。您还可以在此处找到有关 Flink 如何在内部设置基于 Kerberos 的安全性的更多详细信息。

Upgrading to the Latest Connector Version#

升级作业和 Flink 版本指南中概述了通用升级步骤。对于 Kafka,您还需要执行以下步骤:

请勿同时升级 Flink 和 Kafka Connector 版本。

确保您为您的消费者配置了一个group.id。

在消费者上设置setCommitOffsetsOnCheckpoints(true),以便将读取偏移量提交给 Kafka。在停止并获取保存点之前执行此操作很重要。您可能必须在旧的连接器版本上执行停止/重新启动循环才能启用此设置。

在消费者上设置setStartFromGroupOffsets(true),以便我们从 Kafka 获得读取偏移量。这只有在 Flink 状态下没有读取偏移时才会生效,这也是下一步非常重要的原因。

更改源/接收器的分配uid。这确保新的源/接收器不会从旧的源/接收器操作符读取状态。

开始新作业,--allow-non-restored-state因为我们在保存点中仍然拥有先前连接器版本的状态。

Troubleshooting# ?

如果您在使用 Flink 时遇到 Kafka 问题,请记住,Flink 只包装了KafkaConsumer或KafkaProducer,您的问题可能与 Flink 无关,有时可以通过升级 Kafka brokers、重新配置 Kafka brokers 或重新配置KafkaConsumer或KafkaProducerin Flink 来解决。下面列出了一些常见问题的示例。

? Data loss#

根据您的 Kafka 配置,即使在 Kafka 确认写入之后,您仍然可能会遇到数据丢失的情况。特别要记住 Kafka 配置中的以下属性:

acks

log.flush.interval.messages

log.flush.interval.ms

log.flush.*

上述选项的默认值很容易导致数据丢失。更多解释请参考 Kafka 文档。

UnknownTopicOrPartitionException #

此错误的一个可能原因是正在进行新的领导者选举时,例如在重新启动 Kafka Broker 之后或期间。这是一个可重试的异常,因此 Flink 作业应该能够重新启动并恢复正常运行。它也可以通过更改retries生产者设置中的属性来规避。然而,这可能会导致消息重新排序,反过来,如果不需要,可以通过设置为 1 来规避max.in.flight.requests.per.connection。

ProducerFencedException #

此异常的原因很可能是代理端的事务超时。随着KAFKA-6119的实施,(producerId, epoch)将在事务超时后被隔离,并且其所有挂起的事务都被中止(每个transactional.id都映射到一个单独的事务producerId;这在下面的博客文章中有更详细的描述)。

推荐阅读

Flink 任务实时监控最佳实践

Flink on yarn 实时日志收集最佳实践

如果你觉得文章对你有帮助,麻烦点一下赞和在看吧,你的支持是我创作的最大动力.


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #kafka连接器 #apache #Kafka #ConnectorFlink #提供了一个