
Fast multi-threaded reading of Kafka consumer data with kafka-python, improved with batch reads and unbounded stream reads


Reading from a Kafka consumer in a single-threaded Python loop wastes time, and its throughput falls far below what the producer can sustain, so we use multiple threads to handle this IO-bound reading.

Contents: A minimal example · 1. Producer (run first) · 2. Consumer: multi-threaded reading · Consumer improvement 1: read in batches and return the data · Consumer improvement 2: read Kafka data indefinitely

A minimal example

Let's start with a minimal example, without any frills:

1. Producer (run first)

Run the producer first, then the consumer:

```python
from kafka import KafkaProducer
import datetime
import json

# Start the producer
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
for i in range(100):
    data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
```

2. Consumer: multi-threaded reading

```python
from kafka import KafkaConsumer
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from kafka.structs import TopicPartition


class MultiThreadKafka(object):
    def __init__(self):
        self.seek = 0  # next offset to read
        self.lock = threading.Lock()  # guards self.seek: without it two threads could claim the same offset

    def operate(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        tp = TopicPartition("python_test", 0)
        consumer.assign([tp])
        for i in range(10):
            with self.lock:  # claim an offset atomically
                consumer.seek(tp, self.seek)
                self.seek += 1
            consumer_data = next(consumer)
            print(threading.current_thread().name)  # print the thread name
            print(consumer_data)                    # print the record
            time.sleep(1)

    def main(self):
        # Manage the threads centrally with a thread pool
        thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="threading_")
        for i in range(4):
            thread_pool.submit(self.operate)  # put all four threads to work reading data


if __name__ == '____main__'.strip('_') or __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    thread_kafka.main()
```
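One practical caveat before running it: seek() followed by next() blocks if the requested offset has not been produced yet. kafka-python's KafkaConsumer exposes end_offsets(), which asks the broker how far the partition currently goes, so you can size the loops accordingly. A minimal sketch, using the same topic and broker as above:

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)
consumer.assign([tp])

# end_offsets() returns, per partition, the offset of the *next* record to be
# written, i.e. one past the last record currently available
end_offset = consumer.end_offsets([tp])[tp]
print(f"partition 0 currently holds offsets 0..{end_offset - 1}")
```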

The output is shown below. Every thread is doing work, and the offsets show that no record was read twice, so we can use this template for reading data with confidence:

```
threading__0
threading__3
ConsumerRecord(topic='python_test', partition=0, offset=1, timestamp=1646103355117, timestamp_type=0, key=None, value=b'{"num": 1, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=1621741028, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=3, timestamp=1646103355120, timestamp_type=0, key=None, value=b'{"num": 3, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=3977437705, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__2
ConsumerRecord(topic='python_test', partition=0, offset=0, timestamp=1646103355013, timestamp_type=0, key=None, value=b'{"num": 0, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=4023108697, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__1
ConsumerRecord(topic='python_test', partition=0, offset=2, timestamp=1646103355119, timestamp_type=0, key=None, value=b'{"num": 2, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=149040431, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__0
ConsumerRecord(topic='python_test', partition=0, offset=4, timestamp=1646103355121, timestamp_type=0, key=None, value=b'{"num": 4, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=4162098402, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__3
threading__2
ConsumerRecord(topic='python_test', partition=0, offset=6, timestamp=1646473931267, timestamp_type=0, key=None, value=b'{"num": 1, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2205992463, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=5, timestamp=1646473931131, timestamp_type=0, key=None, value=b'{"num": 0, "data": "2022-03-05 17:52:11"}', headers=[], checksum=13992341, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
threading__1
ConsumerRecord(topic='python_test', partition=0, offset=7, timestamp=1646473931268, timestamp_type=0, key=None, value=b'{"num": 2, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4062066255, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
threading__0
ConsumerRecord(topic='python_test', partition=0, offset=8, timestamp=1646473931269, timestamp_type=0, key=None, value=b'{"num": 3, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1088274253, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
....
```
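If you want more than a visual check that no offset was consumed twice, you could collect the records each thread reads into a shared list and count offsets afterwards. A hypothetical helper (check_no_duplicates and the records list are illustrative, not part of the original code):

```python
from collections import Counter

def check_no_duplicates(records):
    """records: ConsumerRecord objects collected from all threads."""
    counts = Counter(r.offset for r in records)
    dupes = {offset: n for offset, n in counts.items() if n > 1}
    assert not dupes, f"offsets read more than once: {dupes}"
```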
Consumer improvement 1: read in batches and return the data

The program above just prints the records, but usually we don't want printouts; we want a function that hands the consumed data back to us:

```python
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka.structs import TopicPartition
import threading
import json


class MultiThreadKafka(object):
    def __init__(self):
        self.seek = 0  # next offset to read
        # kafka-python's KafkaConsumer is not thread-safe, so the shared
        # instance below must only be touched while holding this lock
        self.lock = threading.Lock()
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        self.tp = TopicPartition("python_test", 0)
        self.consumer.assign([self.tp])

    def operate(self):
        data_list = []
        for i in range(3):  # read 3 records per batch
            with self.lock:  # seek and next must happen together on the shared consumer
                self.consumer.seek(self.tp, self.seek)
                self.seek += 1
                consumer_data = next(self.consumer)
            # extract the payload; json.loads parses the JSON safely (eval also works on this data, but is risky)
            consumer_value: dict = json.loads(consumer_data.value.decode("utf-8"))
            data_list.append(consumer_value)
        return data_list  # the 3 records of this batch

    def main(self):
        thread_pool = ThreadPoolExecutor(max_workers=4)
        thread_mission_list = []
        for i in range(20):
            run_thread = thread_pool.submit(self.operate)
            thread_mission_list.append(run_thread)
        for mission in as_completed(thread_mission_list):  # waits for the threads; whichever batch finishes first comes out first
            yield mission.result()  # yield each thread's result, so main() is a generator


if __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    kafka_data_generator = thread_kafka.main()  # a generator
    for i in kafka_data_generator:
        print(i)  # each batch of values arrives in i
```

This way every batch of records arrives in the loop variable i.
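As an aside, kafka-python can also batch on its own: poll() fetches up to max_records messages in a single call, which avoids re-seeking before every record. A single-threaded sketch of the same three-record batch, reusing the topic and broker from above:

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)
consumer.assign([tp])
consumer.seek(tp, 0)  # start from the beginning of the partition

# poll() returns {TopicPartition: [ConsumerRecord, ...]} with up to
# max_records messages fetched in one call
batch = consumer.poll(timeout_ms=1000, max_records=3)
for records in batch.values():
    for record in records:
        print(json.loads(record.value.decode("utf-8")))
```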

Consumer improvement 2: read Kafka data indefinitely

All it takes is a while True around the submission loop in main():

```python
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka.structs import TopicPartition
import threading
import json


class MultiThreadKafka(object):
    def __init__(self):
        self.seek = 0  # next offset to read
        self.lock = threading.Lock()  # guards the shared, non-thread-safe consumer
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        self.tp = TopicPartition("python_test", 0)
        self.consumer.assign([self.tp])

    def operate(self):
        data_list = []
        for i in range(3):  # read 3 records per batch
            with self.lock:  # seek and next must happen together on the shared consumer
                self.consumer.seek(self.tp, self.seek)
                self.seek += 1
                consumer_data = next(self.consumer)
            consumer_value: dict = json.loads(consumer_data.value.decode("utf-8"))
            data_list.append(consumer_value)
        return data_list  # the 3 records of this batch

    def main(self):
        thread_pool = ThreadPoolExecutor(max_workers=4)
        while True:  # loop forever, reading without end
            # reset the list each round; otherwise already-finished batches
            # would be yielded again on every pass through the loop
            thread_mission_list = []
            for i in range(5):
                run_thread = thread_pool.submit(self.operate)
                thread_mission_list.append(run_thread)
            for mission in as_completed(thread_mission_list):  # whichever batch finishes first comes out first
                yield mission.result()


if __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    kafka_data_generator = thread_kafka.main()  # a generator
    for i in kafka_data_generator:
        print(i)
```
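For completeness: if you don't need to track offsets by hand, the plainer kafka-python idiom for unbounded consumption is a group-managed consumer, which commits offsets on the broker for you and resumes where it left off after a restart. A sketch, with group_id chosen here purely as an example value:

```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "python_test",
    bootstrap_servers='localhost:9092',
    api_version=(0, 10, 2),
    group_id='python_test_group',  # example group name, not from the original post
    enable_auto_commit=True,       # broker-side offset tracking instead of self.seek
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

try:
    for record in consumer:  # blocks and yields records indefinitely
        print(record.value)
finally:
    consumer.close()  # leave the consumer group cleanly on shutdown
```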


