最大并发数控制
约 1127 字大约 4 分钟
2025-02-25
Python 是单线程的,但是可以通过多进程、多线程、异步编程等方式实现并发。协程是 Python 中一种轻量级的并发实现方式,它可以在单个线程中实现多个任务的切换执行。
如果想要实现类似的任务并发执行和等待的功能,而又不想用 ThreadPoolExecutor
,可以使用 semaphore
(信号量) 或 queue.Queue
来控制线程的并发数量。这些方法更轻量,适用于简单的线程控制场景。
使用 threading.Semaphore 控制最大并发数
Semaphore
允许你限制同时运行的线程数量,超出的任务会自动等待。
使用asyncio
运行示例:
import asyncio
# 假设这是你的异步任务函数
async def fetch(i, sem):
async with sem: # 控制并发
print(f"开始任务 {i}")
await asyncio.sleep(1) # 模拟异步请求
print(f"完成任务 {i}")
return f"结果 {i}"
async def main():
sem = asyncio.Semaphore(5) # 限制最多同时并发 5 个任务
tasks = [fetch(i, sem) for i in range(20)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
使用Thread
运行示例:
import threading
import time
# 限制最多 3 个线程同时运行
semaphore = threading.Semaphore(3)
def task(n):
with semaphore: # 获取信号量
print(f"任务 {n} 开始")
time.sleep(2) # 模拟任务执行 2 秒
print(f"任务 {n} 结束")
# 启动多个线程
threads = []
for i in range(5): # 提交 5 个任务
t = threading.Thread(target=task, args=(i+1,))
t.start()
threads.append(t)
# 等待所有任务完成
for t in threads:
t.join()
输出示例:
任务 1 开始
任务 2 开始
任务 3 开始
(过了 2 秒)
任务 1 结束
任务 4 开始
任务 2 结束
任务 5 开始
任务 3 结束
(再过 2 秒)
任务 4 结束
任务 5 结束
- semaphore = threading.Semaphore(3) 控制最多 3 个任务同时运行。
- 超过 3 个的任务会等待前面任务完成后才执行。
使用 queue.Queue 作为任务队列
如果你的任务是动态添加的,也可以使用 queue.Queue 让工作线程从队列里获取任务并执行。
代码示例
import threading
import queue
import time
task_queue = queue.Queue()
# 任务 worker
def worker():
while True:
task = task_queue.get() # 获取任务
if task is None: # 终止信号
break
print(f"任务 {task} 开始")
time.sleep(2)
print(f"任务 {task} 结束")
task_queue.task_done()
# 启动 3 个 worker 线程
num_workers = 3
threads = []
for _ in range(num_workers):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
# 添加 5 个任务
for i in range(5):
task_queue.put(i + 1)
# 等待所有任务完成
task_queue.join()
# 终止 worker 线程
for _ in range(num_workers):
task_queue.put(None) # 发送终止信号
# 等待所有 worker 线程结束
for t in threads:
t.join()
输出示例
任务 1 开始
任务 2 开始
任务 3 开始
(过了 2 秒)
任务 1 结束
任务 4 开始
任务 2 结束
任务 5 开始
任务 3 结束
(再过 2 秒)
任务 4 结束
任务 5 结束
- queue.Queue() 作为任务队列,支持任务动态添加。
- task_queue.get() 取任务,task_queue.task_done() 标记任务完成。
- 线程池大小固定为 3(num_workers = 3)。
- None 作为终止信号,告诉 worker 线程任务结束。
使用 ThreadPoolExecutor 控制最大并发数
ThreadPoolExecutor
是 Python 标准库 concurrent.futures
模块中的一个类,它提供了一个高级接口来管理线程池。
使用asyncio
运行示例:
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 假设这是你的异步任务函数
async def fetch(i, executor):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, long_running_task, i)
return result
def long_running_task(i):
print(f"开始任务 {i}")
time.sleep(1) # 模拟耗时操作
print(f"完成任务 {i}")
return f"结果 {i}"
async def main():
with ThreadPoolExecutor(max_workers=5) as executor: # 限制最多 5 个任务同时运行
tasks = [fetch(i, executor) for i in range(20)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
对比
方法 | 适用场景 | 线程控制 | 复杂度 |
---|---|---|---|
ThreadPoolExecutor(max_workers=N) | 适合大多数场景,内置线程池 | 自动管理 | 简单 |
threading.Semaphore(N) | 适用于轻量控制并发的场景 | 通过信号量控制 | 简单 |
queue.Queue() + Thread | 适用于任务动态添加、消费者模式 | 任务队列控制 | 中等 |
如果你的需求只是限制最大线程数,Semaphore
更轻量。如果需要任务队列和动态管理,queue.Queue()
是更合适的方案。
- 如果是轻量级并发控制,用
Semaphore
。 - 如果是任务队列、动态调度,用
queue.Queue()
。 - 如果是自动管理线程池,用
ThreadPoolExecutor
。