Before diving into each concurrency method, here is a quick summary to guide the choice:

threading: I/O-bound tasks
  Pros: threads are cheap to create; a shared memory space makes data exchange easy; well suited to network requests and other I/O
  Cons: Python's GIL prevents true parallelism for CPU-bound work; thread switching adds its own overhead

multiprocessing: CPU-bound tasks
  Pros: fully exploits multiple CPU cores for true parallelism; each process has its own GIL
  Cons: processes are expensive to create and manage; inter-process communication is complex; higher memory use and slower startup

concurrent.futures: simple parallel tasks
  Pros: a unified high-level interface over both thread pools and process pools; functional APIs such as map are more intuitive
  Cons: no fine-grained control over task scheduling; no support for complex concurrency patterns; relatively basic feature set

asyncio: I/O-bound tasks
  Pros: concurrency on a single thread with no thread-switching overhead; handles large numbers of concurrent connections; excellent performance
  Cons: requires the async/await syntax; third-party libraries need explicit async support; unsuitable for CPU-bound work

joblib: scientific computing
  Pros: optimized for scientific computing; supports result caching and progress bars; integrates well with numpy/scipy
  Cons: aimed mainly at data processing and machine learning; limited general-purpose concurrency features; not suited to complex concurrency control

1. threading - Multithreading

A simple example suited to I/O-bound work, downloading several files concurrently:

import threading
import requests

def download_file(url, filename):
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)
    print(f"Downloaded {filename}")

# Start one download thread per URL
urls = ['http://example.com/file1', 'http://example.com/file2']
threads = []
for i, url in enumerate(urls):
    thread = threading.Thread(target=download_file, args=(url, f'file{i}.txt'))
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()
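The shared memory space mentioned in the summary cuts both ways: threads can exchange data directly, but concurrent writes to shared state need synchronization. A minimal sketch of collecting results safely with a `threading.Lock` (the `worker` function here is a hypothetical stand-in for real I/O-bound work):

```python
import threading

# A shared list guarded by a lock so that appends from
# different threads cannot corrupt shared state.
results = []
lock = threading.Lock()

def worker(n):
    value = n * n  # stand-in for the result of real I/O-bound work
    with lock:
        results.append(value)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))
```

Sorting is only for a deterministic display order; the threads may finish in any order.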

2. multiprocessing - Multiprocessing

An example suited to CPU-bound work, computing squares in parallel:

from multiprocessing import Process, Queue

def calculate_squares(numbers, queue):
    for n in numbers:
        queue.put(n * n)

if __name__ == '__main__':
    numbers_list = [[1,2,3], [4,5,6], [7,8,9]]
    queue = Queue()
    processes = []
    
    # Start one process per chunk of numbers
    for numbers in numbers_list:
        p = Process(target=calculate_squares, args=(numbers, queue))
        processes.append(p)
        p.start()
    
    # Wait for all processes to finish
    for p in processes:
        p.join()
    
    # Collect the results
    results = []
    while not queue.empty():
        results.append(queue.get())
    print(results)

3. concurrent.futures - Thread and Process Pools

An example using a thread pool to process several tasks:

from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    time.sleep(1)
    return f"Processed {item}"

with ThreadPoolExecutor(max_workers=3) as executor:
    items = ['A', 'B', 'C', 'D', 'E']
    results = list(executor.map(process_item, items))
    
print(results)
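Besides `map`, the executor also lets you submit tasks individually and consume them as they finish via `as_completed`, which helps when task runtimes are uneven. A minimal sketch (the `task` function is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n * 2

with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to its input so results can be
    # matched up regardless of completion order.
    futures = {executor.submit(task, n): n for n in range(4)}
    results = {}
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(results)
```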

4. asyncio - Asynchronous Programming

An example that issues several network requests asynchronously:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com/1',
        'http://example.com/2',
        'http://example.com/3'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Run the async entry point
results = asyncio.run(main())
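aiohttp is a third-party dependency; the same fan-out pattern can be shown with only the standard library, using `asyncio.sleep` as a stand-in for network I/O (the `fake_fetch` name is hypothetical):

```python
import asyncio

async def fake_fetch(i):
    # Stand-in for a network round trip; while one coroutine
    # awaits, the event loop runs the others.
    await asyncio.sleep(0.1)
    return f"response {i}"

async def main():
    # All three "requests" wait concurrently, so the total time
    # is roughly 0.1s rather than 0.3s.
    return await asyncio.gather(*(fake_fetch(i) for i in range(3)))

results = asyncio.run(main())
print(results)
```

`asyncio.gather` returns the results in the order the coroutines were passed in, not in completion order.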

5. joblib - Parallel Computing

Combining joblib with tqdm gives very convenient parallel data processing:

from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np

def process_array(arr):
    return np.sum(arr ** 2)

# Create sample data
data = [np.random.rand(1000) for _ in range(100)]

# Process the data in parallel
results = Parallel(n_jobs=4)(
    delayed(process_array)(x) for x in tqdm(data)
)

© 2025 Binbin Shen.