Before diving into each concurrency approach in detail, here is a quick summary of when to choose which:
Method | Best for | Pros | Cons |
---|---|---|---|
threading | I/O-bound tasks | Threads are cheap to create; a shared memory space makes data exchange easy; well suited to network requests and other I/O | Python's GIL prevents CPU-bound tasks from running truly in parallel (see the timing sketch below the table), and thread switching adds extra overhead |
multiprocessing | CPU-bound tasks | Fully exploits multiple CPU cores for true parallel computation; each process has its own GIL | Creating and managing processes is expensive; inter-process communication is complex; memory usage is high and startup is slow |
concurrent.futures | Simple parallel tasks | A single high-level interface covering both thread pools and process pools; functional APIs such as map are intuitive | No fine-grained control over task scheduling; no support for complex concurrency patterns; relatively basic feature set |
asyncio | I/O-bound tasks | Concurrency on a single thread with no thread-switching overhead; supports large numbers of concurrent connections with excellent performance | Requires the dedicated async/await syntax; third-party libraries need explicit support; unsuitable for CPU-bound tasks |
joblib | Scientific computing | Optimized for scientific computing; supports result caching and progress display; integrates well with numpy/scipy and similar libraries | Geared toward data processing and machine learning; general-purpose concurrency features are limited; not suited to complex concurrency control |
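To make the GIL point in the table concrete, here is a minimal timing sketch (the worker count and loop size are arbitrary illustrations): the same CPU-bound function gains little on a thread pool but speeds up on a process pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n):
    # Pure-Python loop: CPU-bound, holds the GIL while it runs
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed_run(executor_cls, label):
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        list(executor.map(cpu_task, [2_000_000] * 4))
    print(f"{label}: {time.perf_counter() - start:.2f}s")

if __name__ == '__main__':
    timed_run(ThreadPoolExecutor, "thread pool")    # serialized by the GIL
    timed_run(ProcessPoolExecutor, "process pool")  # runs on separate cores
```

On a multi-core machine the process-pool run should typically finish in a fraction of the thread-pool time.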
A simple example for I/O-bound work, downloading several files concurrently:
```python
import threading
import requests

def download_file(url, filename):
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)
    print(f"Downloaded {filename}")

# Start one download thread per URL
urls = ['http://example.com/file1', 'http://example.com/file2']
threads = []
for i, url in enumerate(urls):
    thread = threading.Thread(target=download_file, args=(url, f'file{i}.txt'))
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()
```
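One advantage noted in the table is that threads share memory, so results can be collected into a shared structure directly; a lock keeps concurrent writes safe. A minimal sketch (the `results` dict and `fetch_status` helper are illustrative additions, not part of the example above):

```python
import threading
import requests

results = {}
results_lock = threading.Lock()

def fetch_status(url):
    status = requests.get(url).status_code
    with results_lock:  # guard the shared dict against concurrent writes
        results[url] = status

threads = [threading.Thread(target=fetch_status, args=(u,))
           for u in ['http://example.com/1', 'http://example.com/2']]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```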
An example for CPU-bound work, computing squares in parallel:
```python
from multiprocessing import Process, Queue

def calculate_squares(numbers, queue):
    for n in numbers:
        queue.put(n * n)

if __name__ == '__main__':
    numbers_list = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    queue = Queue()
    processes = []

    # Start one worker process per chunk of numbers
    for numbers in numbers_list:
        p = Process(target=calculate_squares, args=(numbers, queue))
        processes.append(p)
        p.start()

    # Wait for all processes to finish (safe here because the results are
    # small; with large outputs, drain the queue before joining to avoid
    # deadlocking on the queue's internal buffer)
    for p in processes:
        p.join()

    # Collect the results
    results = []
    while not queue.empty():
        results.append(queue.get())
    print(results)
```
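For this scatter-gather pattern, `multiprocessing.Pool` can take over the process management and result collection entirely; a minimal equivalent sketch:

```python
from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # map distributes the inputs across workers and gathers results in order
        results = pool.map(square, [1, 2, 3, 4, 5, 6, 7, 8, 9])
    print(results)  # [1, 4, 9, 16, 25, 36, 49, 64, 81]
```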
An example of processing several tasks with a thread pool:
```python
from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    time.sleep(1)  # simulate an I/O-bound task
    return f"Processed {item}"

# At most 3 items are processed at a time
with ThreadPoolExecutor(max_workers=3) as executor:
    items = ['A', 'B', 'C', 'D', 'E']
    results = list(executor.map(process_item, items))
print(results)
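Since the table's point about concurrent.futures is that one interface covers both pool types, switching to CPU-bound work only requires swapping in `ProcessPoolExecutor` (the `cpu_heavy` function here is an illustrative stand-in):

```python
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # Pure-Python loop that benefits from one process per worker
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(cpu_heavy, [10**6, 10**6, 10**6]))
    print(results)
```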
An example of handling several network requests asynchronously:
```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com/1',
        'http://example.com/2',
        'http://example.com/3',
    ]
    async with aiohttp.ClientSession() as session:
        # Schedule all requests and let them run concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Run the async entry point
results = asyncio.run(main())
```
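With many URLs it is common to cap the number of in-flight requests; a minimal sketch using `asyncio.Semaphore` (the limit of 10 and the URL list are arbitrary examples):

```python
import asyncio
import aiohttp

async def fetch_limited(session, semaphore, url):
    async with semaphore:  # at most 10 requests in flight at once
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(10)
    urls = [f'http://example.com/{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)

results = asyncio.run(main())
```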
joblib combined with tqdm makes for very convenient parallel data processing:
```python
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np

def process_array(arr):
    return np.sum(arr ** 2)

# Sample data: 100 random arrays
data = [np.random.rand(1000) for _ in range(100)]

# Process the arrays on 4 workers, with a tqdm progress bar over the inputs
results = Parallel(n_jobs=4)(
    delayed(process_array)(x) for x in tqdm(data)
)
```
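The result caching mentioned in the table comes from `joblib.Memory`: wrap a function with `memory.cache` and repeated calls with the same arguments are served from an on-disk cache. A minimal sketch (the cache directory and the function are illustrative):

```python
from joblib import Memory
import numpy as np

memory = Memory('./joblib_cache', verbose=0)  # on-disk cache directory

@memory.cache
def expensive_transform(n):
    # Computed once per distinct n; later calls load the stored result
    return np.linalg.eigvals(np.random.RandomState(n).rand(n, n))

first = expensive_transform(200)   # computed and written to the cache
second = expensive_transform(200)  # loaded from the cache
```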