
Implementing Kafka Data Consumption into Doris


This article describes how to consume data from Kafka and sync it into Doris. Other OLAP analytical databases handle this natively; ClickHouse, for example, provides a Kafka engine table that consumes Kafka data and then syncs it into a physical table through a materialized view. Doris has no such Kafka engine table, so how can Kafka data be synced into it?
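For comparison, the ClickHouse approach mentioned above pairs a Kafka engine table with a materialized view. A minimal sketch of that pattern is shown below; the table names, columns, topic, and broker address are illustrative only:

-- ClickHouse: Kafka engine table acting as a streaming cursor over the topic
CREATE TABLE queue_user_log (
    distinct_id String,
    event       String
) ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.18.129:9092',
         kafka_topic_list  = 'user_log',
         kafka_group_name  = 'ck_group',
         kafka_format      = 'JSONEachRow';

-- Physical table that actually stores the rows
CREATE TABLE ods_user_log_ck (
    distinct_id String,
    event       String
) ENGINE = MergeTree ORDER BY distinct_id;

-- Materialized view that continuously moves rows from the Kafka table into the physical table
CREATE MATERIALIZED VIEW mv_user_log TO ods_user_log_ck AS
SELECT distinct_id, event FROM queue_user_log;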


The rest of this article walks through two approaches for syncing Kafka data into Doris:

1. Using Routine Load, Doris's built-in data import mechanism

Kafka messages are plain strings with a simple delimiter such as '|'

Create the destination table:

CREATE TABLE IF NOT EXISTS sea.user (
    siteid   INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

Routine Load statement connecting to Kafka:

CREATE ROUTINE LOAD sea.test ON user
COLUMNS TERMINATED BY "|",
COLUMNS(siteid, citycode, username, pv)
PROPERTIES(
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.18.129:9092",
    "kafka_topic" = "doris",
    "property.group.id" = "gid",
    "property.client.id" = "cid",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);


Note: sea is the database name and must be specified as part of the job name (sea.test), while the table after ON must be given without the database prefix (just user); otherwise the statement is not recognized and an error is reported.
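For reference, each message in the doris topic is expected to be one '|'-separated line whose fields follow the COLUMNS order (siteid, citycode, username, pv). The sample values below are made up:

1|101|alice|3
2|102|bob|5
1|101|alice|7

Once the job is running, the result can be checked with an ordinary query; because the table uses the AGGREGATE model, rows sharing the same key columns have their pv values summed:

SELECT * FROM sea.user LIMIT 10;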

Kafka messages are JSON data

Create the destination table:

CREATE TABLE dev_ods.ods_user_log (
    `distinct_id`  STRING NOT NULL COMMENT 'member id',
    `time`         BIGINT NOT NULL COMMENT 'timestamp',
    `event_at`     DATETIME        COMMENT 'event time (date and time)',
    `_track_id`    STRING          COMMENT 'tracking id',
    `login_id`     STRING          COMMENT 'login id',
    `lib`          STRING          COMMENT 'lib',
    `anonymous_id` STRING          COMMENT 'anonymous id',
    `_flush_time`  BIGINT          COMMENT 'flush time',
    `type`         STRING          COMMENT 'type',
    `event`        STRING          COMMENT 'event type',
    `properties`   STRING          COMMENT 'event properties',
    `identities`   STRING          COMMENT 'identity info',
    `dt`           DATE            COMMENT 'event date'
)
PRIMARY KEY (distinct_id, `time`)
DISTRIBUTED BY HASH(distinct_id);

Routine Load statement connecting to Kafka and parsing the JSON data:

CREATE ROUTINE LOAD dev_ods.user_log ON ods_user_log
COLUMNS(
    distinct_id, time, _track_id, login_id, lib, anonymous_id,
    _flush_time, type, event, properties, identities,
    dt = from_unixtime(time/1000, '%Y%m%d'),
    event_at = from_unixtime(time/1000, 'yyyy-MM-dd HH:mm:ss')
)
PROPERTIES(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA(
    "kafka_broker_list" = "10.150.20.12:9092",
    "kafka_topic" = "bigDataSensorAnalyse",
    "property.group.id" = "test_group_2",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.enable.auto.commit" = "false"
);

JSON structure:

In the message, the value of the properties field is itself a JSON object; dt and event_at are not values carried in the Kafka data, they are computed during the load and then written into the table.
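Since the actual message structure is not reproduced here, a made-up example with the fields referenced by the COLUMNS list might look like the following (all values are invented; dt and event_at do not appear in the message, they are derived from time by the load job):

{
    "distinct_id": "100861",
    "time": 1651212345000,
    "_track_id": "t_0001",
    "login_id": "u_100861",
    "lib": "js",
    "anonymous_id": "anon_0001",
    "_flush_time": 1651212346000,
    "type": "track",
    "event": "pageView",
    "properties": {"$os": "iOS", "$city": "Shanghai"},
    "identities": "u_100861"
}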

Notes: 1) If the JSON data starts with an array and every object in the array is one record, strip_outer_array must be set to true, which tells Doris to flatten the array.

2) If the JSON data starts with an array and every object in the array is one record, then when setting jsonpaths the ROOT node actually refers to an object inside the array.

Two JSON data formats are supported:

1) {"category":"a9jadhx","author":"test","price":895}

2) [{"category":"a9jadhx","author":"test","price":895}, {"category":"axdfa1","author":"EvelynWaugh","price":1299}]


These are the two JSON data formats that Doris can currently parse.

When the JSON has the following array structure:

{ "RECORDS": [ { "category": "11", "title": "SayingsoftheCentury", "price": 895, "timestamp": 1589191587 }, { "category": "22", "author": "2avc", "price": 895, "timestamp": 1589191487 }, { "category": "33", "author": "3avc", "title": "SayingsoftheCentury", "timestamp": 1589191387 } ]}

The corresponding Routine Load statement is:

Specify the root node via json_root:

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt = from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    "strip_outer_array" = "true",
    "json_root" = "$.RECORDS"
)
FROM KAFKA(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
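The statement above (taken from the official documentation example) assumes a destination table named example_tbl, which is not defined in this article. A minimal sketch of a compatible table, with column types assumed, could look like:

CREATE TABLE example_db.example_tbl (
    category    VARCHAR(64),
    author      VARCHAR(64),
    price       INT,
    `timestamp` BIGINT,
    dt          DATE
)
DUPLICATE KEY(category)
DISTRIBUTED BY HASH(category) BUCKETS 10
PROPERTIES("replication_num" = "1");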


Checking the Routine Load status

Show the status of all jobs under the example_db database:

use example_db;
SHOW ALL ROUTINE LOAD;

Show the status of a specific routine load:

SHOW ALL ROUTINE LOAD FOR datasource_name.kafka_load;

Common routine load commands:

a) Pause a routine load:    PAUSE ROUTINE LOAD FOR datasource_name.kafka_load;
b) Resume a routine load:   RESUME ROUTINE LOAD FOR datasource_name.kafka_load;
c) Stop a routine load:     STOP ROUTINE LOAD FOR datasource_name.kafka_load;
d) Show all routine loads:  SHOW [ALL] ROUTINE LOAD FOR datasource_name.kafka_load;
e) Show routine load tasks: SHOW ROUTINE LOAD TASK datasource_name.kafka_load;

Query the loaded data:

SELECT * FROM datasource_name.table_name LIMIT 10;

Parameter notes:

1) OFFSET_BEGINNING: start consuming from the earliest position that has data.
2) OFFSET_END: start consuming from the end of the topic.
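Applied to the jobs created earlier in this article (sea.test and dev_ods.user_log), the same commands would look like, for example:

PAUSE ROUTINE LOAD FOR sea.test;
RESUME ROUTINE LOAD FOR sea.test;
SHOW ALL ROUTINE LOAD FOR dev_ods.user_log;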

Note: the examples above connect to Kafka without authentication; for more options, see the official documentation link at the bottom of this article.
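As an illustration only (not from the original article), a job against a SASL/PLAIN-secured cluster passes the corresponding librdkafka client settings through the property. prefix. The job name, port, and credentials below are placeholders, and the exact properties should be checked against your cluster and Doris version:

CREATE ROUTINE LOAD sea.test_sasl ON user
COLUMNS TERMINATED BY "|",
COLUMNS(siteid, citycode, username, pv)
PROPERTIES(
    "desired_concurrent_number" = "1"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.18.129:9093",
    "kafka_topic" = "doris",
    "property.group.id" = "gid_sasl",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "kafka_user",
    "property.sasl.password" = "kafka_password"
);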

2. Writing Kafka data into Doris via Flink SQL

CREATE TABLE flink_test_1 (
    id       BIGINT,
    day_time VARCHAR,
    amount   BIGINT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = '10.150.60.5:9092',
    'properties.group.id' = 'flink_gp_test1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

CREATE TABLE sync_test_1 (
    day_time  STRING,
    total_gmv BIGINT,
    PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://10.150.60.2:9030',
    'load-url' = '10.150.60.2:8040;10.150.60.11:8040;10.150.60.17:8040',
    'database-name' = 'test',
    'table-name' = 'sync_test_1',
    'username' = 'root',
    'password' = 'bigdata1234',
    'sink.buffer-flush.max-rows' = '1000000',
    'sink.buffer-flush.max-bytes' = '300000000',
    'sink.buffer-flush.interval-ms' = '5000',
    'sink.max-retries' = '3'
);

INSERT INTO sync_test_1
SELECT day_time, SUM(amount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
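Note that the sink table above is declared with the StarRocks connector. When the target is Doris itself, the Flink Doris Connector is normally used instead; a rough sketch of such a sink follows, where the FE address, credentials, label prefix, and table name are placeholders and the available options depend on the flink-doris-connector version:

CREATE TABLE doris_sync_test_1 (
    day_time  STRING,
    total_gmv BIGINT
) WITH (
    'connector' = 'doris',
    'fenodes' = '10.150.60.2:8030',            -- Doris FE host and HTTP port
    'table.identifier' = 'test.sync_test_1',   -- target database.table in Doris
    'username' = 'root',
    'password' = 'bigdata1234',
    'sink.label-prefix' = 'sync_test_1_label'  -- unique prefix for stream load labels
);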

The Flink SQL approach above is provided for reference; more Flink SQL topics will be covered step by step in later articles.

Doris official documentation link

References

Importing data from Kafka into Doris



Tags: #doris #Kafka