Code

事件循环 event loop

1asyncio.run(main())

一个“调度员”,专门负责运行异步任务,并在任务等待的时候切换去做别的任务。

在事件循环中,反复做以下事情:

  • 看看有没有协程要继续执行
  • 执行一小段
  • 如果遇到 await,就把这个协程暂停
  • 去执行别的已经准备好的协程
  • 等某个 I/O、定时器、网络请求完成后,再恢复对应协程
  • 一直循环,直到任务都完成

大概就是看哪些事情可以做了,就去做,否则就等着

协程函数

协程 coroutine 可以理解成一种“可以暂停、以后再继续执行的函数”。

1async def f():
2    print("A")
3    await asyncio.sleep(1)
4    print("B")

这样定义的协程函数是可暂停的

也就是可以让程序有能力在等待 I/O 的时候切换去做别的事

其中:

1await asyncio.sleep(1)

暂停当前的协程,将主导权交还给事件循环

让事件循环去做别的能做的事情

之后在恢复继续执行

并行

 1import asyncio
 2
 3
 4async def fetch(url: str) -> str:
 5    print(f"  → 开始请求 {url}")
 6    await asyncio.sleep(1)   # 模拟网络 I/O——控制权交还给事件循环
 7    print(f"  ← {url} 完成")
 8    return f"[{url} 的响应]"
 9
10
11async def main():
12    print("=" * 50)
13    print("async 串行(注意:还是串行!)")
14
15    t0 = asyncio.get_event_loop().time()
16
17    # 三次 await 是顺序执行的——每个都要等前一个完成
18    r1 = await fetch("a.com")
19    r2 = await fetch("b.com")
20    r3 = await fetch("c.com")
21
22    elapsed = asyncio.get_event_loop().time() - t0
23    print(f"\n结果: {r1}, {r2}, {r3}")
24    print(f"⏱ 耗时: {elapsed:.1f}s  ← 还是 3 秒,因为 await 在等它完成!")
25
26
27# ── 启动入口 
28asyncio.run(main())

输出:

 1async 串行(注意:还是串行!)
 2  → 开始请求 a.com
 3  ← a.com 完成
 4  → 开始请求 b.com
 5  ← b.com 完成
 6  → 开始请求 c.com
 7  ← c.com 完成
 8
 9结果: [a.com 的响应], [b.com 的响应], [c.com 的响应]
10⏱ 耗时: 3.0s  ← 还是 3 秒,因为 await 在等它完成!

这里虽然是await

1    r1 = await fetch("a.com")
2    r2 = await fetch("b.com")
3    r3 = await fetch("c.com")

但实际执行逻辑是

 1main()
 2  └─ await fetch("a.com")
 3        └─ 等 a.com 完成
 4
 5main()
 6  └─ await fetch("b.com")
 7        └─ 等 b.com 完成
 8
 9main()
10  └─ await fetch("c.com")
11        └─ 等 c.com 完成

事件循环中,没有同时在await的其他任务

a在await的时候,卡在这里,还没执行到b、c的await

自然无法等待到已完成、能够先执行的任务


因此,为了达成并行的效果,显然我们需要同时await几个协程

 1import asyncio
 2
 3
 4async def fetch(url: str) -> str:
 5    """模拟异步网络请求,每个耗时 1 秒"""
 6    print(f"  → 开始请求 {url}")
 7    await asyncio.sleep(1)
 8    print(f"  ← {url} 完成")
 9    return f"[{url} 的响应]"
10
11
12async def main():
13    print("=" * 50)
14    print("asyncio.gather 并发请求:")
15
16    t0 = asyncio.get_event_loop().time()
17
18    # gather:同时启动,一起等 ── 关键!
19    results = await asyncio.gather(
20        fetch("a.com"),
21        fetch("b.com"),
22        fetch("c.com"),
23    )
24
25    elapsed = asyncio.get_event_loop().time() - t0
26    print(f"\n结果: {results}")
27    print(f"⏱ 耗时: {elapsed:.1f}s  ← 三个同时跑,只用了 1 秒!")
28
29
30# ── 启动 
31asyncio.run(main())

来仔细走一遍流程:

1asyncio.run(main())
  • 创建一个事件循环
  • 把 main 协程放进事件循环,开始运行main
1results = await asyncio.gather(
2    fetch("a.com"),
3    fetch("b.com"),
4    fetch("c.com"),
5)
  • 创建三个协程

    • fetch("a.com") -> coroutine A
    • fetch("b.com") -> coroutine B
    • fetch("c.com") -> coroutine C
  • gather打包成一个任务Task,交给事件循环调度

  • 对于await asyncio.gather

    • 需要等待gather中的任务完成,才会继续往下执行
    • 事件循环发现fetch("a.com")可以启动
      • await 1s,交还事件循环
    • 事件循环发现fetch("b.com")可以启动
      • await 1s,交还事件循环
    • 事件循环发现fetch("c.com")可以启动
      • await 1s,交还事件循环
    • 然后等待三个协程阻塞结束,收集结果到gather
    • gather的阻塞也结束
  • 回到main

 1全局代码
 2└── asyncio.run(main())
 3    └── 事件循环启动
 4        └── main()
 5            ├── print(...)
 6            ├── 记录 t0
 7            └── await asyncio.gather(...)
 8                ├── 创建/调度 fetch("a.com")
 9                │   ├── print("开始请求 a.com")
10                │   └── await asyncio.sleep(1)  暂停
1112                ├── 创建/调度 fetch("b.com")
13                │   ├── print("开始请求 b.com")
14                │   └── await asyncio.sleep(1)  暂停
1516                └── 创建/调度 fetch("c.com")
17                    ├── print("开始请求 c.com")
18                    └── await asyncio.sleep(1)  暂停
19
20        约 1 秒后:
21
22        ├── fetch("a.com") 恢复,return 结果
23        ├── fetch("b.com") 恢复,return 结果
24        └── fetch("c.com") 恢复,return 结果
25
26        gather 收集三个结果
27        └── main() 恢复
28            ├── 计算 elapsed
29            ├── print(results)
30            └── main() 结束
31
32    asyncio.run() 关闭事件循环

Task

 1import asyncio
 2
 3
 4async def fetch(url: str, delay: float = 2) -> str:
 5    """模拟请求,delay 秒后返回"""
 6    print(f"    [{url}] 开始 (需 {delay}s)")
 7    await asyncio.sleep(delay)
 8    print(f"    [{url}] 完成")
 9    return f"[{url}]"
10
11
12async def main():
13    print("=" * 50)
14    print("create_task 演示:\n")
15
16    # ── 发射两个后台任务 ──────────────────────────
17    print("主: 发射任务 a 和 b...")
18    task_a = asyncio.create_task(fetch("a.com", 2))
19    task_b = asyncio.create_task(fetch("b.com", 1.5))
20
21    # task_a 和 task_b 已经在后台跑了!
22    # 主协程可以继续干自己的事:
23    print("主: 任务在后台跑着,我干点别的...")
24    await asyncio.sleep(0.5)
25    print("主: 0.5s 过去了,任务还在跑\n")
26
27    # 现在需要 b 的结果了——await 它
28    print("主: 等等 b 的结果...")
29    result_b = await task_b
30    print(f"主: b 的结果 → {result_b}\n")
31
32    # 再等 a
33    print("主: 等等 a 的结果...")
34    result_a = await task_a
35    print(f"主: a 的结果 → {result_a}\n")
36
37    print("主: 全部完成!")
38
39
40# ── 启动 
41asyncio.run(main())

重点在

1    task_a = asyncio.create_task(fetch("a.com", 2))
2    task_b = asyncio.create_task(fetch("b.com", 1.5))

和以下代码做出区别

1    task_x = await fetch("a.com", 2)

后者是停下当前的协程函数,必须马上阻塞,等待fetch完成后才能继续执行

前者是先创建一个任务,注册到事件循环中

但是我此时先不等他们完成

我可以继续往下做我这个协程需要做的事情

一旦后续有机会,事件循环自然会趁机去把注册好的任务调度完成

 1全局代码
 2└── asyncio.run(main())
 3    └── event loop
 4        └── main()
 5            ├── print(...)
 6            ├── task_a = create_task(fetch("a.com", 2))
 7            │   └── Task A 被注册到事件循环
 8 9            ├── task_b = create_task(fetch("b.com", 1.5))
10            │   └── Task B 被注册到事件循环
1112            ├── print("主: 任务在后台...")
13            └── await asyncio.sleep(0.5)
14                └── main 暂停,事件循环调度其他 Task
15
16        Task A:
17        └── fetch("a.com", 2)
18            ├── print("[a.com] 开始")
19            └── await asyncio.sleep(2)
20                └── Task A 暂停
21
22        Task B:
23        └── fetch("b.com", 1.5)
24            ├── print("[b.com] 开始")
25            └── await asyncio.sleep(1.5)
26                └── Task B 暂停
27
28        0.5 秒后:
29
30        main 恢复
31        ├── print("0.5s 过去了")
32        └── await task_b
33            └── main 暂停,等待 Task B
34
35        1.5 秒后:
36
37        Task B 恢复
38        ├── print("[b.com] 完成")
39        └── return "[b.com]"
40
41        main 恢复
42        ├── result_b = "[b.com]"
43        ├── print("b 的结果")
44        └── await task_a
45            └── main 暂停,等待 Task A
46
47        2.0 秒后:
48
49        Task A 恢复
50        ├── print("[a.com] 完成")
51        └── return "[a.com]"
52
53        main 恢复
54        ├── result_a = "[a.com]"
55        ├── print("a 的结果")
56        └── main 结束

gather() 适合:

  • 我已经知道要并发哪些任务,并且要一起等它们全部完成。
  • 批量完成任务(任务明确可知)

create_task() 适合:

  • 我想先启动任务,让它在后台跑;主协程继续做别的事,需要结果时再回来等它。
  • 动态添加任务(任务数量随着时间变化)

Exception

协程函数的异常,可以由await处进行捕获

1    try:
2        result = await fetch("bad.com")
3    except ValueError as e:
4        print(f"   捕获到异常: {e}")
5    # 输出:捕获到异常: 💥 bad.com 炸了!

gather发起的多个协程,可以分别获取

 1    # ── 2. gather 默认行为:异常传播 ────────────────
 2    print("\n2. gather 默认——有异常就中断:")
 3    try:
 4        results = await asyncio.gather(
 5            fetch("a.com"),
 6            fetch("bad.com"),   # ⚠️ 这个炸了
 7            fetch("c.com"),     # 这个还是会跑完,但结果被丢弃
 8        )
 9    except ValueError as e:
10        print(f"   gather 抛出异常: {e}")
11    # 注意:a.com 和 c.com 不会被取消,会继续跑完
12
13    # ── 3. return_exceptions=True ──────────────────
14    print("\n3. return_exceptions=True——不中断:")
15    results = await asyncio.gather(
16        fetch("a.com"),
17        fetch("bad.com"),
18        fetch("c.com"),
19        return_exceptions=True,   # ← 关键!
20    )
21    for i, r in enumerate(results):
22        if isinstance(r, Exception):
23            print(f"   第 {i} 个失败了: {r}")
24        else:
25            print(f"   第 {i} 个成功: {r}")

同时可以加上timeout机制

1	try:
2        result = await asyncio.wait_for(
3            fetch("slow.com", delay=5),  # 要 5 秒
4            timeout=1.0,                  # 但只等 1 秒
5        )
6    except asyncio.TimeoutError:
7        print("  ⏰ 等了 1 秒还没结果,放弃了!")
8        result = None

同步

在协程函数里使用同步的代码

1async def bad_fetch(url: str) -> str:
2    """❌ 用了 time.sleep——同步阻塞!"""
3    print(f"    [{url}] 开始")
4    time.sleep(1)          # ⚠️ 卡死整个事件循环 1 秒!
5    print(f"    [{url}] 完成")
6    return f"[{url}]"

整个事件循环的线程,都会被卡死1s

因此必须使用支持异步版本的代码

如果实在没有异步版本

1    results = await asyncio.gather(
2        asyncio.to_thread(blocking_io),
3        asyncio.to_thread(blocking_io),
4        asyncio.to_thread(blocking_io),
5    )

用 asyncio.to_thread() 把同步调用扔到线程池

Pattern

实际使用可以采用现成的异步pattern

# 模式 1:限量并发 → Semaphore

# 模式 2:生产者-消费者 → asyncio.Queue

# 模式 3:竞速 → asyncio.wait(FIRST_COMPLETED)

# 模式 4:优雅取消 → try/except CancelledError

这里GPT一下就知道