irpas技术客

Hello Kafka(九)——C客户端_天山老妖_c++ kafka客户端

大大的周 7989

一、Librdkafka简介 1、librdkafka简介

librdkafka是C语言实现的Apache?Kafka的高性能客户端,提供C++接口。librdkafka专为现代硬件而设计,尝试将内存复制保持在最小,可以让用户决定是需要高吞吐量还是低延迟的服务,当前可支持每秒超过100万的消息生产和300万每秒的消息消费。

Github地址:

https://github.com/edenhill/librdkafka

2、librdkafka安装 yum install librdkafka-devel

Linux系统下源码编译安装:

git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make -j sudo make install

编译完成后,C的静态库、动态库、头文件位于src目录,CPP的静态库、动态库、头文件位于src-cpp目录下。

安装完成后,librdkafka的头文件位于/usr/local/include/librdkafka,库文件位于/usr/local/lib,执行ldconfig确保librdkafka库生效。

librdkafka C API定义在rdkafka.h文件中。

librdkafka C++ API定义在rdkafkacpp.h文件中。

3、librdkafka特性

Iibradkafka最新版本基于Kafka 1.4.2版本,特性如下:

(1)完全EOS(精确一次语义)支持。

(2)高级Producer API支持,包括幂等性和事务型Producer支持。

(3)高级平衡KafkaConsumer?API支持。

(4)简单Consumer支持。

(5)Admin Client支持。

(6)支持snappy, gzip, lz4, zstd压缩。

(7)支持SSL。

(8)支持SASL(GSSAPI/Kerberos/SSPI, PLAIN, SCRAM, OAUTHBEARER)。

(9)支持Kafka 0.8以上版本。

(10)保证C/C++ API稳定性。

(11)支持统计指标。

(12)Debian支持:librdkafka1和librdkafka-dev

(13)RPM支持:librdkafka和librdkafka-devel

4、librdkafka术语

rd:Rapid?Development

rk:RdKafka

toppar:Topic?Partition

rep:Reply

msgq:Message?Queue

rkb:RdKafka?Broker

rko:RdKafka?Operation

rkm:RdKafka?Message

payload:存在Kafka上的消息(Log)

二、C API 1、librdkafka版本

#define RD_KAFKA_VERSION ?0x010400ff

MM.mm.rr.xx按十六进制解释,0x010400ff=1.4.0,用于编译时。

int?rd_kafka_version?(void);

返回整型的Kafka的版本。

const?char?*?rd_kafka_version_str?(void);

返回字符串类型的Kafka版本

2、Kafka Error const?char?*?rd_kafka_err2str(rd_kafka_resp_err_t?err);

根据错误码返回可读的错误信息。

void?rd_kafka_get_err_descs?(const?struct?rd_kafka_err_desc?**errdescs,size_t?*cntp);

获取全部错误代码的链表

const?char?*rd_kafka_err2name?(rd_kafka_resp_err_t?err);

获取错误代码的名称

rd_kafka_resp_err_t?rd_kafka_last_error?(void);

返回当前线程的API调用中生成的最近的错误代码

rd_kafka_resp_err_t?rd_kafka_fatal_error?(rd_kafka_t?*rk,char?*errstr,?size_t?errstr_size);

返回客户端实例中第一个fatal错误,输出可读错误字符串到errstr

rd_kafka_resp_err_t?rd_kafka_error_code?(const?rd_kafka_error_t?*error);

返回Kafka错误的错误代码

const?char?*rd_kafka_error_string?(const?rd_kafka_error_t?*error);

返回Kafka错误的可读错误描述

int?rd_kafka_error_is_fatal?(const?rd_kafka_error_t?*error);

检查Kafka错误是否时fatal错误,如果是fatal错误,返回1,否则返回0。

int?rd_kafka_error_is_retriable?(const?rd_kafka_error_t?*error);

检查Kafka错误是否可重试,如果可重试,返回1,否则返回0。

int?rd_kafka_error_txn_requires_abort?(const?rd_kafka_error_t?*error);

检查Kafka错误是否是事务型错误,如果Kafka错误是可终止的事务型错误,返回1,否则返回0。

void?rd_kafka_error_destroy?(rd_kafka_error_t?*error);

销毁Kafka错误实例

rd_kafka_error_t?*rd_kafka_error_new?(rd_kafka_resp_err_t?code,const?char?*fmt,?...);

使用错误码code和可读错误描述符创建一个Kafka错误

3、Topic+Partition typedef?struct?rd_kafka_topic_partition_s?{ ????????char????????*topic;?????????????/**<?Topic?name?*/ ????????int32_t??????partition;?????????/**<?Partition?*/ int64_t??????offset;????????????/**<?Offset?*/ ????????void????????*metadata;??????????/**<?Metadata?*/ ????????size_t???????metadata_size;?????/**<?Metadata?size?*/ ????????void????????*opaque;????????????/**<?Opaque?value?for?application?use?*/ ????????rd_kafka_resp_err_t?err;????????/**<?Error?code,?depending?on?use.?*/ ????????void???????*_private;???????????/**<?INTERNAL?USE?ONLY, ?????????????????????????????????????????*???INITIALIZE?TO?ZERO,?DO?NOT?TOUCH?*/ }?rd_kafka_topic_partition_t;

Topic+Partition占位符通常用于消费者位移(rd_kafka_commit()),分组再平衡回调函数(rd_kafka_conf_set_rebalance_cb()),位移提交回调函数(rd_kafka_conf_set_offset_commit_cb())。

void?rd_kafka_topic_partition_destroy?(rd_kafka_topic_partition_t?*rktpar);

销毁Topic+Partition对象,不能对Topic+Partition链表的元素对象调用。

typedef?struct?rd_kafka_topic_partition_list_s?{ ????????int?cnt;???????????????/**<?Current?number?of?elements?*/ ????????int?size;??????????????/**<?Current?allocated?size?*/ ????????rd_kafka_topic_partition_t?*elems;?/**<?Element?array[]?*/ }?rd_kafka_topic_partition_list_t;

Topic+Partition链表

rd_kafka_topic_partition_list_t?*rd_kafka_topic_partition_list_new?(int?size);

创建Topic+Partition链表实例

void?rd_kafka_topic_partition_list_destroy?(rd_kafka_topic_partition_list_t?*rkparlist);

销毁释放Topic+Partition链表实例

rd_kafka_topic_partition_t*?rd_kafka_topic_partition_list_add?( ????????rd_kafka_topic_partition_list_t?*rktparlist, ????????const?char?*topic,?int32_t?partition);

增加Topic+Partition到Topic+Partition链表

void?rd_kafka_topic_partition_list_add_range?(rd_kafka_topic_partition_list_t ?????????????????????????????????????????*rktparlist, ?????????????????????????????????????????const?char?*topic, ?????????????????????????????????????????int32_t?start,?int32_t?stop);

增加topic中从start到stop(包含)的分区到Topic+Partition链表

int?rd_kafka_topic_partition_list_del?(rd_kafka_topic_partition_list_t?*rktparlist, ???const?char?*topic,?int32_t?partition);

从Topic+Partition链表删除Topic+Partition

int?rd_kafka_topic_partition_list_del_by_idx?( rd_kafka_topic_partition_list_t?*rktparlist,int?idx);

根据Topic+Partition链表中的索引删除Topic+Partition,成功返回1,失败返回0。

rd_kafka_topic_partition_list_t?*?rd_kafka_topic_partition_list_copy?( ????????const?rd_kafka_topic_partition_list_t?*src);

从源Topic+Partition链表实例拷贝创建新的Topic+Partition链表

rd_kafka_resp_err_t?rd_kafka_topic_partition_list_set_offset?( rd_kafka_topic_partition_list_t?*rktparlist, const?char?*topic,?int32_t?partition,?int64_t?offset);

设置Topic+Partition链表中相应Topic+Partition的位移为offset,成功返回RD_KAFKA_RESP_ERR_NO_ERROR,没有找到Topic+Partition返回RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION。

rd_kafka_topic_partition_t*?rd_kafka_topic_partition_list_find?( ????????rd_kafka_topic_partition_list_t?*rktparlist, ???? ????????const?char?*topic,?int32_t?partition);

查找第一个匹配的Topic+Partition,没有找到返回NULL。

void?rd_kafka_topic_partition_list_sort?(rd_kafka_topic_partition_list_t?*rktparlist, int?(*cmp)?(const?void?*a,?const?void?*b,void?*cmp_opaque), void?*cmp_opaque);

使用cmp比较操作进行排序,如果cmp为NULL,会按topic和partition升序进行排序,cmp_opaque为cmp的cmp_opaque参数。 ?

4、Kafka Message typedef?struct?rd_kafka_message_s?{ rd_kafka_resp_err_t?err;??? rd_kafka_topic_t?*rkt;???? int32_t?partition;???????? void???*payload;?????????? size_t??len;??????????????? void???*key;?????????????? size_t??key_len;? int64_t?offset;???????????? ????void??*_private;?????????? }?rd_kafka_message_t;

Kafka Message通常由rd_kafka_consume*()族函数返回,并提供给dr_msg_cb()。应用程序必须检查err,然后确定采取的处理方式。

void?rd_kafka_message_destroy(rd_kafka_message_t?*rkmessage);

释放Kafka Message实例

static?const?char*?rd_kafka_message_errstr(const?rd_kafka_message_t?*rkmessage)

返回Kafka Message的错误字符串,如果没有错误,返回NULL。

int64_t?rd_kafka_message_timestamp?(const?rd_kafka_message_t?*rkmessage, ???? rd_kafka_timestamp_type_t?*tstype);

返回消费的Kafka Message的时间戳

int64_t?rd_kafka_message_latency?(const?rd_kafka_message_t?*rkmessage);

返回produce函数生产一条Kafka Message的延迟

