协程
约 1694 字大约 6 分钟
2025-02-25
Python 中的 协程(Coroutine) 是一种 异步编程(Asynchronous Programming) 技术,是一种轻量级的线程,它可以在单个线程中实现并发执行,适用于高并发任务。如 I/O 操作、网络请求、数据库查询 等耗时操作。协程通过 async
和 await
关键字来实现,可以让代码在等待耗时操作时不阻塞其他任务,从而提高程序效率。
Python 中使用协程来实现的 异步编程 。主要用于 I/O 密集型任务,如:网络请求、数据库查询、文件 I/O、爬虫任务等。
基本概念
- 同步(Synchronous):任务按顺序执行,一个任务没完成,后续任务无法执行。
- 多线程 / 多进程(Concurrency):通过多个线程/进程并发执行多个任务,但创建线程/进程有一定开销。
- 协程(Coroutine / Async Programming):基于 async 和 await,在单线程中实现并发执行,适合 I/O 密集型任务(如网络请求、数据库操作)。
使用
Python 使用 async def
定义协程函数,使用 await
调用其他协程。
简单的协程
import asyncio
async def my_coroutine():
print("任务开始")
await asyncio.sleep(2) # 模拟 I/O 操作
print("任务结束")
# 运行协程
asyncio.run(my_coroutine())
# 输出:
# 任务开始
# (等待 2 秒)
# 任务结束
- asyncio.sleep(2) 模拟 I/O 操作(如等待网络请求),但不会阻塞其他任务。
多个协程并发执行
可以使用 asyncio.gather() 或 asyncio.create_task() 让多个协程同时执行。
import asyncio
async def task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay) # 模拟 I/O 操作
print(f"{name} 结束")
async def main():
# 同时运行三个任务
await asyncio.gather(
task("任务1", 2),
task("任务2", 1),
task("任务3", 3)
)
asyncio.run(main())
# 输出:
# 任务1 开始
# 任务2 开始
# 任务3 开始
# 任务2 结束
# 任务1 结束
# 任务3 结束
- asyncio.gather() 让所有任务并行运行,而不是一个任务执行完再执行下一个。
- 任务2 只需 1 秒,因此先完成。
对比一下下面两种方式,一种是直接使用的异步函数,一种是使用asyncio.create_task
包装成任务。
直接使用异步函数。
import asyncio
# 假设这是你的异步任务函数
async def fetch(i, sem):
async with sem: # 控制并发
print(f"开始任务 {i}")
await asyncio.sleep(1) # 模拟异步请求
print(f"完成任务 {i}")
return f"结果 {i}"
async def main():
sem = asyncio.Semaphore(5) # 限制最多同时并发 5 个任务
tasks = [fetch(i, sem) for i in range(20)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
这段代码不会启动任务,只是创建了 20 个 协程对象 coroutine 对象。只有当你执行:
await asyncio.gather(*tasks)
这些协程才会被调度执行
使用asyncio.create_task
包装成任务
import asyncio
# 假设这是你的异步任务函数
async def fetch(i, sem):
async with sem: # 控制并发
print(f"开始任务 {i}")
await asyncio.sleep(1) # 模拟异步请求
print(f"完成任务 {i}")
return f"结果 {i}"
async def main():
sem = asyncio.Semaphore(5) # 限制最多同时并发 5 个任务
tasks = [asyncio.create_task(fetch(i, sem)) for i in range(20)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
使用asyncio.create_task
,会立即将协程注册为 Task 并提交给事件循环执行,即使你还没有 await
它。
特性 | fetch(i, sem) | asyncio.create_task(fetch(i, sem)) |
---|---|---|
类型 | Coroutine object | asyncio.Task 对象 |
是否立即调度执行 | 否,必须 await 或 create_task | 是,立即调度 |
需要 await 启动执行 | 是 | 建议 await,否则无法得知结果 |
可用于 asyncio.gather | 是 | 是 |
是否必须加到事件循环里 | 是 | 已经注册到事件循环 |
- 如果你是 一次性用 gather 统一调度所有任务,直接写 fetch(i) 协程即可。
- 如果你想让任务一边调度一边执行,比如提交后立即开始做事,可以用 create_task。
创建后台任务
asyncio.create_task()
让任务后台运行
- asyncio.gather() 等待所有任务完成后返回结果。
- asyncio.create_task() 让任务并发运行,但不等待它完成。
import asyncio
async def background_task():
print("后台任务开始")
await asyncio.sleep(2)
print("后台任务结束")
async def main():
task = asyncio.create_task(background_task()) # 启动任务但不等待
print("主任务继续运行")
await asyncio.sleep(1)
print("主任务完成")
asyncio.run(main())
# 输出
# 后台任务开始
# 主任务继续运行
# 主任务完成
# 后台任务结束
asyncio.create_task(background_task())
让任务并发执行,但不等待它完成。- main() 继续执行自己的代码,而 background_task() 在后台运行。
异步上下文管理
某些对象(如数据库连接、文件操作、网络请求)支持异步操作,使用 async with 让它们自动管理资源。
import asyncio
import aiohttp # pip install aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
url = "https://www.example.com"
content = await fetch(url)
print("网页内容获取成功")
asyncio.run(main())
- async with 确保请求完成后关闭连接,避免资源泄露。
- await response.text() 异步获取网页内容,不会阻塞其他任务。
asyncio.Queue 实现生产者-消费者
在异步编程中,asyncio.Queue 可用于任务调度、消息传递。
import asyncio
queue = asyncio.Queue()
async def producer():
for i in range(5):
await queue.put(i) # 生产数据
print(f"生产: {i}")
await asyncio.sleep(1) # 模拟生产间隔
async def consumer():
while True:
item = await queue.get() # 获取数据
print(f"消费: {item}")
await asyncio.sleep(2) # 模拟处理时间
queue.task_done()
async def main():
# 启动生产者和消费者
producer_task = asyncio.create_task(producer())
consumer_task = asyncio.create_task(consumer())
await producer_task # 等待生产者完成
await queue.join() # 等待队列处理完成
consumer_task.cancel() # 取消消费者(因为它是无限循环)
asyncio.run(main())
# 输出
# 生产: 0
# 消费: 0
# 生产: 1
# 生产: 2
# 消费: 1
# 生产: 3
# 消费: 2
# ...
- queue.put() 异步生产数据,queue.get() 异步消费数据。
- queue.join() 确保队列所有数据被消费。
- consumer_task.cancel() 停止消费者。
asyncio.run() 的替代方案
在 asyncio.run() 之外,也可以用 事件循环 手动执行协程。
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(my_coroutine()) # 运行协程
loop.close()
- asyncio.run() 是 Python 3.7+ 的推荐方式,自动管理事件循环。
总结
关键字 | 作用 |
---|---|
async def | 定义协程函数 |
await | 调用协程(必须在 async 内部) |
asyncio.run() | 运行主协程 |
asyncio.gather() | 并发运行多个协程 |
asyncio.create_task() | 启动协程任务(不等待) |
async with | 异步上下文管理(文件、网络请求等) |
asyncio.Queue | 任务队列(生产者-消费者模型) |
下列场景适合使用协程:
- I/O 密集型任务(网络请求、数据库、文件 I/O)。
- 并发执行多个任务(如爬虫、批量 API 调用)。
- 服务器端应用(如 FastAPI、Tornado)。
下列场景不适合使用协程:
- 计算密集型任务,如数学计算、加密、图像处理(推荐 multiprocessing)。
- 需要真正的多线程,如 CPU 任务(推荐 threading)。