Code
事件循环 event loop
1asyncio.run(main())
一个“调度员”,专门负责运行异步任务,并在任务等待的时候切换去做别的任务。
在事件循环中,反复做以下事情:
- 看看有没有协程要继续执行
- 执行一小段
- 如果遇到 await,就把这个协程暂停
- 去执行别的已经准备好的协程
- 等某个 I/O、定时器、网络请求完成后,再恢复对应协程
- 一直循环,直到任务都完成
大概就是看哪些事情可以做了,就去做,否则就等着
协程函数
协程 coroutine 可以理解成一种“可以暂停、以后再继续执行的函数”。
1async def f():
2 print("A")
3 await asyncio.sleep(1)
4 print("B")
这样定义的协程函数是可暂停的
也就是可以让程序有能力在等待 I/O 的时候切换去做别的事
其中:
1await asyncio.sleep(1)
暂停当前的协程,将主导权交还给事件循环
让事件循环去做别的能做的事情
之后在恢复继续执行
并行
1import asyncio
2
3
4async def fetch(url: str) -> str:
5 print(f" → 开始请求 {url}")
6 await asyncio.sleep(1) # 模拟网络 I/O——控制权交还给事件循环
7 print(f" ← {url} 完成")
8 return f"[{url} 的响应]"
9
10
11async def main():
12 print("=" * 50)
13 print("async 串行(注意:还是串行!)")
14
15 t0 = asyncio.get_event_loop().time()
16
17 # 三次 await 是顺序执行的——每个都要等前一个完成
18 r1 = await fetch("a.com")
19 r2 = await fetch("b.com")
20 r3 = await fetch("c.com")
21
22 elapsed = asyncio.get_event_loop().time() - t0
23 print(f"\n结果: {r1}, {r2}, {r3}")
24 print(f"⏱ 耗时: {elapsed:.1f}s ← 还是 3 秒,因为 await 在等它完成!")
25
26
27# ── 启动入口
28asyncio.run(main())
输出:
1async 串行(注意:还是串行!)
2 → 开始请求 a.com
3 ← a.com 完成
4 → 开始请求 b.com
5 ← b.com 完成
6 → 开始请求 c.com
7 ← c.com 完成
8
9结果: [a.com 的响应], [b.com 的响应], [c.com 的响应]
10⏱ 耗时: 3.0s ← 还是 3 秒,因为 await 在等它完成!
这里虽然是await
1 r1 = await fetch("a.com")
2 r2 = await fetch("b.com")
3 r3 = await fetch("c.com")
但实际执行逻辑是
1main()
2 └─ await fetch("a.com")
3 └─ 等 a.com 完成
4
5main()
6 └─ await fetch("b.com")
7 └─ 等 b.com 完成
8
9main()
10 └─ await fetch("c.com")
11 └─ 等 c.com 完成
事件循环中,没有同时在await的其他任务
a在await的时候,卡在这里,还没执行到b、c的await
自然无法等待到已完成、能够先执行的任务
因此,为了达成并行的效果,显然我们需要同时await几个协程
1import asyncio
2
3
4async def fetch(url: str) -> str:
5 """模拟异步网络请求,每个耗时 1 秒"""
6 print(f" → 开始请求 {url}")
7 await asyncio.sleep(1)
8 print(f" ← {url} 完成")
9 return f"[{url} 的响应]"
10
11
12async def main():
13 print("=" * 50)
14 print("asyncio.gather 并发请求:")
15
16 t0 = asyncio.get_event_loop().time()
17
18 # gather:同时启动,一起等 ── 关键!
19 results = await asyncio.gather(
20 fetch("a.com"),
21 fetch("b.com"),
22 fetch("c.com"),
23 )
24
25 elapsed = asyncio.get_event_loop().time() - t0
26 print(f"\n结果: {results}")
27 print(f"⏱ 耗时: {elapsed:.1f}s ← 三个同时跑,只用了 1 秒!")
28
29
30# ── 启动
31asyncio.run(main())
来仔细走一遍流程:
1asyncio.run(main())
- 创建一个事件循环
- 把 main 协程放进事件循环,开始运行
main
1results = await asyncio.gather(
2 fetch("a.com"),
3 fetch("b.com"),
4 fetch("c.com"),
5)
创建三个协程
fetch("a.com")-> coroutine Afetch("b.com")-> coroutine Bfetch("c.com")-> coroutine C
由
gather打包成一个任务Task,交给事件循环调度对于
await asyncio.gather- 需要等待
gather中的任务完成,才会继续往下执行 - 事件循环发现
fetch("a.com")可以启动await 1s,交还事件循环
- 事件循环发现
fetch("b.com")可以启动await 1s,交还事件循环
- 事件循环发现
fetch("c.com")可以启动await 1s,交还事件循环
- 然后等待三个协程阻塞结束,收集结果到
gather gather的阻塞也结束
- 需要等待
回到
main
1全局代码
2└── asyncio.run(main())
3 └── 事件循环启动
4 └── main()
5 ├── print(...)
6 ├── 记录 t0
7 └── await asyncio.gather(...)
8 ├── 创建/调度 fetch("a.com")
9 │ ├── print("开始请求 a.com")
10 │ └── await asyncio.sleep(1) 暂停
11 │
12 ├── 创建/调度 fetch("b.com")
13 │ ├── print("开始请求 b.com")
14 │ └── await asyncio.sleep(1) 暂停
15 │
16 └── 创建/调度 fetch("c.com")
17 ├── print("开始请求 c.com")
18 └── await asyncio.sleep(1) 暂停
19
20 约 1 秒后:
21
22 ├── fetch("a.com") 恢复,return 结果
23 ├── fetch("b.com") 恢复,return 结果
24 └── fetch("c.com") 恢复,return 结果
25
26 gather 收集三个结果
27 └── main() 恢复
28 ├── 计算 elapsed
29 ├── print(results)
30 └── main() 结束
31
32 asyncio.run() 关闭事件循环
Task
1import asyncio
2
3
4async def fetch(url: str, delay: float = 2) -> str:
5 """模拟请求,delay 秒后返回"""
6 print(f" [{url}] 开始 (需 {delay}s)")
7 await asyncio.sleep(delay)
8 print(f" [{url}] 完成")
9 return f"[{url}]"
10
11
12async def main():
13 print("=" * 50)
14 print("create_task 演示:\n")
15
16 # ── 发射两个后台任务 ──────────────────────────
17 print("主: 发射任务 a 和 b...")
18 task_a = asyncio.create_task(fetch("a.com", 2))
19 task_b = asyncio.create_task(fetch("b.com", 1.5))
20
21 # task_a 和 task_b 已经在后台跑了!
22 # 主协程可以继续干自己的事:
23 print("主: 任务在后台跑着,我干点别的...")
24 await asyncio.sleep(0.5)
25 print("主: 0.5s 过去了,任务还在跑\n")
26
27 # 现在需要 b 的结果了——await 它
28 print("主: 等等 b 的结果...")
29 result_b = await task_b
30 print(f"主: b 的结果 → {result_b}\n")
31
32 # 再等 a
33 print("主: 等等 a 的结果...")
34 result_a = await task_a
35 print(f"主: a 的结果 → {result_a}\n")
36
37 print("主: 全部完成!")
38
39
40# ── 启动
41asyncio.run(main())
重点在
1 task_a = asyncio.create_task(fetch("a.com", 2))
2 task_b = asyncio.create_task(fetch("b.com", 1.5))
和以下代码做出区别
1 task_x = await fetch("a.com", 2)
后者是停下当前的协程函数,必须马上阻塞,等待fetch完成后才能继续执行
前者是先创建一个任务,注册到事件循环中
但是我此时先不等他们完成
我可以继续往下做我这个协程需要做的事情
一旦后续有机会,事件循环自然会趁机去把注册好的任务调度完成
1全局代码
2└── asyncio.run(main())
3 └── event loop
4 └── main()
5 ├── print(...)
6 ├── task_a = create_task(fetch("a.com", 2))
7 │ └── Task A 被注册到事件循环
8 │
9 ├── task_b = create_task(fetch("b.com", 1.5))
10 │ └── Task B 被注册到事件循环
11 │
12 ├── print("主: 任务在后台...")
13 └── await asyncio.sleep(0.5)
14 └── main 暂停,事件循环调度其他 Task
15
16 Task A:
17 └── fetch("a.com", 2)
18 ├── print("[a.com] 开始")
19 └── await asyncio.sleep(2)
20 └── Task A 暂停
21
22 Task B:
23 └── fetch("b.com", 1.5)
24 ├── print("[b.com] 开始")
25 └── await asyncio.sleep(1.5)
26 └── Task B 暂停
27
28 0.5 秒后:
29
30 main 恢复
31 ├── print("0.5s 过去了")
32 └── await task_b
33 └── main 暂停,等待 Task B
34
35 1.5 秒后:
36
37 Task B 恢复
38 ├── print("[b.com] 完成")
39 └── return "[b.com]"
40
41 main 恢复
42 ├── result_b = "[b.com]"
43 ├── print("b 的结果")
44 └── await task_a
45 └── main 暂停,等待 Task A
46
47 2.0 秒后:
48
49 Task A 恢复
50 ├── print("[a.com] 完成")
51 └── return "[a.com]"
52
53 main 恢复
54 ├── result_a = "[a.com]"
55 ├── print("a 的结果")
56 └── main 结束
gather()适合:
- 我已经知道要并发哪些任务,并且要一起等它们全部完成。
- 批量完成任务(任务明确可知)
create_task()适合:
- 我想先启动任务,让它在后台跑;主协程继续做别的事,需要结果时再回来等它。
- 动态添加任务(任务数量随着时间变化)
Exception
协程函数的异常,可以由await处进行捕获
1 try:
2 result = await fetch("bad.com")
3 except ValueError as e:
4 print(f" 捕获到异常: {e}")
5 # 输出:捕获到异常: 💥 bad.com 炸了!
由gather发起的多个协程,可以分别获取
1 # ── 2. gather 默认行为:异常传播 ────────────────
2 print("\n2. gather 默认——有异常就中断:")
3 try:
4 results = await asyncio.gather(
5 fetch("a.com"),
6 fetch("bad.com"), # ⚠️ 这个炸了
7 fetch("c.com"), # 这个还是会跑完,但结果被丢弃
8 )
9 except ValueError as e:
10 print(f" gather 抛出异常: {e}")
11 # 注意:a.com 和 c.com 不会被取消,会继续跑完
12
13 # ── 3. return_exceptions=True ──────────────────
14 print("\n3. return_exceptions=True——不中断:")
15 results = await asyncio.gather(
16 fetch("a.com"),
17 fetch("bad.com"),
18 fetch("c.com"),
19 return_exceptions=True, # ← 关键!
20 )
21 for i, r in enumerate(results):
22 if isinstance(r, Exception):
23 print(f" 第 {i} 个失败了: {r}")
24 else:
25 print(f" 第 {i} 个成功: {r}")
同时可以加上timeout机制
1 try:
2 result = await asyncio.wait_for(
3 fetch("slow.com", delay=5), # 要 5 秒
4 timeout=1.0, # 但只等 1 秒
5 )
6 except asyncio.TimeoutError:
7 print(" ⏰ 等了 1 秒还没结果,放弃了!")
8 result = None
同步
在协程函数里使用同步的代码
1async def bad_fetch(url: str) -> str:
2 """❌ 用了 time.sleep——同步阻塞!"""
3 print(f" [{url}] 开始")
4 time.sleep(1) # ⚠️ 卡死整个事件循环 1 秒!
5 print(f" [{url}] 完成")
6 return f"[{url}]"
整个事件循环的线程,都会被卡死1s
因此必须使用支持异步版本的代码
如果实在没有异步版本
1 results = await asyncio.gather(
2 asyncio.to_thread(blocking_io),
3 asyncio.to_thread(blocking_io),
4 asyncio.to_thread(blocking_io),
5 )
用 asyncio.to_thread() 把同步调用扔到线程池
Pattern
实际使用可以采用现成的异步pattern
# 模式 1:限量并发 → Semaphore
# 模式 2:生产者-消费者 → asyncio.Queue
# 模式 3:竞速 → asyncio.wait(FIRST_COMPLETED)
# 模式 4:优雅取消 → try/except CancelledError
这里GPT一下就知道