5、Kafka客户端配置 typedef?enum?{ RD_KAFKA_CONF_UNKNOWN?=?-2,?/**<?Unknown?configuration?name.?*/ RD_KAFKA_CONF_INVALID?=?-1,?/**<?Invalid?configuration?value.?*/ RD_KAFKA_CONF_OK?=?0????????/**<?Configuration?okay?*/ }?rd_kafka_conf_res_t;

配置结果类型

rd_kafka_conf_t?*rd_kafka_conf_new(void)

创建Kafka配置实例

void?rd_kafka_conf_destroy(rd_kafka_conf_t?*conf)

销毁Kafka配置实例

rd_kafka_conf_t?*rd_kafka_conf_dup(const?rd_kafka_conf_t?*conf)

从已有Kafka配置实例创建Kafka配置对象副本

const?rd_kafka_conf_t?*rd_kafka_conf?(rd_kafka_t?*rk);

获取Kafka客户端实例的Kafka配置实例

rd_kafka_conf_res_t?rd_kafka_conf_set(rd_kafka_conf_t?*conf, ???????const?char?*name, ???????const?char?*value, ???????char?*errstr,?size_t?errstr_size);

设置Kafka配置实例属性。conf必须是rd_kafka_conf_new返回的实例。返回rd_kafka_conf_res_t类型值,表明成功或失败。如果失败,errstr字符串为可读字符串。

void?rd_kafka_conf_set_background_event_cb?(rd_kafka_conf_t?*conf, void?(*event_cb)?(rd_kafka_t?*rk, rd_kafka_event_t?*rkev, void?*opaque));

设置后台事件回调函数

void?rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t*?conf, void(*)(rd_kafka_t?*rk,?const?rd_kafka_message_t*?rkmessage,?void?*opaque))

在配置对象设置投递报告回调函数。一旦生产的每条消息被rd_kafka_produce函数接收,投递报告回调函数会被立即调用。

当成功生产消息或是RdKafka遭遇永久错误时,报告回调函数会被调用。当重试次数超过阈值或超时超过message.timeout.ms参数值或发生RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART错误都会导致发生消息投递错误。

应用程序必须在一定时间间隔内调用rd_kafka_poll()函数,执行队列化的消息投递报告回调函数。

dr_msg_cb回调函数的opaque参数是使用rd_kafka_conf_set_opaque()设置的opaque。

对于消息重试,幂等Producer可能会返回无效的时间戳RD_KAFKA_TIMESTAMP_NOT_AVAILABLE和位移RD_KAFKA_OFFSET_INVALID。

void?rd_kafka_conf_set_consume_cb?(rd_kafka_conf_t?*conf,? void(*consume_cb)(rd_kafka_message_t?*rkmessage,?void?*opaque));

设置消费回调函数,配合rd_kafka_consumer_poll函数使用。

consume_cb回调函数的opaque参数为rd_kafka_conf_set_opaque()设置的opaque参数。

void?rd_kafka_conf_set_rebalance_cb( rd_kafka_conf_t?*conf,? void(*rebalance_cb)(rd_kafka_t?*rk,? rd_kafka_resp_err_t?err,? rd_kafka_topic_partition_list_t?*partitions,? void?*opaque));

设置Rebalance回调函数,配合协调Consumer Group再平衡使用。

err设置为RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS或RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,partitions包含要assign和revoke的所有分区。

注册rebalance_cb回调函数将会关闭RdKafka的自动assign和revoke,并由rebalance_cb回调函数进行负责。

rebalance_cb回调函数会基于RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS和RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS事件负责更新RdKafka的分区分配,同时也能处理Rebalance的RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS、RD_KAFKA_RESP_ERR__REVOKE_PARTITION错误。在处理Rebalance错误时,应用程序必须调用rd_kafka_assign(rk, NULL)同步状态。

没有注册rebalance_cb回调函数,RdKafka也会自动完成分区分配,但注册rebalance_cb回调函数会使应用程序在执行带有分区分配的其它操作时具有更大的弹性,例如在可选位置(assign)内获取位移,在revoke手动提交位移。

rebalance_cb回调函数的opaque参数是使用rd_kafka_conf_set_opaque()设置的opaque参数。

Partitions链表会在rebalance_cb回调函数返回时释放,禁止应用程序释放或保存。

在rebalance_cb回调函数内修改partitions需要十分小心,对于partitions的分区只能在初始化位移时完成。但rd_kafka_position()函数可能会对分区产生意料之外的影响,如消费者分配到一个分区用于早期Rebalance的消费,因此分区的位移会被修改为一个旧的位移。较好的做法是使用rd_kafka_topic_partition_list_copy()生成的拷贝。

static?void?rebalance_cb?(rd_kafka_t?*rk,?rd_kafka_resp_err_t?err, ??????????????????????????rd_kafka_topic_partition_list_t?*partitions, ??????????????????????????void?*opaque) { ????switch?(err) ????{ ????case?RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: ????????//?application?may?load?offets?from?arbitrary?external ????????//?storage?here?and?update?\p?partitions ????????rd_kafka_assign(rk,?partitions); ????????break; ????case?RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: ????????if?(manual_commits)?//?Optional?explicit?manual?commit ????????????rd_kafka_commit(rk,?partitions,?0);?//?sync?commit ????????rd_kafka_assign(rk,?NULL); ????????break; ????default: ????????handle_unlikely_error(err); ????????rd_kafka_assign(rk,?NULL);?//?sync?state ????????break; ????} }

上述rebalance_cb代码示例表明了rebalance_cb需要处理的分区分配和位移提交逻辑。

void?rd_kafka_conf_set_offset_commit_cb?( rd_kafka_conf_t?*conf,? void(*offset_commit_cb)( rd_kafka_t?*rk,? rd_kafka_resp_err_t?err,? rd_kafka_topic_partition_list_t?*offsets,? void?*opaque));

设置用于Consumer Group的位移提交回调函数。

自动或手动位移提交的结果会在commit_cb函数内处理,并在

rd_kafka_consumer_poll()函数内被使用。

void?rd_kafka_conf_set_error_cb( rd_kafka_conf_t?*conf,? void(*error_cb)( rd_kafka_t?*rk,? int?err,? const?char?*reason,? void?*opaque));

在提供配置对象中设置错误回调函数。

error_cb用于RdKafka发送警告、错误信息给应用程序。

如果产生致命错误,error_cb回调函数会被触发,err参数被传递RD_KAFKA_RESP_ERR__FATAL值。此时,rd_kafka_fatal_error()函数可以获取到致命错误的错误代码和字符串信息,并开始终止Kafka客户端实例。

如果没有注册error_cb函数或是没有使用rd_kafka_conf_set_events函数注册RD_KAFKA_EVENT_ERROR事件,错误只会打印日志。

void?rd_kafka_conf_set_throttle_cb( rd_kafka_conf_t?*conf,? void(*throttle_cb)( rd_kafka_t?*rk,? const?char?*broker_name,? int32_t?broker_id,? int?throttle_time_ms,? void?*opaque));

在Kafka配置实例中设置throttle回调函数

throttle_cb回调函数用于转发Broker的限流次数到发起Produce和Fetch请求的应用程序。

当Broker返回非0限流次数或限流次数回落到0时,throttle_cb回调函数会被触发。

为了处理队列化的回调函数,应用程序必须在一定时间内调用rd_kafka_poll()或rd_kafka_consumer_poll()函数。

void?rd_kafka_conf_set_log_cb( rd_kafka_conf_t?*conf,? void(*log_cb)( const?rd_kafka_t?*rk,? int?level,? const?char?*fac,? const?char?*buf));

设置logger回调函数

默认日志会打印到标准错误,但syslog logger时可用的。应用程序可以提供自己的日志回调函数,传递NULL会关闭日志打印。

log_cb会被RdKafka内部线程自发调用,除非日志被转发到rd_kafka_set_log_queue()设置的轮询队列。应用程序禁止在log_cb函数内调用RdKafka API,或是任何耗时长的工作。

void?rd_kafka_conf_set_stats_cb( rd_kafka_conf_t?*conf,? int(*stats_cb)( rd_kafka_t?*rk,? char?*json,? size_t?json_len,? void?*opaque));

在配置对象中设置统计回调函数

stats_cb回调函数每隔statistics.interval.ms时间间隔在rd_kafka_poll()内被触发。

如果应程序序需要持有json指针并释放其资源,必须在stats_cb函数返回1;如果应程序序需要RdKafka释放立即json指针资源,必须stats_cb函数返回0。

void?rd_kafka_conf_set_socket_cb( rd_kafka_conf_t?*conf,? int(*socket_cb)(int?domain,? int?type,? int?protocol,? void?*opaque));

设置socket回调函数

socket_cb回调函数用于根据提供的domain、type、protocol打开Socket连接。Socket应尽可能使用CLOEXEC进行创建。

socket_cb回调函数会在RdKafka内部线程调用。

void?rd_kafka_conf_set_connect_cb?( rd_kafka_conf_t?*conf, int?(*connect_cb)?( int?sockfd, const?struct?sockaddr?*addr, int?addrlen, const?char?*id, void?*opaque));

设置socket连接回调函数

connect_cb回调函数用于连接sockfd连接到addr地址,成功返回0,错误返回错误码。

connect_cb回调函数会在RdKafka内部线程调用。

void?rd_kafka_conf_set_closesocket_cb?( rd_kafka_conf_t?*conf, int?(*closesocket_cb)?( int?sockfd, void?*opaque));

设置closesocket_cb关闭socket连接回调函数。

closesocket_cb回调函数会在RdKafka内部线程调用。

void?rd_kafka_conf_set_open_cb( rd_kafka_conf_t?*conf,? int(*open_cb)( const?char?*pathname,? int?flags,? mode_t?mode,? void?*opaque));

设置open回调函数,open回调函数用于使用pathname、flags、mode指定打开的文件。

应尽可能使用CLOEXEC打开文件。

open_cb回调函数会在RdKafka内部线程调用。

void?rd_kafka_conf_set_opaque(rd_kafka_conf_t?*conf,?void?*opaque);

设置传递到回调函数的应用程序的opaque指针。

void?*? rd_kafka_opaque(const?rd_kafka_t?*rk);

获取opaque指针

void?rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t?*conf,?rd_kafka_topic_conf_t?*tconf);

设置用于自动订阅Topic的默认Topic配置对象,本函数调用后Topic配置对象将不可用。

rd_kafka_conf_res_t?rd_kafka_conf_get(const?rd_kafka_conf_t?*conf,? const?char?*name,?char?*dest,?size_t?*dest_size);

获取属性名称的配置值。如果有属性名称匹配,返回RD_KAFKA_CONF_OK,否则返回RD_KAFKA_CONF_UNKNOWN。

rd_kafka_conf_res_t?rd_kafka_topic_conf_get(const?rd_kafka_topic_conf_t?*conf,? const?char?*name,?char?*dest,?size_t?*dest_size);

获取Topic配置对象中属性名称的配置值。

const?char**?rd_kafka_conf_dump(rd_kafka_conf_t?*conf,?size_t?*cntp);

使用键值对数组的方式备份配置对象的属性和值,数组的元素个数通过cntp输出。

const?char?**?rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t?*conf,?size_t?*cntp);

