文章下半部分:Debezium connector for MySQL 配置部署
Debezium connector for MySQLMySQL 的 binlog 会按照事务提交的顺序记录所有的操作变更。这些变更既包含 表 schema的变更也包含 数据的变更。MySQL 使用binlog来复制和恢复数据。
Debezium MySQL 连接器读取 binlog,为行级INSERT,UPDATE和DELETE操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。
由于 MySQL 通常设置为在指定时间段后清除 binlog,因此 MySQL 连接器会对您的每个数据库执行初始一致快照。MySQL 连接器从创建快照的位置读取 binlog。
有关与此连接器兼容的 MySQL 数据库版本的信息,请参阅 https://debezium.io/releases/。
1. connector 是怎么工作的?从整体了解连接器支持的 MySQL (集群运行模式)拓扑 对于规划您的应用程序很有价值。为了优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、对外公开模式更改、执行快照以及确定 Kafka 主题名称会很有帮助。
Debezium MySQL 连接器尚未在 MariaDB 上进行测试,但来自社区的多份报告表明该连接器已成功用于该数据库。计划在未来的 Debezium 版本中提供对 MariaDB 的官方支持。
1.1 支持的 MySQL (集群运行模式)拓扑Debezium MySQL 连接器支持以下 MySQL 拓扑:
Standalone 独立实例
当使用单个 MySQL 服务器时,服务器必须启用 binlog(并且可选地启用 GTID),以便 Debezium MySQL 连接器可以跟踪监控数据库服务器。这通常是可以接受的,因为二进制日志也可以用作增量备份。在这种情况下,MySQL 连接器始终连接并跟随这个独立的 MySQL 服务器实例。
Primary and replica 主库和副本
Debezium MySQL 连接器可以跟随主服务器之一或副本之一(如果该副本启用了其 binlog),但连接器仅看到该服务器可见的集群中的更改。通常,除了 在多主拓扑之外,这基本不是问题【因为只能跟踪一个实例,再多主模式种,需要跟踪多个实例这是Debezium 不支持的】。
连接器将其位置记录在服务器的 binlog 中,这在集群中的每台服务器上都是不同的。因此,连接器必须只跟随一个 MySQL 服务器实例。如果该服务器出现故障,则必须重新启动或恢复该服务器,然后连接器才能继续。
High available clusters 高可用集群
MySQL 存在多种高可用性解决方案,这些方案能容忍问题和故障并且几乎立即可以从问题和故障中恢复变得更加容易。大多数 HA MySQL 集群使用 GTID,以便副本能够跟踪任何主服务器上的所有更改。
Multi-primary 多主
网络数据库 (NDB) 集群复制使用一个或多个 MySQL 副本节点,每个节点从多个主服务器复制。这是聚合多个 MySQL 集群的复制的强大方法。此拓扑需要使用 GTID。
Debezium MySQL 连接器可以使用这些多主 MySQL 副本作为源,并且只要新副本赶上旧副本,就可以故障转移到不同的多主 MySQL 副本。也就是说,新副本具有在第一个副本上看到的所有事务。即使连接器仅使用数据库或表的一个子集,这也有效,因为可以将连接器配置为在尝试重新连接到新的多主 MySQL 副本并找到二进制日志的正确偏移位置。
Hosted 托管云服务
支持 Debezium MySQL 连接器以使用托管选项,例如 Amazon RDS 和 Amazon Aurora。
因为这些托管选项不允许全局读锁,所以使用表级锁来创建一致快照。
1.2 表模式历史主题 schema history topic当数据库客户端查询数据库时,客户端使用数据库的当前 schema。但是,数据库schema可以随时更改,这意味着连接器必须能够识别在每个记录插入、更新或删除操作时的schema是什么。此外,连接器不能只使用当前schema,因为连接器可能正在处理的记录是在 schema 变更之前生成的。
为了确保正确处理schema变更后发生的数据变更,MySQL 在 binlog 中不仅包括对数据的行级变更,还包括应用于数据库的 DDL 语句。当连接器读取 binlog 并遇到这些 DDL 语句时,它会解析它们并更新每个表schema的内存表示。连接器使用此schema表示来识别每次插入、更新或删除操作时的表schema,并产生适当的更改事件。在一个单独的数据库历史 Kafka 主题中,连接器记录所有 DDL 语句以及每个 DDL 语句出现在 binlog 中的位置。
当连接器在崩溃或优雅停止后重新启动时,连接器会从特定位置,即从特定时间点开始读取 binlog。连接器通过读取数据库历史 Kafka 主题并解析所有 DDL 语句,直到连接器启动的 binlog 中的点,来重建此时存在的表结构。
此数据库历史主题仅供连接器使用。连接器可以选择将模式更改事件发送到针对消费者应用程序的不同主题。
当 MySQL 连接器捕获应用了架构更改工具(例如gh-ost或)的表中的更改pt-online-schema-change时,会在迁移过程中创建辅助表。需要配置连接器以捕获对这些帮助表的更改。如果消费者不需要为帮助表生成的记录,则可以应用单个消息转换将它们过滤掉。
查看接收 Debezium 事件记录的默认主题名称。
1.3 schema change topic您可以配置 Debezium MySQL 连接器以生成模式更改事件,这些事件描述应用于数据库中捕获的表的模式更改。连接器将模式更改事件写入名为 的 Kafka 主题<serverName>,其中serverName是连接器配置属性中database.server.name指定的逻辑服务器名称。连接器发送到模式更改主题的消息包含有效负载,并且(可选)还包含更改事件消息的schema。
schema 更改事件消息的有效负载包括以下元素:
ddl
提供导致schema 更改的 SQL CREATE、ALTER或DROP语句。
databaseName
应用 DDL 语句的数据库的名称。databaseName的值用作消息键。
pos
语句出现在 binlog 中的位置。
tableChanges
schema 更改后整个表schema 的结构化表示。该tableChanges字段包含一个数组,其中包含表中每一列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者可以轻松读取消息,而无需先通过 DDL 解析器对其进行处理。
示例:发送到 MySQL 连接器schema 更改主题的消息
以下示例显示了 JSON 格式的典型schema 更改消息。该消息包含表模式的逻辑表示。
{ "schema": { ... }, "payload": { "source": { // (1) "version": "2.0.0.Alpha1", "connector": "mysql", "name": "dbserver1", "ts_ms": 0, "snapshot": "false", "db": "inventory", "sequence": null, "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 219, "row": 0, "thread": null, "query": null }, "databaseName": "inventory", // (2) "schemaName": null, "ddl": "ALTER TABLE customers ADD COLUMN middle_name VARCHAR(2000)", // (3) "tableChanges": [ // (4) { "type": "ALTER", // (5) "id": "\"inventory\".\"customers\"", // (6) "table": { // (7) "defaultCharsetName": "latin1", "primaryKeyColumnNames": [ // (8) "id" ], "columns": [ // (9) { "name": "id", "jdbcType": 4, "nativeType": null, "typeName": "INT", "typeExpression": "INT", "charsetName": null, "length": 11, "scale": null, "position": 1, "optional": false, "autoIncremented": true, "generated": true }, { "name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "latin1", "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "last_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "latin1", "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "email", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "latin1", "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false }, { "name": "middle_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "latin1", "length": 2000, "scale": null, "position": 5, "optional": true, "autoIncremented": false, "generated": false } ] } } ] }, "payload": { "databaseName": "inventory", "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;", "source" : { "version": "2.0.0.Alpha1", "name": "mysql-server-1", "server_id": 0, "ts_ms": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": null, "table": null, "query": null } } }表 1. 发送到schema 更改主题的消息中的字段描述
另请参阅:模式历史主题。
1.4 快照首次启动 Debezium MySQL 连接器时,它会执行数据库的初始一致快照。以下流程描述了连接器如何创建此快照。此流程适用于默认快照模式,即initial. 有关其他快照模式的信息,请参阅MySQL 连接器snapshot.mode配置属性。
表 2. 使用全局读锁执行初始快照的工作流程
连接器重新启动
如果连接器在执行初始快照时发生故障、停止或重新平衡,则在连接器重新启动后,它会执行新的快照。在初始快照完成后,Debezium MySQL 连接器从 binlog 中的相同位置重新启动,因此它不会错过任何更新。
如果连接器停止的时间足够长,MySQL 可能会清除旧的二进制日志文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复为其起始位置的*初始快照。*有关对 Debezium MySQL 连接器进行故障排除的更多提示,请参阅出现问题时的行为。
不允许全局读锁
某些环境不允许全局读锁。如果 Debezium MySQL 连接器检测到不允许全局读锁,则连接器使用表级锁代替并使用此方法执行快照。这要求 Debezium 连接器的数据库用户具有LOCK TABLES权限。
表 3. 使用表级锁执行初始快照的工作流程
默认情况下,连接器仅在首次启动后才运行初始快照操作。在这个初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流式处理进入。
但是,在某些情况下,连接器在初始快照期间获得的数据可能会变得陈旧、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:
修改连接器配置以捕获一组不同的表。
Kafka 主题被删除,必须重建。
由于配置错误或其他问题而发生数据损坏。
您可以通过启动所谓的ad-hoc 快照为之前捕获快照的表重新运行快照。即席快照需要使用signaling tables (信号表)。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。
即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的子集。
您可以通过向信令表发送execute-snapshot消息来指定要捕获的表。将execute-snapshot信号的类型设置为incremental,并提供要包含在快照中的表的名称,如下表所述:
Table 4. Example of an ad hoc execute-snapshot signal record
触发临时快照
您可以通过将具有信号execute-snapshot类型的条目添加到信号表来启动临时快照。连接器处理完消息后,将开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。
目前,execute-snapshot操作类型仅触发增量快照。有关详细信息,请参阅增量快照。
1.4.2 增量快照为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依靠 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。
在增量快照中,Debezium 不是像在初始快照中那样一次捕获数据库的完整状态,而是在一系列可配置的块中分阶段捕获每个表。您可以指定您希望快照捕获的表和每个块的大小。块大小决定了快照在数据库上的每次提取操作期间收集的行数。增量快照的默认块大小为 1 KB。
随着增量快照的进行,Debezium 使用watermarks 来跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:
您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
如果增量快照的进度中断,您可以恢复它而不会丢失任何数据。进程恢复后,快照从它停止的点开始,而不是从头重新捕获表。
您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将表添加到其table.include.list属性后重新运行快照。
增量快照过程
当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个READ事件。该事件表示块的快照开始时行的值。
随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。为反映此类更改,INSERT、UPDATE或DELETE操作将照常提交到事务日志。同样,正在进行的 Debezium 流式处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。
Debezium 如何解决具有相同主键的记录之间的冲突
在某些情况下,流式处理发出的UPDATE或DELETE事件被乱序接收。也就是说,流式处理可能会在快照捕获包含该行的READ事件的块之前发出一个修改表行的事件。当快照最终为该行发出相应的READ事件时,它的值已经被取代。为了确保以正确的逻辑顺序处理乱序到达的增量快照事件,Debezium 采用了一种缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。
快照窗口
为了帮助解决延迟到达事件和修改同一表行的流事件之间的冲突READ,Debezium 采用了所谓的快照窗口。快照窗口划分了增量快照捕获指定表块数据的时间间隔。在一个块的快照窗口打开之前,Debezium 遵循其通常的行为并从事务日志直接向下游发送事件到目标 Kafka 主题。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium 执行重复数据删除步骤以解决具有相同主键的事件之间的冲突。
对于每个数据集合,Debezium 发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为READ操作发出。同时,随着用户不断更新数据集合中的记录,事务日志也更新以反映每次提交,Debezium 会针对每次更改发出UPDATE或操作。DELETE
当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传递到内存缓冲区。在快照窗口期间,READ缓冲区中事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的READ事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。块的快照窗口关闭后,缓冲区仅包含READ不存在相关事务日志事件的事件。Debezium 将这些剩余READ事件发送到表的 Kafka 主题。
连接器对每个快照块重复该过程。
触发增量快照
目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。INSERT您将信号作为 SQL查询提交给表。Debezium 检测到信号表中的变化后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值incremental.
要指定要包含在快照中的表,请提供一个data-collections列出这些表的数组,例如, {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的data-collections数组没有默认值。如果data-collections数组为空,Debezium 检测到不需要任何操作并且不执行快照。
先决条件
信令已启用。
源数据库上存在信令数据集合,连接器配置为捕获它。
信令数据集合在signal.data.collection属性中指定。
程序
发送 SQL 查询以将临时增量快照请求添加到信令表:
INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');例如,
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');id命令中的、type和参数的值data对应于信令表的字段。
下表描述了这些参数:
表 5. 向信令表发送增量快照信号的 SQL 命令字段说明
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增量快照事件消息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" }, "op":"r", "ts_ms":"1620393591654", "transaction":null }MySQL 连接器允许使用与数据库的只读连接来运行增量快照。要运行具有只读访问权限的增量快照,连接器使用已执行的全局事务 ID (GTID) 设置为高 watermarks和低watermarks。通过将二进制日志 (binlog) 事件的 GTID 或服务器的心跳与低watermarks和高watermarks进行比较来更新块窗口的状态。
要切换到只读实现,请将read.only属性的值设置为true。
先决条件
启用 MySQL GTID。
如果连接器从多线程副本(即,值replica_parallel_workers大于的副本0)读取,则必须设置以下选项之一:
replica_preserve_commit_order=ON
slave_preserve_commit_order=ON
1.4.4 即席只读增量快照当 MySQL 连接为只读时,信令表机制](https://debezium.io/documentation/reference/2.0/connectors/mysql.html#mysql-property-signal-kafka-topic)还可以通过向 Kafka 主题发送消息 并指定 signal.kafka.topic属性来运行快照。
Kafka 消息的键必须与database.server.name连接器配置选项的值匹配。
该值是一个带有type和data字段的 JSON 对象。
信号类型是execute-snapshot,data字段必须有以下字段:
表 6. 执行快照数据字段
执行快照 Kafka 消息的示例:
键 = test_connector 值 = {"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}
1.5 快照事件的操作类型MySQL 连接器将快照事件比如将 READ操作定义为("op" : "r") 发出。如果您希望连接器将快照事件作为CREATE( c) 事件发出,请配置 DebeziumReadToInsertEvent单消息转换 (SMT) 以修改事件类型。
以下示例显示了如何配置 SMT:
示例:使用ReadToInsertEventSMT 更改快照事件的类型
transforms=snapshotasinsert,... transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent 1.6 主题名称默认情况下,MySQL 连接器将表中发生的所有 INSERT, UPDATE, 和 DELETE操作更改事件写入特定于该表的单个 Apache Kafka 主题。
连接器使用以下约定来命名更改事件主题:
serverName.databaseName.tableName
假设fulfillment是服务器名称,inventory是数据库名称,并且数据库包含名为orders、customers和的表products。Debezium MySQL 连接器向三个 Kafka 主题发出事件,每个主题对应一个数据库中的表:
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products以下列表提供了默认名称组件的定义:
serverName
database.server.name由连接器配置属性指定的服务器的逻辑名称。
schemaName
发生操作的模式的名称。
tableName
发生操作的表的名称。
连接器应用类似的命名约定来标记其内部数据库历史主题、模式更改主题和事务元数据主题。
如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅主题路由。
1.7 事务元数据 Transaction metadataDebezium 可以生成表示事务边界和增强的数据更改事件消息的事件。
Debezium 为每个事务中的BEGIN和END分隔符生成事务边界事件。事务边界事件包含以下字段:
status
BEGIN或END。
id
唯一事务标识符的字符串表示形式。
event_count(用于END活动)
事务发出的事件总数。
data_collections(用于END活动)
一对data_collection和event_count元素的数组。表示连接器针对源自数据集合的更改发出的事件数。
例子
{ "status": "BEGIN", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "event_count": null, "data_collections": null } { "status": "END", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "event_count": 2, "data_collections": [ { "data_collection": "s1.a", "event_count": 1 }, { "data_collection": "s2.a", "event_count": 1 } ] }除非通过transaction.topic选项覆盖,否则连接器会向主题发出事务事件。*<database.server.name>*.transaction
更改增强的数据事件
启用事务元数据后,数据消息Envelope会增加一个新transaction字段。此字段以字段组合的形式提供有关每个事件的信息:
id- 唯一交易标识符的字符串表示
total_order- 事件在事务产生的所有事件中的绝对位置
data_collection_order- 事件在事务发出的所有事件中的每个数据收集位置
以下是消息的示例:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "total_order": "1", "data_collection_order": "1" } }对于没有启用 GTID 的系统,事务标识符是使用 binlog 文件名和 binlog 位置的组合构建的。例如,如果事务 BEGIN 事件对应的 binlog 文件名和位置分别为 mysql-bin.000002 和 1913,则 Debezium 构造的事务标识符将为file=mysql-bin.000002,pos=1913.
2. 数据更改事件Debezium MySQL 连接器为每个行级INSERT、UPDATE和DELETE操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于已更改的表。
Debezium 和 Kafka Connector 是围绕连续的事件消息流设计的。但是,这些事件的结构可能会随着时间的推移而发生变化,这对于消费者来说可能难以处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,消费者可以使用它从注册表中获取模式的模式 ID。这使得每个事件都是独立的。
以下 JSON框架 显示了更改事件的基本四个部分。但是,如何配置您选择在应用程序中使用的 Kafka Connector 转换器决定了这四个部分在更改事件中的表示。仅当您将转换器配置为生成该字段时,该schema字段才处于更改事件中。同样,仅当您将转换器配置为生成事件键和值时,事件键和事件有效负载才在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:
{ "schema": { // (1) ... }, "payload": { // (2) ... }, "schema": { // (3) ... }, "payload": { // (4) ... }, }表 7. 变更事件基本内容概览
默认情况下,连接器将事件记录流更改为名称与事件源表相同的主题。请参阅主题名称。
更改事件的键包含更改表键的模式和更改行的实际键。PRIMARY KEY在连接器创建事件时,模式及其相应的有效负载都包含已更改表(或唯一约束)中每一列的字段。
请考虑下customers表,随后是此表的更改事件键的示例。
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;捕获对表的更改的每个更改事件customers都具有相同的事件键架构。只要customers表具有先前的定义,捕获customers表更改的每个更改事件都具有以下关键结构。在 JSON 中,它看起来像这样:
{ "schema": { // (1) "type": "struct", "name": "mysql-server-1.inventory.customers.Key", // (2) "optional": false, // (3) "fields": [ // (4) { "field": "id", "type": "int32", "optional": false } ] }, "payload": { // (5) "id": 1001 } }表 8. 更改事件键的说明
更改事件中的值比键复杂一些。与键一样,值也有一个schema部分和一个payload部分。该schema部分包含描述该部分Envelope结构的架构payload,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都有一个带有信封结构的值负载。
考虑用于显示更改事件键示例的相同示例表:
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;对此表的更改的更改事件的值部分描述为:
创建事件
更新事件
主键更新
删除事件
墓碑事件
2.2.1 创建事件以下示例显示了连接器为在customers表中创建数据的操作生成的更改事件的值部分:
{ "schema": { // (1) "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", // (2) "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", // (3) "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mysql-server-1.inventory.customers.Envelope" // (4) }, "payload": { // (5) "op": "c", // (6) "ts_ms": 1465491411815, // (7) "before": null, // (8) "after": { // (9) "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { // (10) "version": "2.0.0.Alpha1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')" } } }表 9.创建事件值字段的描述
示例表中更新的更改事件的值与该表的创建customers事件具有相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为表中的更新生成的事件中的更改事件值示例:customers
{ "schema": { ... }, "payload": { "before": { // (1) "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { // (2) "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { // (3) "version": "2.0.0.Alpha1", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004" }, "op": "u", // (4) "ts_ms": 1465581029523 // (5) } }表 10.更新事件值字段的描述
更改行的主键字段的UPDATE操作称为主键更改。对于主键更改,代替UPDATE事件记录,连接器会发出DELETE旧键的CREATE事件记录和新(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
事件DELETE记录具有__debezium.newkey消息头。此标头的值是更新行的新主键。
事件CREATE记录具有__debezium.oldkey消息头。此标头的值是更新行所具有的先前(旧)主键。
2.2.4 删除事件删除更改事件中的值与同一表的创建和更新schema事件具有相同的部分。示例表的删除事件中的部分如下所示:payload``customers
{ "schema": { ... }, "payload": { "before": { // (1) "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, // (2) "source": { // (3) "version": "2.0.0.Alpha1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581902300, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004" }, "op": "d", // (4) "ts_ms": 1465581902461 // (5) } }表 11.删除事件值字段的描述
删除更改事件记录为消费者提供了处理删除该行所需的信息。包含旧值是因为某些消费者可能需要它们才能正确处理删除。
MySQL 连接器事件旨在与Kafka 日志压缩一起使用。只要至少保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这让 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。
2.2.5 墓碑事件当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除所有具有相同键的早期消息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须是null. 为了实现这一点,在 Debezium 的 MySQL 连接器发出删除事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但有一个null值。
3. 数据类型映射Debezium MySQL 连接器表示对行的更改,事件的结构类似于行所在的表。该事件包含每个列值的字段。该列的 MySQL 数据类型决定了 Debezium 如何表示事件中的值。
存储字符串的列在 MySQL 中使用字符集和排序规则定义。MySQL 连接器在读取 binlog 事件中列值的二进制表示时使用列的字符集。
连接器可以将 MySQL 数据类型映射到文字和语义类型。
文字类型:如何使用 Kafka Connect 模式类型表示值
语义类型:Kafka Connect 模式如何捕获字段的含义(模式名称)
3.1 基本类型下表显示了连接器如何映射基本 MySQL 数据类型。
表 12. 基本类型映射的描述
排除TIMESTAMP数据类型,MySQL 时态类型取决于time.precision.mode连接器配置属性的值。对于TIMESTAMP默认值指定为CURRENT_TIMESTAMP或的列NOW,该值1970-01-01 00:00:00用作 Kafka Connect 架构中的默认值。
MySQL 允许、 和列使用零值DATE,因为零值有时优于空值。当列定义允许空值时,MySQL 连接器将零值表示为空值,或者当列不允许空值时,将零值表示为纪元日。DATETIME``TIMESTAMP
没有时区的时间值
该DATETIME类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,没有时区信息。此类列使用 UTC 根据列的精度转换为纪元毫秒或微秒。该TIMESTAMP类型表示没有时区信息的时间戳。MySQL 在写入时将其从服务器(或会话的)当前时区转换为 UTC,在读回值时将其从 UTC 转换为服务器(或会话的)当前时区。例如:
DATETIME值为. 2018-06-20 06:37:03_1529476623000
TIMESTAMP值为. 2018-06-20 06:37:03_2018-06-20T13:37:03Z
io.debezium.time.ZonedTimestamp根据服务器(或会话的)当前时区,此类列将转换为 UTC 中的等效项。默认从服务器查询时区。如果失败,则必须由数据库connectionTimeZoneMySQL 配置选项明确指定。例如,如果数据库的时区(全局或通过connectionTimeZone选项为连接器配置)是“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”ZonedTimestamp由值“2018-06-20T13:37:03Z”。
运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。
有关与时间值相关的属性的更多详细信息,请参见MySQL 连接器配置属性的文档。
time.precision.mode=adaptive_time_microseconds(默认)
MySQL 连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有在 to 范围内的正字段TIME值才能被正确捕获。00:00:00.000000``23:59:59.999999
表 13 . 映射时time.precision.mode=adaptive_time_microseconds
time.precision.mode=连接
MySQL 连接器使用定义的 Kafka Connect 逻辑类型。这种方法不如默认方法精确,如果数据库列的小数秒精度值大于3. 00:00:00.000只能处理to范围内的值23:59:59.999。time.precision.mode=connect仅当您可以确保TIME表中的值永远不会超过支持的范围时才设置。该connect设置预计将在 Debezium 的未来版本中删除。
表 14. 映射时time.precision.mode=connect
Debezium 连接器根据decimal.handling.mode连接器配置属性的设置处理小数。
decimal.handling.mode=精确
表 15. 映射时decimal.handing.mode=precise
decimal.handling.mode=double
十进制处理模式=字符串
表 17. 映射时decimal.handing.mode=string
MySQLBOOLEAN以特定方式在内部处理该值。该BOOLEAN列在内部映射到TINYINT(1)数据类型。在流式传输期间创建表时,它使用正确的BOOLEAN映射,因为 Debezium 接收原始 DDL。在快照期间,Debezium 执行以获取为和列SHOW CREATE TABLE返回的表定义。Debezium 然后无法获得原始类型映射,因此映射到.TINYINT(1)``BOOLEAN``TINYINT(1)``TINYINT(1)
操作员可以配置开箱即用的TinyIntOneToBooleanConverter自定义转换器TINYINT(1),它将所有列映射到,BOOLEAN或者如果selector设置了参数,则可以使用逗号分隔的正则表达式枚举列的子集。
以下是一个示例配置:
converters=boolean boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1.*, db1.table2.column1 3.5空间类型目前,Debezium MySQL 连接器支持以下空间数据类型。
表 18. 空间类型映射的描述
在安装和运行 Debezium 连接器之前,需要执行一些 MySQL 设置任务。
4.1创建用户Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库具有适当的权限。
先决条件
一个 MySQL 服务器。
SQL 命令的基本知识。
程序
创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';授予用户所需的权限:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';下表描述了权限。
最终确定用户的权限:
mysql> FLUSH PRIVILEGES;表 19. 用户权限的描述
您必须为 MySQL 复制启用二进制日志记录。二进制日志记录复制工具的事务更新以传播更改。
先决条件
一个 MySQL 服务器。
适当的 MySQL 用户权限。
程序
检查该log-bin选项是否已打开:
// for MySql 5.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; // for MySql 8.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';如果是OFF,请使用以下属性配置您的 MySQL 服务器配置文件,如下表所述:
server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10通过再次检查 binlog 状态来确认您的更改:
// for MySql 5.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; // for MySql 8.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';表 20. MySQL binlog 配置属性的描述
全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。尽管 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制并使您能够更轻松地确认主服务器和副本服务器是否一致。
GTID 在 MySQL 5.6.5 及更高版本中可用。有关更多详细信息,请参阅MySQL 文档。
先决条件
一个 MySQL 服务器。
SQL 命令的基本知识。
访问 MySQL 配置文件。
程序
启用gtid_mode:
mysql> gtid_mode=ON启用enforce_gtid_consistency:
mysql> enforce_gtid_consistency=ON确认更改:
mysql> show global variables like '%GTID%';结果
+--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | enforce_gtid_consistency | ON | | gtid_mode | ON | +--------------------------+-------+表 21. GTID 选项的描述
当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置interactive_timeout和来防止这种行为。wait_timeout
先决条件
一个 MySQL 服务器。
SQL 命令的基本知识。
访问 MySQL 配置文件。
程序
配置interactive_timeout:
mysql> interactive_timeout=<duration-in-seconds>配置wait_timeout:
mysql> wait_timeout=<duration-in-seconds>表 22. MySQL 会话超时选项的描述
您可能希望查看SQL每个 binlog 事件的原始语句。在 MySQL 配置文件中启用该binlog_rows_query_log_events选项允许您执行此操作。
此选项在 MySQL 5.6 及更高版本中可用。
先决条件
一个 MySQL 服务器。
SQL 命令的基本知识。
访问 MySQL 配置文件。
程序
启用binlog_rows_query_log_events:
mysql> binlog_rows_query_log_events=ONbinlog_rows_query_log_events设置为启用/禁用对SQL在 binlog 条目中包含原始语句的支持的值。
ON= 启用
OFF= 禁用
如果需要配置安装请参考
本文的下半部分: # Debezium connector for MySQL 配置部署
或查看英文原文 点击查看原文: Debezium connector for MySQL :: Debezium Documentation
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |