各位老铁,大家好!今天咱们聊聊Python并发编程那些事儿,从multiprocess
一路走到asyncio
,看看Python是怎么一步步解决并发难题的。
开场白:别再让你的CPU闲着了!
话说,各位写Python代码的,有没有觉得你的CPU有时候闲得发慌?明明服务器配置挺高,跑个程序慢得跟蜗牛爬似的。这很可能就是因为你没用好并发编程。单线程的Python就像一个厨师一次只能炒一道菜,即使他有十个炉子也只能眼巴巴地看着九个炉子空着。并发编程呢,就是让你的厨师学会同时炒多道菜,或者干脆多雇几个厨师(多进程),这样才能充分利用资源,让你的程序跑得飞起。
第一章:多进程(Multiprocessing):人多力量大!
最简单的并发方式,莫过于多进程了。每个进程都有自己独立的内存空间,就像开了好几家餐馆,互不干扰,各自负责。
- 原理: 利用操作系统的多进程机制,创建多个独立的Python解释器实例。
- 优点:
- 充分利用多核CPU,并行执行计算密集型任务。
- 进程间相互隔离,一个进程崩溃不会影响其他进程。
- 缺点:
- 进程创建和销毁开销大,占用更多内存。
- 进程间通信复杂,需要使用
Queue
、Pipe
等机制。
- 适用场景: CPU密集型任务,例如科学计算、图像处理、视频编码等。
代码示例:
import multiprocessing
import time
def worker(num):
"""模拟一个耗时任务"""
print(f"进程 {num} 开始工作...")
time.sleep(2) # 模拟耗时操作
print(f"进程 {num} 完成工作!")
if __name__ == '__main__':
start_time = time.time()
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join() # 等待所有进程结束
end_time = time.time()
print(f"所有进程完成,耗时 {end_time - start_time:.2f} 秒")
代码解释:
multiprocessing.Process
创建一个新的进程。target
参数指定进程要执行的函数。args
参数传递给函数的参数。p.start()
启动进程。p.join()
等待进程执行完毕。
进程间通信:
进程之间不能直接共享内存,所以需要一些特殊的机制来传递数据。
- Queue: 类似于队列,可以安全地在进程之间传递数据。
- Pipe: 管道,提供双向通信能力。
- Shared Memory: 共享内存,允许进程直接访问同一块内存区域(需要同步机制)。
代码示例(使用Queue):
import multiprocessing
import time
def producer(queue):
"""生产者,向队列中放入数据"""
for i in range(5):
time.sleep(1)
message = f"消息 {i}"
print(f"生产者放入:{message}")
queue.put(message)
def consumer(queue):
"""消费者,从队列中取出数据"""
while True:
message = queue.get()
print(f"消费者取出:{message}")
if message == "消息 4": # 假设收到最后一条消息后退出
break
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print("Done!")
第二章:多线程(Threading):轻量级选手,但要小心GIL!
多线程在一个进程内创建多个线程,共享进程的内存空间,比多进程更轻量级。
- 原理: 利用操作系统的多线程机制,在一个进程中创建多个执行流。
- 优点:
- 线程创建和销毁开销小,占用内存少。
- 线程间共享内存,通信方便。
- 缺点:
- GIL(Global Interpreter Lock) 限制了同一时刻只能有一个线程执行Python字节码,无法充分利用多核CPU进行CPU密集型任务。
- 线程间共享内存,需要注意线程安全问题,使用锁等同步机制。
- 适用场景: I/O密集型任务,例如网络请求、文件读写等。
代码示例:
import threading
import time
def worker(num):
"""模拟一个耗时任务"""
print(f"线程 {num} 开始工作...")
time.sleep(2) # 模拟耗时操作
print(f"线程 {num} 完成工作!")
if __name__ == '__main__':
start_time = time.time()
threads = []
for i in range(4):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # 等待所有线程结束
end_time = time.time()
print(f"所有线程完成,耗时 {end_time - start_time:.2f} 秒")
GIL的限制:
由于GIL的存在,多线程在CPU密集型任务中并不能带来性能提升,甚至可能更慢,因为线程切换也会消耗资源。
绕过GIL:
- 使用多进程: 每个进程都有独立的GIL,可以充分利用多核CPU。
- 使用C扩展: 将CPU密集型任务交给C扩展处理,C代码可以释放GIL。
- 使用
concurrent.futures
模块: 提供了线程池和进程池,可以方便地进行并发编程。
代码示例(使用concurrent.futures
):
import concurrent.futures
import time
def worker(num):
"""模拟一个耗时任务"""
print(f"任务 {num} 开始工作...")
time.sleep(2) # 模拟耗时操作
print(f"任务 {num} 完成工作!")
return f"任务 {num} 完成!"
if __name__ == '__main__':
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # 使用线程池
futures = [executor.submit(worker, i) for i in range(4)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
end_time = time.time()
print(f"所有任务完成,耗时 {end_time - start_time:.2f} 秒")
线程安全:
由于线程间共享内存,需要注意线程安全问题。
- 锁(Lock): 保证同一时刻只有一个线程可以访问共享资源。
- 信号量(Semaphore): 限制同时访问共享资源的线程数量。
- 条件变量(Condition): 允许线程等待特定条件满足。
- 队列(Queue): 线程安全的数据结构,可以方便地在线程之间传递数据。
代码示例(使用锁):
import threading
import time
count = 0
lock = threading.Lock()
def increment():
global count
with lock: # 获取锁
local_count = count
local_count += 1
time.sleep(0.1) # 模拟耗时操作
count = local_count
# 释放锁
def worker():
for _ in range(100000):
increment()
if __name__ == '__main__':
start_time = time.time()
threads = []
for _ in range(2):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
end_time = time.time()
print(f"Count: {count}")
print(f"Time: {end_time - start_time:.2f} seconds")
第三章:异步编程(Asyncio):单线程的逆袭!
asyncio
是Python 3.4引入的异步I/O框架,可以在单线程中实现并发执行。
- 原理: 使用事件循环(Event Loop)来调度协程(Coroutine),利用
async
和await
关键字实现非阻塞I/O操作。 - 优点:
- 单线程,避免了线程切换的开销。
- 高并发,适用于I/O密集型任务。
- 代码可读性好,使用
async
和await
关键字,使异步代码更像同步代码。
- 缺点:
- 需要使用异步库,例如
aiohttp
、asyncpg
等。 - 代码调试相对复杂。
- 需要使用异步库,例如
- 适用场景: I/O密集型任务,例如网络爬虫、Web服务器、实时通信等。
核心概念:
- 事件循环(Event Loop):
asyncio
的核心,负责调度协程的执行。 - 协程(Coroutine): 使用
async
定义的函数,可以暂停和恢复执行。 async
和await
:async
用于定义协程,await
用于等待一个协程执行完成,并在等待期间释放控制权给事件循环,允许其他协程执行。
代码示例:
import asyncio
import aiohttp
import time
async def fetch_url(url):
"""异步获取URL的内容"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
"""主函数"""
start_time = time.time()
urls = [
"https://www.example.com",
"https://www.google.com",
"https://www.python.org"
]
tasks = [fetch_url(url) for url in urls] # 创建任务列表
results = await asyncio.gather(*tasks) # 并发执行任务
for i, result in enumerate(results):
print(f"URL {urls[i]} 的长度:{len(result)}")
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
代码解释:
async def fetch_url(url)
定义了一个协程函数,用于异步获取URL的内容。async with aiohttp.ClientSession() as session:
使用aiohttp
创建一个异步HTTP会话。async with session.get(url) as response:
发送一个异步GET请求。await response.text()
等待响应返回,并获取响应的文本内容。asyncio.gather(*tasks)
并发执行多个协程任务。asyncio.run(main())
运行主协程。
与线程的比较:
特性 | 多线程 (Threading) | 异步编程 (Asyncio) |
---|---|---|
并发模型 | 并发执行 | 协作式并发 |
上下文切换 | 操作系统内核 | 事件循环 |
并发数量 | 受限于系统资源 | 理论上更高 |
适用场景 | I/O密集型 | I/O密集型 |
CPU密集型 | 不适合 (GIL) | 不适合 |
代码复杂度 | 较高 (线程安全) | 较高 (异步思维) |
最佳实践:
- 使用合适的并发模型: 根据任务类型选择多进程、多线程或异步编程。
- 避免阻塞操作: 在异步代码中,尽量使用非阻塞I/O操作,避免阻塞事件循环。
- 处理异常: 在并发代码中,要妥善处理异常,避免程序崩溃。
- 监控性能: 监控并发程序的性能,及时发现和解决问题。
总结:
Python的并发模型经历了从multiprocess
到asyncio
的演进,每种模型都有其优缺点和适用场景。选择合适的并发模型,可以充分利用系统资源,提高程序的性能和响应速度。 多进程适合CPU密集型,多线程适合IO密集型但要注意GIL,异步编程适合高并发IO密集型。记住,没有银弹,选择适合自己的才是最好的。
好了,今天的讲座就到这里,希望对大家有所帮助! 各位,下课!