使用键值对数组的方式备份Topic配置对象的属性和值,数组的元素个数通过cntp输出。

void?rd_kafka_conf_dump_free(const?char?**arr,?size_t?cnt);

释放配置对象的备份实例。

6、Topic配置 rd_kafka_topic_conf_t*?rd_kafka_topic_conf_new?(void);

创建Topic配置对象

rd_kafka_topic_conf_t*?rd_kafka_topic_conf_dup(const?rd_kafka_topic_conf_t?*conf);

创建Topic配置对象的拷贝。

rd_kafka_topic_conf_t?*rd_kafka_default_topic_conf_dup?(rd_kafka_t?*rk);

创建Kafka客户端实例的默认Topic配置实例的副本

void?rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t?*topic_conf);

销毁Topic配置对象

rd_kafka_conf_res_t?rd_kafka_topic_conf_set(rd_kafka_topic_conf_t?*conf,? ???????????????????????????????????????????const?char?*name,?const?char?*value, ???????????????????????????????????????????char?*errstr,?size_t?errstr_size);

通过属性明设置Topic配置对象的值

void?rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t?*conf,?void?*opaque);

设置Topic配置对象的opaque指针,opaque指针会被作为rkt_opaque参数传递到所有的Topic回调函数。

void?rd_kafka_topic_conf_set_partitioner_cb( rd_kafka_topic_conf_t?*topic_conf,? int32_t(*partitioner)( const?rd_kafka_topic_t?*rkt,? const?void?*keydata,?size_t?keylen,? int32_t?partition_cnt,? void?*rkt_opaque,? void?*msg_opaque));

在Topic配置对象设置partitioner回调函数

partitioner回调函数可能会在任何时时刻被任何线程调用,并可能会针对同一条消息被调用多次。

partitioner回调函数的rkt_opaque参数通过rd_kafka_topic_conf_set_opaque()函数进行设置。

partitioner回调函数的使用约束:

(1)禁止调用除rd_kafka_topic_partition_available()外的任何d_kafka_*()函数。

(2)禁止阻塞或执行长时间操作。

(3)必须返回0到partition_cnt-1间的值。如果分区没有执行,返回RD_KAFKA_PARTITION_UA。

int?rd_kafka_topic_partition_available(const?rd_kafka_topic_t?*rkt,?int32_t?partition);

检查分区是否可用,分区可用返回1,否则返回0。

void?rd_kafka_topic_conf_set_msg_order_cmp?( rd_kafka_topic_conf_t?*topic_conf, int?(*msg_order_cmp)?( const?rd_kafka_message_t?*a, const?rd_kafka_message_t?*b));

设置消息队列排序比较回调函数

msg_order_cmp回调函数可能会在任何时时刻被任何线程调用,并可能会针对同一条消息被调用多次。

msg_order_cmp回调函数使用约束:

(1)必须时稳定排序

(2)禁止调用任何rd_kafka_*()函数

(3)禁止阻塞或执行长时间操作。

msg_order_cmp回调函数会比较两条消息并返回:

(1)如果消息a被插入消息b前,返回小于0值。

(2)如果消息a被插入消息b后,返回大与等于0值。

插入排序可以用于消息入队到正确位置,并且时间复杂度为O(N)。

如果设置queuing.strategy=fifo,新的消息会入队到消息队列尾部,但获取消息仍旧会受到msg_order_cmp影响。

int32_t?rd_kafka_msg_partitioner_random(const?rd_kafka_topic_t?*rkt,? const?void?*key,? size_t?keylen,? int32_t?partition_cnt,? void?*opaque,? void?*msg_opaque);

Random分区策略,不会返回不可用分区。

int32_t?rd_kafka_msg_partitioner_consistent?(const?rd_kafka_topic_t?*rkt, const?void?*key,?size_t?keylen, ? int32_t?partition_cnt, void?*rkt_opaque,? void?*msg_opaque);

Consistent分区策略,基于key的CRC值返回0到partition_cnt间的随机化分区。

int32_t?rd_kafka_msg_partitioner_consistent_random?(const?rd_kafka_topic_t?*rkt, ??????? const?void?*key,? size_t?keylen, ???????????int32_t?partition_cnt, ???????????void?*rkt_opaque,? void?*msg_opaque);

Consistent-Random分区策略。RdKafka默认分区策略。如果消息指定key,使用一致性分区策略;如果消息没有指定key,使用随机化分区策略。

int32_t?rd_kafka_msg_partitioner_murmur2?(const?rd_kafka_topic_t?*rkt, ??????????????????????????????????????????const?void?*key,?size_t?keylen, ??????????????????????????????????????????int32_t?partition_cnt, ??????????????????????????????????????????void?*rkt_opaque, ??????????????????????????????????????????void?*msg_opaque);

Murmur2分区策略。

int32_t?rd_kafka_msg_partitioner_murmur2_random?(const?rd_kafka_topic_t?*rkt, ?????????????????????????????????????????????????const?void?*key,?size_t?keylen, ?????????????????????????????????????????????????int32_t?partition_cnt, ?????????????????????????????????????????????????void?*rkt_opaque, ?????????????????????????????????????????????????void?*msg_opaque);

Consistent-Random Murmur2分区策略。

int32_t?rd_kafka_msg_partitioner_fnv1a?(const?rd_kafka_topic_t?*rkt, ????????????????????????????????????????const?void?*key,?size_t?keylen, ????????????????????????????????????????int32_t?partition_cnt, ????????????????????????????????????????void?*rkt_opaque, ????????????????????????????????????????void?*msg_opaque);

FNV-1a分区策略。

int32_t?rd_kafka_msg_partitioner_fnv1a_random?(const?rd_kafka_topic_t?*rkt, ???????????????????????????????????????????????const?void?*key,?size_t?keylen, ???????????????????????????????????????????????int32_t?partition_cnt, ???????????????????????????????????????????????void?*rkt_opaque, ???????????????????????????????????????????????void?*msg_opaque);

Consistent-Random FNV-1a分区策略。

7、Kafka客户端 #define?RD_KAFKA_PARTITION_UA??((int32_t)-1)

未赋值分区

rd_kafka_t?*?rd_kafka_new(rd_kafka_type_t?type,? rd_kafka_conf_t?*conf,? char?*errstr,? size_t?errstr_size);

根据指定类型RD_KAFKA_CONSUMER或RD_KAFKA_PRODUCER创建Kafka句柄并启动。失败返回NULL。

void?rd_kafka_destroy?(rd_kafka_t?*rk);

销毁Kafka句柄,阻塞操作。函数内会调用rd_kafka_consumer_close()函数。如果是Consumer实例,并配置group.id属性,应用程序不能显式调用rd_kafka_consumer_close()函数。可能会轮流触发消费者回调函数,如rebalance_cb。

void?rd_kafka_destroy_flags?(rd_kafka_t?*rk,?int?flags);

根据指定销毁flags销毁Kafka实例

const?char*?rd_kafka_name(const?rd_kafka_t?*rk);

返回Kafka句柄名称

rd_kafka_type_t?rd_kafka_type(const?rd_kafka_t?*rk);

获取Kafka实例类型

char?*?rd_kafka_memberid(const?rd_kafka_t?*rk);

返回Broker分配给客户端的分组ID

应用程序必须使用rd_kafka_mem_free()释放返回的字符串。

char?*rd_kafka_clusterid?(rd_kafka_t?*rk,?int?timeout_ms);

返回Broker元数据的ClusterId

要求Kafka版本大于0.10.0,并且api.version.request=true。

应用程序必须使用rd_kafka_mem_free()释放返回的字符串。

int32_t?rd_kafka_controllerid?(rd_kafka_t?*rk,?int?timeout_ms);

返回Broker元数据中当前Controller的ID

要求Kafka版本大于0.10.0,并且api.version.request=true。

应用程序必须使用rd_kafka_mem_free()释放返回的字符串。

8、Topic rd_kafka_topic_t?*?rd_kafka_topic_new(rd_kafka_t?*rk,? const?char?*topic,? rd_kafka_topic_conf_t?*conf);

使用topic名称创建一个新的Topic句柄,conf用于替换默认的Topic配置对象。

void?rd_kafka_topic_destroy(rd_kafka_topic_t?*rkt);

销毁Topic句柄对象

const?char?*?rd_kafka_topic_name(const?rd_kafka_topic_t?*rkt);

返回Topic名称。

void?*?rd_kafka_topic_opaque(const?rd_kafka_topic_t?*rkt);

获取Topic配置对象的rkt_opaque指针

int?rd_kafka_poll(rd_kafka_t?*rk,?int?timeout_ms);

Poll指定的Kafka句柄的事件,事件会触发回调函数。

timeout_ms参数用于指定阻塞等待事件的最小时间,非阻塞调用设置参数为0,无限期等待事件设置为-1。

应用程序应该确保在一定时间内调用poll函数,为了触发执行队列化的回调函数。如果Producer实例没有设置回调函数,则可以选择不调用poll函数,虽然不推荐。

rd_kafka_resp_err_t?rd_kafka_pause_partitions?(rd_kafka_t?*rk, ??? rd_kafka_topic_partition_list_t?*partitions);

暂停生产、消费partitions

rd_kafka_resp_err_t?rd_kafka_resume_partitions?(rd_kafka_t?*rk, ???? rd_kafka_topic_partition_list_t?*partitions);

恢复生产、消费partitions

rd_kafka_resp_err_t?rd_kafka_query_watermark_offsets?(rd_kafka_t?*rk, ?????? const?char?*topic,? int32_t?partition, ?????? int64_t?*low,? int64_t?*high,? int?timeout_ms);

查询Broker的指定Topic、分区的高、低水位

成功返回RD_KAFKA_RESP_ERR_NO_ERROR。

rd_kafka_resp_err_t?rd_kafka_get_watermark_offsets?(rd_kafka_t?*rk, const?char?*topic,? int32_t?partition, int64_t?*low,? int64_t?*high);

获取指定Topic、分区的高、低水位,成功返回RD_KAFKA_RESP_ERR_NO_ERROR,只能用于活跃Consumer实例。

rd_kafka_resp_err_t?rd_kafka_offsets_for_times?(rd_kafka_t?*rk, ????????????????????????????rd_kafka_topic_partition_list_t?*offsets, ????????????????????????????int?timeout_ms);

根据时间戳查询指定分区的位移。

void?rd_kafka_mem_free?(rd_kafka_t?*rk,?void?*ptr);

释放RdKakfka返回的指针

9、Kafka消息队列

消息队列允许应用程序从多个Topic+Partions路由已经消费的消息到一个队列。

rd_kafka_queue_t?*?rd_kafka_queue_new(rd_kafka_t?*rk);

创建新的消息队列

void?rd_kafka_queue_destroy(rd_kafka_queue_t?*rkqu);

销毁消息队列,清理队列内所有消息

rd_kafka_queue_t?*rd_kafka_queue_get_main?(rd_kafka_t?*rk);

获取RdKafka事件队列的引用

rd_kafka_queue_t?*rd_kafka_queue_get_consumer?(rd_kafka_t?*rk);

返回RdKafka消费者队列的引用

调用rd_kafka_consumer_close()前必须先调用rd_kafka_queue_destroy()。

rd_kafka_queue_t?*rd_kafka_queue_get_partition?(rd_kafka_t?*rk, ????????????????????????????????????????????????const?char?*topic, ????????????????????????????????????????????????int32_t?partition);

返回分区的队列的引用,如果分区无效,返回NULL,只针对Consumer。

必须调用rd_kafka_queue_destroy()销毁分区队列。

rd_kafka_queue_t?*rd_kafka_queue_get_background?(rd_kafka_t?*rk);

返回后台线程队列,如果没有开启后台队列,返回NULL。

通过rd_kafka_conf_set_background_event_cb()开启后台线程队列。

后台队列会被RdKakfa轮询、服务,禁止应用程序对其进行轮询、转发或其它管理,只能传递给用于开启队列的API,如Admin API。

后台线程队列为应用程序提供一个在后台线程触发事件回调的自动轮询队列,后台线程完全由RdKafka管理。

void?rd_kafka_queue_forward?(rd_kafka_queue_t?*src,?rd_kafka_queue_t?*dst);

将src队列数据转发到dst队列

无论dst是否为NULL,调用本函数后,src都不会转发所获取的队列到消费者队列。

rd_kafka_resp_err_t?rd_kafka_set_log_queue?(rd_kafka_t?*rk, ????????????????????????????????????????????rd_kafka_queue_t?*rkqu);

转发RdKafka的log到指定队列。

配置属性log.queue必须设置为true。

size_t?rd_kafka_queue_length?(rd_kafka_queue_t?*rkqu);

返回队列中元素的数量

void?rd_kafka_queue_io_event_enable?(rd_kafka_queue_t?*rkqu,?int?fd, ????? const?void?*payload,? size_t?size);

开启IO事件队列触发

指定fd=-1可以删除事件队列触发。

当使用转发队列,目的队列的IO事件必须开启。

void?rd_kafka_queue_cb_event_enable?(rd_kafka_queue_t?*rkqu, ?????????????????????????????????????void?(*event_cb)?(rd_kafka_t?*rk, ???????????????????????????????????????????????????????void?*qev_opaque), ?????????????????????????????????????void?*qev_opaque);

为队列开启回调事件触发

当新元素入队一个控队列时,回调函数会在RdKafka内部线程调用。

设置event_cb=NULL可以删除事件触发。

由于回调函数可能被RdKafks内部线程触发,因此应用程序禁止在event_cb执行耗时工作或调用RdKafka API。

10、Consumer API #define?RD_KAFKA_OFFSET_BEGINNING?-2#?从Kafka分区队列头部开始消费? #define?RD_KAFKA_OFFSET_END???????-1#?从Kafka分区队列的尾部开始消费? #define?RD_KAFKA_OFFSET_STORED?-1000#?从位移存储中获取的位移开始消费? #define?RD_KAFKA_OFFSET_INVALID?-1001?#?无效位移 int?rd_kafka_consume_start(rd_kafka_topic_t?*rkt,?int32_t?partition,?int64_t?offset);

启动消费消息,从rkt主题partition分区的offset位移开始消费

RdKakfa会试图在本地队列持有queued.min.messages条消息,通过重复从Broker批量获取消息,直到达到阈值。

应用程序应使用rd_kafka_consume*()族函数从本地队列消费消息。

rd_kafka_consume_start函数在没有停止消费的情况下禁止对同一Topic的同一分区调用多次。

成功返回0,失败返回错误。

RD_KAFKA_RESP_ERR__CONFLICT:与前一个订阅或当前订阅存在冲突。

RD_KAFKA_RESP_ERR__INVALID_ARG:非法位移或不完整配置,如缺group.id配置。

RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:请求分区非法

RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:topic未被Kafka集群时别。

int?rd_kafka_consume_start_queue(rd_kafka_topic_t?*rkt,? int32_t?partition,? int64_t?offset,? rd_kafka_queue_t?*rkqu);

开始消费消息,但会将收到的消息转发到rkqu消息队列

int?rd_kafka_consume_stop(rd_kafka_topic_t?*rkt,?int32_t?partition);

停止从rkt主题partition分区消费消息,并清除本地队列的所有当前消息。

rd_kafka_resp_err_t?rd_kafka_seek(rd_kafka_topic_t?*rkt,? int32_t?partition,? int64_t?offset,? int?timeout_ms);

定位到Consumer到rkt主题的partition分区的offset位移

rd_kafka_message_t?*?rd_kafka_consume?(rd_kafka_topic_t?*rkt,? int32_t?partition,? int?timeout_ms);

从rkt主题的partition分区消费一条消息

ssize_t?rd_kafka_consume_batch(rd_kafka_topic_t?*rkt,? int32_t?partition,? int?timeout_ms,? rd_kafka_message_t?**rkmessages,? size_t?rkmessages_size);

从rkt主题的partition分区批量消费rkmessages_size消息,将消费的消息赋值到rkmessages指针数组的指针。

int?rd_kafka_consume_callback(rd_kafka_topic_t?*rkt,? int32_t?partition,? int?timeout_ms,? void(*consume_cb)(rd_kafka_message_t?*rkmessage,?void?*opaque),? void?*opaque);

从rkt主题的partition分区消费消息,对消费的每一条消息调用指定的回调函数进行处理。

rd_kafka_consume_callback接口比rd_kafka_consume和rd_kafka_consume_batch可以提供更高的吞吐量性能。

rd_kafka_message_t?*rd_kafka_consume_queue(rd_kafka_queue_t?*rkqu, ???????????????????????????????????????????int?timeout_ms);

从指定队列消费消息。

ssize_t?rd_kafka_consume_batch_queue(rd_kafka_queue_t?*rkqu, ??????int?timeout_ms, ??????rd_kafka_message_t?**rkmessages, ??????size_t?rkmessages_size);

从指定队列批量消费消息。

int?rd_kafka_consume_callback_queue(rd_kafka_queue_t?*rkqu,? int?timeout_ms,? void(*consume_cb)(rd_kafka_message_t?*rkmessage,?void?*opaque),? void?*opaque);

使用回调函数从消息队列消费多条消息

11、Topic+Partition位移提交 rd_kafka_resp_err_t?rd_kafka_offset_store(rd_kafka_topic_t?*rkt,? int32_t?partition,? int64_t?offset);

存储rkt主题的partion分区的位移。使用本函数时auto.commit.enable参数需要设置为false,位移将会根据auto.commit.interval.ms参数值进行提交。如果auto.commit.enable参数为true,位移会在被消费函数返回消息前提交。

使用本函数时,enable.auto.offset.store必须设置值为false。

成功返回RD_KAFKA_RESP_ERR_NO_ERROR,错误返回错误码。

rd_kafka_resp_err_t?rd_kafka_offsets_store?(rd_kafka_t?*rk, ????????????????????????rd_kafka_topic_partition_list_t?*offsets);

对下一个或多个要自动提交位移的分区存储位移,存储的位移是offset原值,不是offset+1。

使用本函数时,enable.auto.offset.store必须设置值为false。

12、KafkaConsumer API rd_kafka_resp_err_t?rd_kafka_subscribe(rd_kafka_t?*rk,? const?rd_kafka_topic_partition_list_t?*topics);

使用平衡Consumer Group订阅Topic集

支持通配符匹配。在topics链表中,任何Topic名称如果以”^”开头,会对Kafka集群中所有的Topic进行正则匹配,匹配的Topic会被加入到订阅Topic集。

每隔topic.metadata.refresh.interval.ms时间,会对订阅Topic集进行更新。

异步操作,会立即返回。后台线程会加入Consumer Group并等待Rebalance。

rd_kafka_resp_err_t?rd_kafka_unsubscribe(rd_kafka_t?*rk);

取消当前订阅的Topic

rd_kafka_resp_err_t?rd_kafka_subscription(rd_kafka_t?*rk,? rd_kafka_topic_partition_list_t?**topics);

返回当前订阅的Topic

rd_kafka_message_t?*?rd_kafka_consumer_poll(rd_kafka_t?*rk,?int?timeout_ms);

Poll消费者消息或事件,会阻塞最多timeout_ms毫秒

为了执行等待被调用的队列化回调函数,即使没有消息,应用程序应该确保在一定时间内调用rd_kafka_consumer_poll。对于注册rebalance_cb回调函数,需要在其内部处理Consumer同步状态来说极其重要。

rd_kafka_resp_err_t?rd_kafka_consumer_close(rd_kafka_t?*rk);

关闭Kafka消费者,会阻塞直到Consumer废除其分区策略、调用rebalance_cb、提交位移到Broker、离开Consumer Group。最大阻塞时间为session.timeout.ms设置的值。

应用程序需要调用rd_kafka_destroy()清理底层资源。

rd_kafka_resp_err_t?rd_kafka_assign(rd_kafka_t?*rk,? const?rd_kafka_topic_partition_list_t?*partitions);

指定要消费的Topic+Partition

当使用rebalance_cb回调函数时,应用程序应该传递分区链表到rebalance_cb函数,新的partitions将会取代已有的分区。

rd_kafka_resp_err_t?rd_kafka_assignment(rd_kafka_t?*rk,? rd_kafka_topic_partition_list_t?**partitions);

返回当前的分区赋值,应用程序需要使用rd_kafka_topic_partition_list_destroy处理返回的TopicPartition链表。

rd_kafka_resp_err_t?rd_kafka_commit(rd_kafka_t?*rk,? const?rd_kafka_topic_partition_list_t?*offsets,?int?async);

提交指定的分区的位移到Broker,offsets包含Topic、Partition、offset以及元数据。要提交的位移是最近处理的位移+1。

如果offsets是NULL,使用当前分区的assignment代替。

如果使用rd_kafka_conf_set_offset_commit_cb()设置位移提交回调函数,位移回调函数会入队等待被rd_kafka_poll()或rd_kafka_consumer_poll()调用。

async为0表示会阻塞直到Broker位移提交完成,返回相应错误码。

rd_kafka_resp_err_t?rd_kafka_commit_message(rd_kafka_t?*rk,? const?rd_kafka_message_t?*rkmessage,?int?async);

为消息的分区提交消息位移,提交的位移是消息的位移+1。

rd_kafka_resp_err_t?rd_kafka_commit_queue?(rd_kafka_t?*rk, ??????? const?rd_kafka_topic_partition_list_t?*offsets, ??????? rd_kafka_queue_t?*rkqu, ??????? void?(*cb)?(rd_kafka_t?*rk, ??? rd_kafka_resp_err_t?err, ??? rd_kafka_topic_partition_list_t?*offsets, ???????????????void?*commit_opaque), ??????? void?*commit_opaque);

为指定partitions提交位移,位移提交结果会被发送到指定队列。

如果rkqu是NULL,会创建一个临时队列,回调函数会在本函数内调用。

rd_kafka_resp_err_t?rd_kafka_committed?(rd_kafka_t?*rk, ???? rd_kafka_topic_partition_list_t?*partitions, ???? int?timeout_ms);

获取Partitions的提交位移。

成功返回RD_KAFKA_RESP_ERR_NO_ERROR,TopicPartition的offset和err字段会被填充存储位移和分区指定错误。

rd_kafka_resp_err_t?rd_kafka_position?(rd_kafka_t?*rk, ??? rd_kafka_topic_partition_list_t?*partitions);

获取TopicPartition的当前位移。

每个TopicPartition的offset字段会被填充最近消费消息的位移+1,如果没有最近消费消息,则填充RD_KAFKA_OFFSET_INVALID。

rd_kafka_consumer_group_metadata_t*?rd_kafka_consumer_group_metadata?(rd_kafka_t?*rk);

获取Consumer的当前Consumer Group元数据。如果是独立Consumer,返回NULL。返回的指针指向资源必须被rd_kafka_consumer_group_metadata_destroy()释放。

void?rd_kafka_consumer_group_metadata_destroy?(rd_kafka_consumer_group_metadata_t?*);

释放Consumer Group元数据实例

13、Producer API #define? RD_KAFKA_MSG_F_FREE???0x1

将消息的释放委托给rdkafka

#define? RD_KAFKA_MSG_F_COPY???0x2

生成一份消息的拷贝

int?rd_kafka_produce(rd_kafka_topic_t?*rkt,? int32_t?partitition,? int?msgflags,? void?*payload,? size_t?len,? const?void?*key,? size_t?keylen,? void?*msg_opaque);

生成和发送一条消息到Broker的rkt主题的partition分区,是一个异步非阻塞API。

partition如果为RD_KAFKA_PARTITION_UA将使用Topic的自动分区函数进行分区。

发生临时错误时,RdKakfa会自动重试生产消息,会在Leader获取分区可用后的retry.backoff.ms后进行重试。否则,RdKakfa会轮询Topic元数据对Leader选举进行监控,当新的Leader选举产生时进行重试。如果消息没有在message.timeout.ms内生成,会产生投递错误。

如果指定RD_KAFKA_MSG_F_FREE,返回-1,则payload相关的内存资源需要调用者进行释放。

int?rd_kafka_produce_batch(rd_kafka_topic_t?*rkt,? int32_t?partition,? int?msgflags,? rd_kafka_message_t?*rkmessages,? int?message_cnt);

批量生产消息,是异步API。

partition是RD_KAFKA_PARTITION_UA,Topic配置的partitioner会针对每条消息进行分区处理,否则消息将会被直接入对指定分区。

rkmessages消息数组提供所有message_cnt条消息,partition参数和msgflags针对所有的消息。

librdkafka提供了异步的生产接口,没有提供同步的生产接口。

rd_kafka_resp_err_t?rd_kafka_flush?(rd_kafka_t?*rk,?int?timeout_ms);

等待所有未完成Produce请求完成。

本函数会调用rd_kafka_poll(),因此会触发回调函数。

rd_kafka_resp_err_t?rd_kafka_purge?(rd_kafka_t?*rk,?int?purge_flags);

清理Producer实例处理的当前消息

当后台线程队列被清理时,可能会阻塞短时间。

14、元数据API rd_kafka_resp_err_t?rd_kafka_metadata(rd_kafka_t?*rk,? int?all_topics,? rd_kafka_topic_t?*only_rkt,? const?struct?rd_kafka_metadata?**metadatap,? int?timeout_ms);

从Broker请求元数据。

all_topics为非0,请求集群中所有Topic的信息;all_topics为0,只请求本地已知的Topic的信息。

only_rkt:只请求本Topic的信息

metadatap指针用于接收元数据,必须使用rd_kafka_metadata_destroy函数进行释放。

timeout_ms指定超时时间

返回?RD_KAFKA_RESP_ERR_NO_ERROR表示成功,返回RD_KAFKA_RESP_ERR__TIMED_OUT表示超时,返回其它错误码表示错误。

void?rd_kafka_metadata_destroy(const?struct?rd_kafka_metadata?*metadata);

释放metadata内存。

15、其它API int?rd_kafka_brokers_add(rd_kafka_t?*rk,?const?char?*brokerlist);

增加一个或多个Broker到Kafka客户端实例

int?rd_kafka_thread_cnt(void);

获取librdkafka当前在用的线程数量

int?rd_kafka_wait_destroyed(int?timeout_ms);

等待所有的Kafka客户端实例被销毁,如果返回0,表示所有Kafka客户端已经被销毁;如果返回-1,表示超时。由于?rd_kafka_destroy()函数是异步操作,本函数用于应用程序关闭时清理。

const?char?*rd_kafka_get_debug_contexts(void);

获取Kafka调试环境,运行时

RdKafka还提供了拦截器、事务、事件、Admin相关的API。

三、Kafka Producer C API封装 1、RdKafka生产消息流程

(1)创建Kafka配置实例

rd_kafka_conf_t?*rd_kafka_conf_new?(void)

(2)设置Kafka Broker属性

rd_kafka_conf_res_t?rd_kafka_conf_set?(rd_kafka_conf_t?*conf, ???????????????????????????????????????const?char?*name, ???????????????????????????????????????const?char?*value, ???????????????????????????????????????char?*errstr,?size_t?errstr_size)

(3)设置回调函数

void?rd_kafka_conf_set_dr_msg_cb?(rd_kafka_conf_t?*conf, ??????????????????????????????????void?(*dr_msg_cb)?(rd_kafka_t?*rk, ??????????????????????????????????const?rd_kafka_message_t?*?rkmessage,? ??????????????????????????????????void?*opaque))

(4)创建Kafka Producer客户端

rd_kafka_t?*rd_kafka_new?(rd_kafka_type_t?type,? rd_kafka_conf_t?*conf, char?*errstr,? size_t?errstr_size)

(5)为Producer实例增加Brokers

int?rd_kafka_brokers_add?(rd_kafka_t?*rk,?const?char?*brokerlist)

(6)创建Topic实例

rd_kafka_topic_t?*rd_kafka_topic_new?(rd_kafka_t?*rk,? const?char?*topic,? rd_kafka_topic_conf_t?*conf)

(7)生产消息

int?rd_kafka_produce?(rd_kafka_topic_t?*rkt,? int32_t?partition,?int?msgflags, void?*payload,? size_t?len,? const?void?*key,? size_t?keylen, ?????? void?*msg_opaque)

向Topic写入消息,异步操作。

(8)阻塞等待消息发送完成

int?rd_kafka_poll?(rd_kafka_t?*rk,?int?timeout_ms)

(9)等待Producer请求完成

rd_kafka_resp_err_t?rd_kafka_flush?(rd_kafka_t?*rk,?int?timeout_ms)

(10)销毁Topic实例

void?rd_kafka_topic_destroy?(rd_kafka_topic_t?*app_rkt)

(11)销毁Kafka Producer客户端实例

void?rd_kafka_destroy?(rd_kafka_t?*rk) 2、CKafkaProducer

CKafkaProducer.h文件:

#ifndef?CKAFKAPRODUCER_H #define?CKAFKAPRODUCER_H #pragma?once #include?<stdio.h> #include?<string.h> #include?"rdkafka.h" class?CKafkaProducer { public: ????explicit?CKafkaProducer(); ????~CKafkaProducer(); ????int?init(const?char?*topic,?const?char?*brokers,?int?partition); ????/** ?????*?@brief?生产消息 ?????*?@param?str,消息数据 ?????*?@param?len,消息数长度 ?????*?@return ?????*/ ????int?sendMessage(char?*str,?int?len); ????static?void?err_cb(rd_kafka_t?*rk,?int?err,?const?char?*reason,?void?*opaque); ????static?void?throttle_cb(rd_kafka_t?*rk,?const?char?*broker_name, ????????????????????????????int32_t?broker_id,?int?throttle_time_ms,?void?*opaque); ????static?void?offset_commit_cb(rd_kafka_t?*rk,?rd_kafka_resp_err_t?err, ?????????????????????????????????rd_kafka_topic_partition_list_t?*offsets, ?????????????????????????????????void?*opaque); ????static?int?stats_cb(rd_kafka_t?*rk,?char?*json,?size_t?json_len,?void?*opaque); protected: ????rd_kafka_t*?m_kafka_handle;??//Kafka生产者实例 ????rd_kafka_topic_t*?m_kafka_topic;???//Kafka?Topic实例 ????rd_kafka_conf_t*?m_kafka_conf;????//kafka?Config实例 ????rd_kafka_topic_conf_t*?m_kafka_topic_conf;//?Kafka?Topic配置实例 ????rd_kafka_topic_partition_list_t*?m_kafka_topic_partition_list; ????int?m_partition; }; #endif?//?CKAFKAPRODUCER_H

CKafkaProducer.cpp文件:

#include?"CKafkaProducer.h" CKafkaProducer::CKafkaProducer() { ????m_kafka_handle =?NULL; ????m_kafka_topic =?NULL; ????m_kafka_conf =?NULL; ????m_kafka_topic_conf =?NULL; ????m_kafka_topic_partition_list =?NULL; ????m_partition =?RD_KAFKA_PARTITION_UA; } CKafkaProducer::~CKafkaProducer() { ????//wait?for?max?10?seconds ????rd_kafka_flush(m_kafka_handle,?10?*?1000); ????rd_kafka_topic_destroy(m_kafka_topic); ????rd_kafka_destroy(m_kafka_handle); ????rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list); } int?CKafkaProducer::init(const?char?*topic,?const?char?*brokers,?int?partition) { ????int?ret?=?0; ????rd_kafka_conf_res_t?ret_conf?=?RD_KAFKA_CONF_OK; ????char?errstr[512]?=?{0}; ????m_kafka_conf =?rd_kafka_conf_new(); ????rd_kafka_conf_set_error_cb(m_kafka_conf,?err_cb); ????rd_kafka_conf_set_throttle_cb(m_kafka_conf,?throttle_cb); ????rd_kafka_conf_set_offset_commit_cb(m_kafka_conf,?offset_commit_cb); ????rd_kafka_conf_set_stats_cb(m_kafka_conf,?stats_cb); ????//---------Producer?config------------------- ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"queue.buffering.max.messages", ?????????????????????????????????"500000",?errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?1;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?-1; ????} ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"message.send.max.retries",?"3", ?????????????????????????????????errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?2;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?-1; ????} ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"retry.backoff.ms",?"500",?errstr, ?????????????????????????????????sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?3;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?-1; ????} ????//---------Kafka?topic?config------------------- ????m_kafka_topic_conf =?rd_kafka_topic_conf_new(); ????ret_conf?=?rd_kafka_topic_conf_set(m_kafka_topic_conf,?"auto.offset.reset", ???????????????????????????????????????"earliest",?errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?4;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?-1; ????} ????m_kafka_topic_partition_list =?rd_kafka_topic_partition_list_new(1); ????rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list,?topic, ??????????????????????????????????????partition); ????m_partition =?partition; ????//---------Create?Kafka?handle------------------- ????m_kafka_handle =?rd_kafka_new(RD_KAFKA_PRODUCER,?m_kafka_conf,?errstr, ??????????????????????????????????sizeof(errstr)); ????if(m_kafka_handle ==?NULL) ????{ ????????printf("Error:?Failed?to?create?Kafka?producer:?%s\n",?errstr); ????????return?-1; ????} ????//---------Add?broker(s)------------------- ????if(brokers?&&?rd_kafka_brokers_add(m_kafka_handle,?brokers)?<?1) ????{ ????????printf("Error:?No?valid?brokers?specified\n"); ????????return?-2; ????} ????m_kafka_topic =?rd_kafka_topic_new(m_kafka_handle,?topic,?m_kafka_topic_conf); ????return?ret; } int?CKafkaProducer::sendMessage(char?*str,?int?len) { ????int?ret?=?0; ????if(str?==?NULL) ????{ ????????return?-1; ????} ????if(len?<=?0) ????{ ????????return?-2; ????} ????char?*?topic?=?m_kafka_topic_partition_list->elems[0].topic; ????int?partition?=?m_kafka_topic_partition_list->elems[0].partition; ????//------------向kafka服务器发送消息---------------- ????ret?=?rd_kafka_produce(m_kafka_topic,?partition,?RD_KAFKA_MSG_F_COPY?|?RD_KAFKA_MSG_F_FREE,?str,?len,?NULL,?0,?NULL); ????if(ret?==?-1) ????{ ????????rd_kafka_resp_err_t?err?=?rd_kafka_last_error(); ????????if(err?==?RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) ????????{ ????????????printf("Error:?No?such?partition:?%d\n",?partition); ????????} ????????else ????????{ ????????????printf("Error:?produce?error:?%s%s\n",?rd_kafka_err2str(err), ???????????????????err?==?RD_KAFKA_RESP_ERR__QUEUE_FULL???"?(backpressure)"?:?""); ????????} ????????rd_kafka_poll(m_kafka_handle,?1000);?//Poll?to?handle?delivery?reports ????} ????else ????{ ????????rd_kafka_poll(m_kafka_handle,?0); ????} ????return?ret; } void?CKafkaProducer::err_cb(rd_kafka_t?*rk,?int?err,?const?char?*reason,?void?*opaque) { ????printf("ERROR?CALLBACK:?%s:?%s:?%s\n",?rd_kafka_name(rk), ???????????rd_kafka_err2str((rd_kafka_resp_err_t)err),?reason); } void?CKafkaProducer::throttle_cb(rd_kafka_t?*rk,?const?char?*broker_name, ?????????????????????????????????int32_t?broker_id,?int?throttle_time_ms,?void?*opaque) { ????printf("THROTTLED?%dms?by?%s?(%d)\n",?throttle_time_ms,?broker_name,?broker_id); } void?CKafkaProducer::offset_commit_cb(rd_kafka_t?*rk,?rd_kafka_resp_err_t?err, ??????????????????????????????????????rd_kafka_topic_partition_list_t?*offsets, ??????????????????????????????????????void?*opaque) { ????int?i; ????int?verbosity?=?1; ????if(err?||?verbosity?>=?2) ????{ ????????printf("Offset?commit?of?%d?partition(s):?%s\n",?offsets->cnt,?rd_kafka_err2str(err)); ????} ????for(i?=?0;?i?<?offsets->cnt;?i++) ????{ ????????rd_kafka_topic_partition_t?*?rktpar?=?&offsets->elems[i]; ????????if(rktpar->err ||?verbosity?>=?2) ????????{ ????????????printf("%%??%s?[%d]?@?%d:?%s\n",?rktpar->topic,?rktpar->partition, ???????????????????rktpar->offset,?rd_kafka_err2str(err)); ????????} ????} } int?CKafkaProducer::stats_cb(rd_kafka_t?*rk,?char?*json,?size_t?json_len, ?????????????????????????????void?*opaque) { ????printf("%s\n",?json); ????return?0; }

main.cpp文件:

#include?"CKafkaProducer.h" const?int?N?=?10000; int?main(int?argc,?char?*argv[]) { ????CKafkaProducer?producer; ????char?topic[]?=?"test"; ????char?brokers[]?=?"192.168.0.105:9092"; ????int?partition?=?0; ????int?ret??=?producer.init(topic,?brokers,?partition); ????if(ret?!=?0) ????{ ????????printf("Error:?CKafkaProducer.init():?ret=%d;\n",?ret); ????????return?0; ????} ????for(int?i?=?0;?i?<?N;?i++) ????{ ????????char?str_msg[64]?=?{0}; ????????sprintf(str_msg,?"Hello?Kafka%4d",?i); ????????ret?=?producer.sendMessage(str_msg,?strlen(str_msg));?//向kafka服务器发送消息 ????????if(ret?!=?0) ????????{ ????????????printf("Error:?CKafkaProducer.sendMessage():?ret=%d;\n",?ret); ????????????return?0; ????????} ????} ????return?0; }

