irpas技术客

Python 并发编程_Adenialzz_python 并发

irpas 1065

Python 并发编程

本文为 https://·blogs.com' urls = [base_url+f'/#p{i}' for i in range(1, 51)] def craw(url): response = requests.get(url) print(url, len(response.text)) def single_thread(): for url in urls: craw(url) def multi_thread(): threads = [] for url in urls: threads.append( threading.Thread(target=craw, args=(url, )) ) for thread in threads: thread.start() for thread in threads: thread.join() if __name__ == '__main__': start = time.time() single_thread() end = time.time() print("Single Thread Crawling Cost", end-start, "seconds.") start = time.time() multi_thread() end = time.time() print("Multi Thread Crawling Cost", end-start, "seconds.")

输出:

https://·blogs.com/#p2 69631 ... https://·blogs.com/#p50 69631 Single Thread Crawling Cost 3.4501893520355225 seconds. https://·blogs.com/#p3 69631 ... https://·blogs.com/#p48 69631 https://·blogs.com' urls = [base_url+f'/#p{i}' for i in range(1, 51)] def craw(url): # 消费者 response = requests.get(url) return response.text def parse(html): # 生产者 soup = BeautifulSoup(html, 'html.parser') links = soup.find_all('a', class_='post-item-title') return [(link['href'], link.get_text()) for link in links] def do_craw(url_queue: queue.Queue, html_queue: queue.Queue): while True: url = url_queue.get() html = craw(url) html_queue.put(html) print(threading.current_thread().name, f"craw {url}", "url_queue.size = ", url_queue.qsize()) time.sleep(random.randint(1, 2)) def do_parse(html_queue: queue.Queue, fout): while True: html = html_queue.get() results = parse(html) for result in results: fout.write(str(result) + '\n') print(threading.current_thread().name, f"results.size {len(results)}", "html_queue.size = ", html_queue.qsize()) time.sleep(random.randint(1, 2)) if __name__ == '__main__': url_queue = queue.Queue() html_queue = queue.Queue() for url in urls: url_queue.put(url) for idx in range(3): t = threading.Thread( target=do_craw, args=(url_queue, html_queue), name=f"craw {idx}" ) t.start() fout = open('result.txt', 'w') for idx in range(2): t = threading.Thread( target=do_parse, args=(html_queue, fout), name=f"parse {idx}" ) t.start()

输出 :

craw 1 craw https://·blogs.com/#p3 url_queue.size = 47 parse 1 results.size 20 html_queue.size = 0 ... parse 0 results.size 20 html_queue.size = 5 parse 1 results.size 20 html_queue.size = 4 parse 1 results.size 20 html_queue.size = 3 parse 0 results.size 20 html_queue.size = 2 parse 1 results.size 20 html_queue.size = 1 parse 0 results.size 20 html_queue.size = 0

可以看到,由于生产者有3个线程,消费者有2个线程,即页面的爬取比解析的要快,故 html_queue 中的元素个数有升有降,但总体是在增加。最终生产者已经将 url_queue 中的 url 全部爬取完毕,随后消费者才将 html_queue 中的 html 全部解析。

Python并发编程中的线程安全问题 线程安全概念的介绍

线程安全指的是某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程间的的共享变量,是程序功能正确完成。

由于线程的执行随时会发生切换,就会造成不可预料的后果,出现线程的不安全。

注意线程并发不安全的问题只有在线程切换时机恰好导致了数据竞争时才会出现,也就是说有时会出现,有时又不会出现。

Lock用于保证线程安全

用法1:try-finally 模式

import threading lock = threading.Lock() lock.acquire() try: # do something finally: lock,release()

用法2:with模式

import threading lock = threading.Lock() with lock: # do something 代码示例

我们先看不加锁的版本:

import threading import time class Account: def __init__(self, balance): self.balance = balance def draw(account, number): if account.balance >= number: print(threading.current_thread().name, '取钱成功') # time.sleep(0.1) account.balance -= number print(threading.current_thread().name, '余额: ', account.balance) else: print(threading.current_thread().name, '余额不足') if __name__ == '__main__': acc = Account(1000) ta = threading.Thread(name='ta', target=draw, args=(acc, 800)) tb = threading.Thread(name='tb', target=draw, args=(acc, 800)) ta.start() tb.start()

这是不加锁的版本,有可能会出现数据竞争导致出错:

ta 取钱成功 tb 取钱成功 tb 余额: 200 ta 余额: -600

但实际上,笔者在实测的时候很难出错,试了很多次都是“正常的”:

ta 取钱成功 ta 余额: 200 tb 余额不足

当然,前面也提到了,这种 ”正常“ 其实是一种假象,只是刚好线程的调度没有导致数据竞争的问题而已。为了复现出错误,可以像笔者一样,在 if 判断之后,加上0.1s的暂停,它会导致线程的阻塞,从而导致线程的切换,而如果在这里发生了线程的切换的话,是一定会出现数据竞争的问题的。

加锁保护临界区:

import threading import time lock = Lock = threading.Lock() class Account: def __init__(self, balance): self.balance = balance def draw(account, number): with lock: # 加锁保护 if account.balance >= number: print(threading.current_thread().name, '取钱成功') # time.sleep(0.1) account.balance -= number print(threading.current_thread().name, '余额: ', account.balance) else: print(threading.current_thread().name, '余额不足') if __name__ == '__main__': acc = Account(1000) ta = threading.Thread(name='ta', target=draw, args=(acc, 800)) tb = threading.Thread(name='tb', target=draw, args=(acc, 800)) ta.start() tb.start()

加速保护操作临界区变量的代码段之后,不会再出现问题。

线程池ThreadPoolExecutor 线程池的原理

新建线程需要系统分配资源,终止线程需要系统回收资源,如果可以重用线程,则可以省去新建/终止的开销。

使用线程池的好处:

提升性能:省去大量新建、终止线程的开销,重用了线程资源防御功能:能够有效避免因为创建线程过多,而导致系统符合过大而响应变慢的问题代码优势:使用线程池的语法相比于自己新建线程执行线程更加简洁

使用场景:适合处理突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短

ThreadPoolExecutor的语法

用法1:map

from concurrent.futures import ThreadPoolExecutor with TheadPoolExecutor() as pool: results = pool.map(craw, urls) for result in results: print(result)

map的结果和入参的顺序是对应的

用法2:future

from concurrent.futures import ThreadPoolExecutor, as_completed with ThreadPoolExecutor() as pool: futures = [pool.submit(craw, url) for url in urls] for future in futures: print(future.result()) for future in as_completed(futures): print(future.result())

future模式,更强大,注意 as_completed 包裹执行的话会先完成,先返回,而非按照入参的顺序,是否要用 as_completed 需要看实际情况。

代码示例:使用线程池改造爬虫程序

我们接下来使用线程池的方法实现上面的爬虫的例子:

import concurrent.futures import requests from bs4 import BeautifulSoup base_url = 'https://·blogs.com' urls = [base_url+f'/#p{i}' for i in range(1, 51)] async def async_craw(url): print("craw url: ", url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: result = await resp.text() print("craw url: ", url) loop = asyncio.get_event_loop() tasks = [ loop.create_task(async_craw(url)) for url in urls ] import time start = time.time() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"async spider cost time: {end-start} seconds")

输出:

... craw url: https://·blogs.com/#p38 craw url: https://·blogs.com/#p33 async spider cost time: 0.18497681617736816 seconds

可以看到,由于没有线程切换的开销,在IO密集型计算中,协程比多线程技术还要快。

在异步IO中使用信号量来控制并发度

信号量(semaphore)是一个同步对象,用于保持在0至指定最大值之间的一个计数值

当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一当线程完成一次对该semaphore对象的释放(realease)时,该计数值加一当计数值为0时,则线程等待该seamphore对象不再能成功直至该semaphore对象变为signaled状态seamphore对象的计数值为0时,为signaled状态;等于0时为nonsignaled状态 使用方法

与锁类似,有两种用法:

用法1

sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource

用法2

sem = asyncio.Semaphore(10) # ...later await sem.acquire() try: # work with shared resource finally: sem.release() 爬虫代码中使用sem控制并发度

仅需有三处调整:

import asyncio import aiohttp base_url = 'https://·' urls = [base_url+f'/#p{i}' for i in range(1, 51)] # sem控制并发度 sem = asyncio.Semaphore(10) async def async_craw(url): async with sem: # sem控制并发度,并调整缩进 print("craw url: ", url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: result = await resp.text() await asyncio.sleep(5) # 更直观地体现并发度 print("craw url: ", url) loop = asyncio.get_event_loop() tasks = [ loop.create_task(async_craw(url)) for url in urls ] import time start = time.time() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"async spider cost time: {end-start} seconds")

Ref:

https://blog.csdn.net/xili2532/article/details/117489248

https://·/p/cbf9588b2afb

https://·/video/BV1bK411A7tV?p=1


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Python #并发 #并发编程本文为 #课程笔记