CMakeList.txt文件:

cmake_minimum_required(VERSION?2.8) project(CKafkaProducer) set(CMAKE_CXX_STANDARD?11) set(CMAKE_CXX_COMPILER?"g++") set(CMAKE_CXX_FLAGS?"-std=c++11?${CMAKE_CXX_FLAGS}") set(CMAKE_INCLUDE_CURRENT_DIR?ON) #?Kafka头文件路径 include_directories(/usr/local/include/librdkafka) #?Kafka库路径 link_directories(/usr/local/lib) aux_source_directory(.?SOURCE) add_executable(${PROJECT_NAME}?${SOURCE}) TARGET_LINK_LIBRARIES(${PROJECT_NAME}?rdkafka) 3、Kafka消息查看

进入kafka容器:

docker exec -it kafka-test /bin/bash

查看Topic的消息:

kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test --from-beginning 四、Kafka Consumer C API封装 1、RdKafka消费消息流程

(1)创建Kafka配置实例

rd_kafka_conf_t?*rd_kafka_conf_new?(void)

(2)设置Kafka Broker属性

rd_kafka_conf_res_t?rd_kafka_conf_set?(rd_kafka_conf_t?*conf, ???????????????????????????????????????const?char?*name, ???????????????????????????????????????const?char?*value, ???????????????????????????????????????char?*errstr,?size_t?errstr_size)

(3)创建Kakfa Topic配置实例

rd_kafka_topic_conf_t?*rd_kafka_topic_conf_new?(void)?

(4)设置Topic属性

rd_kafka_conf_res_t?rd_kafka_topic_conf_set?(rd_kafka_topic_conf_t?*conf, ?????const?char?*name, ?????const?char?*value, ?????char?*errstr,? size_t?errstr_size)

(5)创建Kakfa Consumer客户端实例

rd_kafka_t?*rd_kafka_new?(rd_kafka_type_t?type,? rd_kafka_conf_t?*conf,? char?*errstr,? size_t?errstr_size)

(6)为Cosnumer实例增加Brokers

int?rd_kafka_brokers_add?(rd_kafka_t?*rk,?const?char?*brokerlist)

(7)创建TopicPartition链表实例

rd_kafka_topic_partition_list_t?*rd_kafka_topic_partition_list_new?(int?size)

(8)增加TopicPartition实例

rd_kafka_topic_partition_t*?rd_kafka_topic_partition_list_add?( ????????rd_kafka_topic_partition_list_t?*rktparlist, ????????const?char?*topic,? int32_t?partition);

(9)为Consumer实例订阅Topic

rd_kafka_subscribe?(rd_kafka_t?*rk,?const?rd_kafka_topic_partition_list_t?*topics)

(10)轮询消息或事件并调用回调函数

rd_kafka_message_t?*rd_kafka_consumer_poll?(rd_kafka_t?*rk,int?timeout_ms)

(11)关闭Consumer实例

rd_kafka_resp_err_t?rd_kafka_consumer_close?(rd_kafka_t?*rk)

(12)释放TopicPartition链表

rd_kafka_topic_partition_list_destroy?(rd_kafka_topic_partition_list_t?*rktparlist)

(13)销毁Consumer实例

void?rd_kafka_destroy?(rd_kafka_t?*rk)?

(14)等待Consumer实例销毁

int?rd_kafka_wait_destroyed?(int?timeout_ms) 2、CKafkaConsumer

CKafkaConsumer.h文件:

#ifndef?CKAFKACONSUMER_H #define?CKAFKACONSUMER_H #pragma?once #include?<stdio.h> #include?<string.h> #include?<stdlib.h> #include?"rdkafka.h" typedef?void?(*?consumer_callback)(rd_kafka_message_t?*message,?void?*opaque); class?CKafkaConsumer { public: ????CKafkaConsumer(); ????~CKafkaConsumer(); ????int?init(char?*topic,?char?*brokers,?char?*partitions,?char?*groupId); ????void?registerConsumerCall(consumer_callback?consumer_cb,?void?*?param_cb); ????int?pullMessage();?//从kafka服务器接收消息 ????static?void?err_cb(rd_kafka_t?*rk,?int?err,?const?char?*reason,?void?*opaque); ????static?void?throttle_cb(rd_kafka_t?*rk,?const?char?*broker_name, ????????????????????????????int32_t?broker_id,?int?throttle_time_ms,?void?*opaque); ????static?void?offset_commit_cb(rd_kafka_t?*rk,?rd_kafka_resp_err_t?err, ?????????????????????????????????rd_kafka_topic_partition_list_t?*offsets,?void?*opaque); ????static?int?stats_cb(rd_kafka_t?*rk,?char?*json,?size_t?json_len,?void?*opaque); ????static?void?logger(const?rd_kafka_t?*rk,?int?level,?const?char?*fac,?const?char?*buf); ????static?void?msg_consume(rd_kafka_message_t?*rkmessage,?void?*opaque); protected: ????rd_kafka_t*?m_kafka_handle;??//kafka消费者实例 ????rd_kafka_topic_t*?m_kafka_topic;???//kafka?Topic实例 ????rd_kafka_conf_t*?m_kafka_conf;????//kafka?Config实例 ????rd_kafka_topic_conf_t*?m_kafka_topic_conf;//Kakfa?topic?config ????rd_kafka_topic_partition_list_t*?m_kafka_topic_partition_list; ????rd_kafka_queue_t*?m_kafka_queue;// ????consumer_callback??m_consumer_callback;?//消息回调函数 ????void*?m_consumer_callback_param;?//消息回调函数的参数 }; #endif?//?CKAFKACONSUMER_H

CKafkaConsumer.cpp文件:

#include?"CKafkaConsumer.h" CKafkaConsumer::CKafkaConsumer() { ????m_consumer_callback =?NULL; ????m_consumer_callback_param =?NULL; ????m_kafka_handle =?NULL; ????m_kafka_topic =?NULL; ????m_kafka_conf =?NULL; ????m_kafka_topic_conf =?NULL; ????m_kafka_topic_partition_list =?NULL; ????m_kafka_queue =?NULL; } CKafkaConsumer::~CKafkaConsumer() { ????rd_kafka_flush(m_kafka_handle,?10?*?1000); ????rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list); ????rd_kafka_queue_destroy(m_kafka_queue); ????rd_kafka_topic_destroy(m_kafka_topic); ????rd_kafka_destroy(m_kafka_handle); } int?CKafkaConsumer::init(char?*topic,?char?*brokers,?char?*partitions,?char?*groupId) { ????m_kafka_conf =?rd_kafka_conf_new(); ????rd_kafka_conf_set_error_cb(m_kafka_conf,?err_cb); ????rd_kafka_conf_set_throttle_cb(m_kafka_conf,?throttle_cb); ????rd_kafka_conf_set_offset_commit_cb(m_kafka_conf,?offset_commit_cb); ????rd_kafka_conf_set_stats_cb(m_kafka_conf,?stats_cb); ????rd_kafka_conf_set_log_cb(m_kafka_conf,?logger); ????rd_kafka_conf_res_t?ret_conf?=?RD_KAFKA_CONF_OK; ????char?errstr[512]?=?{0}; ????//---------Consumer?config------------------- ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"queued.min.messages",?"1000000", ?????????????????????????????????errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?1;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?ret_conf; ????} ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"session.timeout.ms",?"6000", ?????????????????????????????????errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?2;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?ret_conf; ????} ????ret_conf?=?rd_kafka_conf_set(m_kafka_conf,?"group.id",?groupId,?errstr, ?????????????????????????????????sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_conf_set()?failed?3;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?ret_conf; ????} ????//---------Kafka?topic?config------------------- ????m_kafka_topic_conf =?rd_kafka_topic_conf_new(); ????ret_conf?=?rd_kafka_topic_conf_set(m_kafka_topic_conf,?"auto.offset.reset", ???????????????????????????????????????"earliest",?errstr,?sizeof(errstr)); ????if(ret_conf?!=?RD_KAFKA_CONF_OK) ????{ ????????printf("Error:?rd_kafka_topic_conf_set()?failed?4;?ret_conf=%d;?errstr:%s\n", ???????????????ret_conf,?errstr); ????????return?ret_conf; ????} ????m_kafka_topic_partition_list =?rd_kafka_topic_partition_list_new(1); ????int?len?=?strlen(partitions); ????char?*buffer?=?new?char[len?+?1]; ????char*?temp?=?buffer; ????sprintf(buffer,?"%s",?partitions);?//partitions="0,1,2"; ????while(*buffer?!=?'\0') ????{ ????????char*?s?=?strstr(buffer,?","); ????????if(s?!=?NULL) ????????{ ????????????*s?=?'\0'; ????????????int?partition?=?atoi(buffer); ????????????rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list,?topic,?partition); ????????????buffer?=?s?+?1; ????????} ????????else ????????{ ????????????break; ????????} ????} ????delete?[]?temp; ????buffer?=?NULL; ????//---------Create?Kafka?handle------------------- ????m_kafka_handle =?rd_kafka_new(RD_KAFKA_CONSUMER,?m_kafka_conf,?errstr,?sizeof(errstr)); ????if(m_kafka_handle ==?NULL) ????{ ????????printf("Error:?Failed?to?create?Kafka?producer:?%s\n",?errstr); ????????return?-1; ????} ????rd_kafka_poll_set_consumer(m_kafka_handle);?//Redirect?rd_kafka_poll()?to?consumer_poll() ????//---------Add?broker(s)------------------- ????if(brokers?&&?rd_kafka_brokers_add(m_kafka_handle,?brokers)?<?1) ????{ ????????printf("Error:?No?valid?brokers?specified\n"); ????????return?-1; ????} ????int?partition?=?m_kafka_topic_partition_list->elems[0].partition; ????int?partition_cnt?=?m_kafka_topic_partition_list->cnt; ????m_kafka_topic =?rd_kafka_topic_new(m_kafka_handle,?topic,?m_kafka_topic_conf); ????m_kafka_queue =?rd_kafka_queue_new(m_kafka_handle); ????return?ret_conf; } void?CKafkaConsumer::registerConsumerCall(consumer_callback?consumer_cb,?void*?param_cb) { ????m_consumer_callback =?consumer_cb; ????m_consumer_callback_param =?param_cb; } int?CKafkaConsumer::pullMessage() { ????int?ret?=?0; ????char?*?topic?=?m_kafka_topic_partition_list->elems[0].topic; ????int?partition?=?m_kafka_topic_partition_list->elems[0].partition; ????int?partition_cnt?=?m_kafka_topic_partition_list->cnt; ????//?RD_KAFKA_OFFSET_BEGINNING?|?RD_KAFKA_OFFSET_END?|?RD_KAFKA_OFFSET_STORED ????int64_t?start_offset?=?RD_KAFKA_OFFSET_END; ????//------------从kafka服务器接收消息---------------- ????for(int?i?=?0;?i?<?partition_cnt;?i++) ????{ ????????int?partition?=?m_kafka_topic_partition_list->elems[i].partition; ????????int?r?=?rd_kafka_consume_start_queue(m_kafka_topic,?partition,?start_offset,?m_kafka_queue); ????????if(r?==?-1) ????????{ ????????????printf("Error:?creating?queue:?%s\n",?rd_kafka_err2str(rd_kafka_last_error())); ????????????return?-1; ????????} ????} ????while(1) ????{ ????????int?r?=?rd_kafka_consume_callback_queue(m_kafka_queue,?1000,?msg_consume,?this);?//Queue?mode ????????if(r?<=?0) ????????{ ????????????rd_kafka_poll(m_kafka_handle,?1000); ????????????continue; ????????} ????????rd_kafka_poll(m_kafka_handle,?0);?//Poll?to?handle?stats?callbacks ????} ????//----------Stop?consuming------------------------------ ????for(int?i?=?0;?i?<?partition_cnt;?i++) ????{ ????????int?r?=?rd_kafka_consume_stop(m_kafka_topic,?(int32_t)i); ????????if(r?==?-1) ????????{ ????????????printf("Error:?in?consume_stop:?%s\n",?rd_kafka_err2str(rd_kafka_last_error())); ????????} ????} ????return?ret; } void?CKafkaConsumer::err_cb(rd_kafka_t?*rk,?int?err,?const?char?*reason,?void?*opaque) { ????printf("ERROR?CALLBACK:?%s:?%s:?%s\n",?rd_kafka_name(rk), ???????????rd_kafka_err2str((rd_kafka_resp_err_t)err),?reason); } void?CKafkaConsumer::throttle_cb(rd_kafka_t?*rk,?const?char?*broker_name, ?????????????????????????????????int32_t?broker_id,?int?throttle_time_ms, ?????????????????????????????????void?*opaque) { ????printf("THROTTLED?%dms?by?%s?(%d)\n",?throttle_time_ms,?broker_name,?broker_id); } void?CKafkaConsumer::offset_commit_cb(rd_kafka_t?*rk,?rd_kafka_resp_err_t?err, ??????????????????????????????????????rd_kafka_topic_partition_list_t?*offsets, ??????????????????????????????????????void?*opaque) { ????int?i; ????int?verbosity?=?1; ????if(err?||?verbosity?>=?2) ????{ ????????printf("Offset?commit?of?%d?partition(s):?%s\n",?offsets->cnt, ???????????????rd_kafka_err2str(err)); ????} ????for(i?=?0;?i?<?offsets->cnt;?i++) ????{ ????????rd_kafka_topic_partition_t?*?rktpar?=?&offsets->elems[i]; ????????if(rktpar->err ||?verbosity?>=?2) ????????{ ????????????printf("%s?[%d]?@?%ld:?%s\n",?rktpar->topic,?rktpar->partition, ???????????????????rktpar->offset,?rd_kafka_err2str(err)); ????????} ????} } int?CKafkaConsumer::stats_cb(rd_kafka_t?*rk,?char?*json,?size_t?json_len,?void?*opaque) { ????printf("%s\n",?json); ????return?0; } void?CKafkaConsumer::logger(const?rd_kafka_t?*rk,?int?level,?const?char?*fac,?const?char?*buf) { ????fprintf(stdout,?"RDKAFKA-%i-%s:?%s:?%s\n",?level,?fac,?rd_kafka_name(rk),?buf); } void?CKafkaConsumer::msg_consume(rd_kafka_message_t?*rkmessage,?void?*opaque) { ????CKafkaConsumer*?consumer?=?(CKafkaConsumer?*)opaque; ????if(consumer?&&?consumer->m_consumer_callback) ????{ ????????consumer->m_consumer_callback(rkmessage,?consumer->m_consumer_callback_param); ????????return; ????} ????if(rkmessage->err) ????{ ????????if(rkmessage->err ==?RD_KAFKA_RESP_ERR__PARTITION_EOF) ????????{ ????????????printf("[INFO]?Consumer?reached?end?of?%s?[%d]?message?queue?at?offset?%ld\n",?rd_kafka_topic_name(rkmessage->rkt),?rkmessage->partition,?rkmessage->offset); ????????????return; ????????} ????????printf("Error:?Consume?error?for?topic?\"%s\"?[%d]?offset?%ld:?%s\n",?rkmessage->rkt ??rd_kafka_topic_name(rkmessage->rkt)?:?"",?rkmessage->partition,?rkmessage->offset,?rd_kafka_message_errstr(rkmessage)); ????????return; ????} ????if(rkmessage->key_len) ????{ ????????printf("Key:?%d:?%s\n",?(int)rkmessage->key_len,?(char?*)rkmessage->key); ????} ????printf("%d:?%s\n",?(int)rkmessage->len,?(char?*)rkmessage->payload); }

main.cpp文件:

#include?"CKafkaConsumer.h" static?void?msg_consume(rd_kafka_message_t?*rkmessage,?void?*opaque) { ????printf("[MSG]?%d:?%s\n",?(int)rkmessage->len,?(char*)rkmessage->payload); } int?main(int?argc,?char?*argv[]) { ????char?topic[]?=?"test"; ????char?brokers[]?=?"192.168.0.105:9092"; ????char?partitions[]?=?"0,1,2"; ????char?groupId[]?=?"testGroup"; ????CKafkaConsumer?consumer; ????consumer.init(topic,?brokers,?partitions,?groupId); ????//注册消息回调函数,用户可以自定义此函数 ????consumer_callback?consumer_cb?=?msg_consume; ????void?*?param_cb?=?NULL;?//param_cb=this; ????consumer.registerConsumerCall(consumer_cb,?param_cb); ????int?ret?=?consumer.pullMessage();?//从kafka服务器接收消息 ????if(ret?!=?0) ????{ ????????printf("Error:?CKafkaConsumer.pullMessage():?ret=%d;\n",?ret); ????} ????return?0; }

CMakeList.txt文件:

cmake_minimum_required(VERSION?2.8) project(CKafkaConsumer) set(CMAKE_CXX_STANDARD?11) set(CMAKE_CXX_COMPILER?"g++") #set(CMAKE_CXX_FLAGS?"-std=c++11?${CMAKE_CXX_FLAGS}") set(CMAKE_INCLUDE_CURRENT_DIR?ON) #?Kafka头文件路径 include_directories(/usr/local/include/librdkafka) #?Kafka库路径 link_directories(/usr/local/lib) aux_source_directory(.?SOURCE) add_executable(${PROJECT_NAME}?${SOURCE}) TARGET_LINK_LIBRARIES(${PROJECT_NAME}?rdkafka) 五、RdKafka性能测试 1、RdKafka性能测试工具

RdKafka提供了性能测试工具,源码位于librdkafka/examples/rdkafka_performance.c,编译构建后执行文件为rdkafka_performance,rdkafka_performance命令选项如下:

./rdkafka_performance?[-C|-P]?-t?<topic>?[-p?<partition>]? [-b?<broker,broker..>]?[options..]

-C:消费者模式

-P:生产者模式

-t:Topic

-p:分区

-b:Broker,多个使用逗号分隔

-s:生产者生产消息的大小

-c:生产或消费消息的总数

-B:批量消费的数量

-z:压缩算法,可选:none,snappy,gzip,lz4

测试环境:RHEL7.3 8G Intel i5 ?SSD

Kafka集群:Docker容器、单分区

2、Producer性能测试 rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 100

rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 300

rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 500

rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 100 -z lz4

rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 300 -z lz4

rdkafka_performance -P -t test -p 0 -b 192.168.0.105:9092 -s 500 -z lz4

?从测试结果看,不使用压缩算法,消息大小为100、300、500字节时,消息生产速度分别为1100000/s、600000/s、400000/s;使用lz4压缩算法时,消息大小100、300、500字节时,消息生产速度分别为1300000/s、1300000/s、1200000/s。

3、Consumer性能测试 rdkafka_performance -C -t test -p 0 -b 192.168.0.105:9092

rdkafka_performance -C -t test -p 0 -b 192.168.0.105:9092 -z lz4

?从测试结果看,不使用压缩算法,消息消费速度为1600000/s;使用lz4压缩算法时,消息消费速度为1500000/s。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #C #kafka客户端 #install #librdkafka