以 QQ 机器人为梳理解异步编程
以 QQ 机器人为例讲解异步编程
从「异步是什么」到「事件循环怎么转」,再到一个真实框架(NcatBot)里的应用,最后用一个真实的定时任务 bug 复盘,把抽象的概念落到能跑的代码上。
一、为什么需要异步
先想一个最朴素的问题:一个 QQ 机器人在干什么?
它绝大部分时间在等——等 WebSocket 收到下一条消息、等 LLM API 返回、等沙箱里的命令跑完、等一张图片下载完。这些操作的共同点是 I/O 密集:CPU 几乎不工作,只是干等外部资源。
如果用同步阻塞的写法:
msg = ws.recv() # 阻塞,等消息(可能等几秒,也可能几分钟)
reply = llm.chat(msg) # 阻塞,等 LLM 返回(好几秒)
ws.send(reply) # 阻塞,等发送完成
在 ws.recv() 等消息的那几分钟里,整个程序什么都做不了——来了第二个群的消息也只能排队干瞪眼。要并发处理多个会话,传统办法是「一个连接一个线程」,但线程有真实的内存和上下文切换开销,几千个会话就会把机器拖垮。
异步(async)的核心思想:既然大部分时间都在等 I/O,那就在「等」的时候让出 CPU,去干别的活,等 I/O 好了再回来接着干。这样单个线程就能同时推进成百上千个「等待中」的任务。
关键区分:
| 含义 | 适合场景 | |
|---|---|---|
| 并发(concurrency) | 多个任务交替推进,宏观上「同时」进行 | I/O 密集(机器人就是典型) |
| 并行(parallelism) | 多个任务在多核上真正同时执行 | CPU 密集(大量计算) |
Python 的 asyncio 提供的是单线程并发:不靠多核,靠在一个线程里飞快地切换任务来「假装同时」。对 I/O 密集的机器人来说,这恰好是最划算的模型。
二、事件循环(Event Loop):异步的心脏
异步代码不会自己跑。背后必须有一个调度器在不停地做这件事:
「谁的 I/O 好了 / 谁的定时器到点了 → 让谁继续往下执行;谁遇到
await要等待了 → 把它挂起,切去跑别的。」
这个调度器就是 event loop(事件循环)。
三个核心概念
| 概念 | 是什么 | 类比 |
|---|---|---|
| coroutine(协程) | async def 函数,调用后返回一个「待执行的计划」,本身不会自动运行 | 一份写好的菜谱 |
| Task(任务) | asyncio.create_task(coro) 把协程注册到当前 loop,loop 才会调度它 | 把菜谱交给厨师排进待办单 |
| event loop | 真正的调度者,运行期间不断切换、推进所有 Task | 厨房里那个不停切换炉灶的厨师 |
await 到底发生了什么
await some_io() 这行代码的语义是:
- 「我要等这个 I/O,现在还没好。」
- 把当前协程的执行挂起,记下它停在哪一行。
- 把控制权交还给 event loop。
- loop 转去推进其他就绪的任务。
- 等这个 I/O 完成,loop 再把这个协程从挂起处恢复执行。
这套「挂起—交还—恢复」就是单线程能并发的全部秘密。注意:await 让出的前提是它等的东西本身是异步的(如 asyncio.sleep、异步网络库)。如果你在协程里写了同步阻塞的 time.sleep(5),loop 会被这一行整个卡死 5 秒,所有其他任务一起停摆——这是异步编程最常见的坑。
三个必须记住的认知(后面 bug 复盘的关键)
- 协程不会自己跑。
f()只是创建协程对象,必须有一个正在运行的 loop 去驱动。 create_task绑定到「当前正在运行的那个 loop」——调用那一刻asyncio.get_running_loop()返回的 loop。- loop 一旦停止/关闭,挂在它上面的所有 Task 都不再被调度。哪怕协程是个
while True,loop 不转,它就永远卡在某个await上不动。
三、原理对比:TypeScript / 浏览器 与 Python 的事件循环
很多人是先在前端接触异步的。JS/TS 和 Python 的 async/await 语法几乎一样,但底层调度模型有一个值得一提的区别。
JS / 浏览器:宏任务 + 微任务两个队列
浏览器的事件循环把待执行的回调分成两类队列:
- 宏任务(macrotask)队列:
setTimeout、setInterval、I/O、UI 渲染事件等。 - 微任务(microtask)队列:
Promise.then回调、await之后的续体、queueMicrotask、MutationObserver等。
调度规则是:每执行完一个宏任务,就把当前微任务队列里的任务全部清空,然后才考虑渲染、再取下一个宏任务。
取一个宏任务执行
└─ 执行过程中产生的微任务进微任务队列
执行完这个宏任务
└─ 把微任务队列「一次性全部排空」(期间新产生的微任务也一起处理掉)
└─ (浏览器)此时可能进行一次渲染
取下一个宏任务……
为什么要分两个队列?很大程度上正是为了浏览器渲染。浏览器希望「一组逻辑上相关的、原子的更新」在同一帧内一起完成,不要被渲染打断到中间状态。微任务队列保证了:一个宏任务触发的所有连锁 Promise 回调,会在下一次渲染之前一次性跑完,从而把这些更新「绑定到一起」原子地呈现,避免画面闪烁或撕裂。await 的续体走的是微任务,所以它的优先级高于 setTimeout(0) 这种宏任务——这就是经典面试题「Promise.then 比 setTimeout 先执行」的根源。
Python asyncio:单一就绪队列 + 定时器堆
Python 的 asyncio 没有「宏/微任务」这种二分。它的模型更直接:
- 一个 ready 队列:所有当前就绪、可以立刻继续跑的回调/Task。
- 一个按时间排序的定时器结构(最小堆):
call_later/asyncio.sleep注册的「到点才就绪」的回调。 - 一个 selector:通过操作系统的 I/O 多路复用(epoll/kqueue/IOCP)监听 fd 是否就绪。
每一轮循环(一次 "tick")大致是:
1. 算出最近的定时器还有多久到点 → 作为本轮 select 的超时时间
2. select(timeout):等 I/O 事件,或等到最近的定时器到点
3. 把「I/O 就绪的回调」和「已到点的定时器回调」放进 ready 队列
4. 把 ready 队列里当前的回调「逐个执行完」
5. 回到第 1 步
和 JS 的根本区别:
| 维度 | JS / 浏览器 | Python asyncio |
|---|---|---|
| 队列结构 | 宏任务队列 + 微任务队列(二分) | 单一 ready 队列 + 定时器堆 |
| 设计动机 | 与渲染协调,把原子更新绑定在一帧内 | 纯后端调度,无渲染概念,模型更扁平 |
await 续体优先级 | 走微任务,高于 setTimeout 等宏任务 | 就是普通就绪回调,进同一个 ready 队列 |
| 「下一个 tick」语义 | 微任务排空 → (渲染)→ 下个宏任务 | 一轮 select + 执行 ready 回调 |
JS 的两队列模型是被浏览器渲染需求「逼」出来的;Python 没有渲染负担,所以用了一个更扁平的单队列 + 定时器堆模型。 但两者对应用层开发者暴露的 async/await 心智模型是高度一致的——遇到 await 让出、I/O 好了恢复。
四、异步的应用:一个真实的 QQ 机器人框架
来看异步在真实项目里怎么用。下面以我正在开发的基于 NcatBot 的 QQ AI Agent 框架(AEsirClaw)为例。
4.1 整条链路都是异步的
一条 QQ 消息从进来到回复,几乎每一环都在 await:
QQ 消息 ──→ WebSocket 收包(await recv)
│
▼
事件分发 → 防抖调度(await sleep 等窗口)
│
▼
组装上下文 → Agent Loop ◄──► LLM API(await chat)
│
├── execute_task → Docker 沙箱(await 执行)
└── send_group_msg → QQ(await 发送 + await 打字延迟)
每个 await 都是一次「让出 CPU 去处理别的会话」的机会。正因如此,单线程就能同时服务多个群和私聊:A 群在等 LLM 返回时,loop 顺手就把 B 群刚到的消息推进了。
4.2 异步让「拟人化」变得自然
机器人为了模拟真人打字节奏,会分段发送、每段之间停顿。同步写法里「停顿」意味着卡死整个程序;异步里它只是一次 await asyncio.sleep,停顿期间 loop 照样服务别人:
for seg in messages:
await send(seg)
await asyncio.sleep(字数 * delay + 随机波动) # 停顿,但不阻塞其他会话
4.3 防抖:用异步实现「听完再说」
人聊天习惯是连发好几条,机器人不该逐条机械回复。框架用一个异步防抖器:首条消息到达后启动一个 5 秒窗口(await asyncio.sleep(5)),窗口期间的新消息只刷新「候选」,窗口结束才取最新候选统一处理。整个等待逻辑就是一个挂在 loop 上的协程,等待期间完全不占用 CPU。
4.4 框架如何托管 event loop
应用层开发者通常不自己写 asyncio.run,而是框架替你把 loop 跑起来。NcatBot 的入口极简:
from ncatbot.core import BotClient
bot = BotClient()
bot.run_frontend() # 一切从这里开始
run_frontend() 内部最终会进入一个长驻的事件循环,由它统一驱动所有异步任务。理解「框架在哪里、用哪个 loop 跑你的代码」,是用好异步框架的关键——下一节的 bug 正是栽在这上面。
这里先埋一个伏笔:NcatBot 在启动过程中其实会创建不止一个 event loop,它们各司其职、互相独立。记住这一点,往下看。
五、实践:定时任务的实现与一次真实 bug 复盘
需求很常见:让 QQ 机器人能自主设定定时任务(「每天早上 8 点拉一遍新闻」「每隔一小时报时」「10 分钟后提醒我」),到点后机器人主动在原会话里发言。
5.1 调度器的设计
核心是一个 TaskScheduler:用 JSON 文件持久化任务,挂一个常驻的异步扫描循环,每隔 N 秒扫一遍,把到期的任务触发掉。扫描循环长这样:
async def _scan_loop(self) -> None:
"""常驻循环:周期性扫描到期任务并触发。"""
while True:
try:
await asyncio.sleep(self.scan_interval)
await self._tick() # 扫描 + 触发到期任务
except asyncio.CancelledError:
break
这是个标准的异步后台循环:while True + await asyncio.sleep。它必须被某个持续运行的 loop 调度,才能一轮一轮地转下去。启动它的代码也很直觉:
def start(self) -> None:
self._loop_task = asyncio.create_task(self._scan_loop())
到点触发时,它调用一个回调,往会话里注入一条「定时任务触发」系统消息,再唤醒一次 Agent Loop,让机器人自己决定怎么发言。整个机制干净利落——逻辑测试全过,任务也能正常写进 JSON 文件。
然后部署上去,到点了……什么都没发生。
5.2 现象
- 任务确实被创建了:
schedules.json里躺着两条daily任务。 - 时间早就过了,
next_run时间戳算得也对。 - 但日志里没有任何
[Scheduler] 触发任务的记录,连「扫描器已启动」之后的活动都看不到。
任务能存、时间能算,就是不触发。问题不在逻辑,而在那个扫描循环根本没在转。
5.3 根因:create_task 挂错了 loop
我最初把 scheduler.start() 放在了插件的 on_load(插件加载时的初始化钩子)里。看起来天经地义——加载时启动调度器,对吧?
错。关键在于框架怎么调用 on_load。扒开 NcatBot 的插件加载器:
# ncatbot/plugin_system/loader.py
def _run_init():
loop = asyncio.new_event_loop() # ← 新建一个临时 loop
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(plugin.__onload__()) # ← 跑你的 on_load
finally:
loop.close() # ← 跑完立刻关闭这个 loop!
而且这一切还被丢进一个独立线程里执行。问题链条于是浮现:
on_load运行在这个临时 loop(Loop ①)上。start()里的asyncio.create_task(_scan_loop())把扫描循环绑定到了 Loop ①(回忆第二节的认知 2:create_task 绑定当前正在跑的 loop)。on_load一返回,run_until_complete结束,紧接着loop.close()—— Loop ① 死亡。- 扫描循环卡在第一个
await asyncio.sleep(...)上,再也没有 loop 去唤醒它(认知 3)。 - 而真正长驻的业务主 loop(Loop ②,由
run_frontend内部的asyncio.run(connect_websocket())驱动)上,压根没有这个扫描任务。
任务被挂在了一个用完即弃的 loop 上,那个 loop 关了之后,任务就成了无人调度的僵尸。
5.4 框架里到底有几个 loop
把 run_frontend() 的链路摊开看,就能数清楚:
run_frontend()
└─ start()
├─ 加载插件
│ └─ 独立线程
│ └─ Loop ① = asyncio.new_event_loop() ← 临时
│ └─ run_until_complete(on_load) ← ❌ 我最初在这里 create_task
│ └─ loop.close() ← Loop ① 死亡,扫描循环冻结
│
└─ asyncio.run(connect_websocket())
└─ Loop ② = 长驻主循环 ← ✅ 应该挂在这里
├─ while True: recv() → 分发 QQ 消息
└─ 触发 STARTUP 事件 → _on_bot_ready ← 这里 start() 才对
- Loop ①(临时):只为跑一次性的
on_load初始化,run_until_complete完就close。 - Loop ②(长驻):bot 的命脉,跑
while True收消息,所有 QQ 事件、启动回调都在它身上。
5.5 loop.run_until_complete vs run_frontend() 的本质区别
这两者处在完全不同的抽象层级,正好对照理解:
| 维度 | loop.run_until_complete(coro) | run_frontend() |
|---|---|---|
| 本质 | event loop 的底层方法:跑到指定协程完成就停 | 框架的顶层入口:启动整个 bot 并阻塞到退出 |
| 生命周期 | 短暂,跑完一个协程就返回(随后被 close) | 长驻,内部进 while True 收消息,正常永不返回 |
| 用的 loop | 临时新建的 Loop ①,用完即弃 | 内部 asyncio.run 独占运行的 Loop ② |
| 适合放什么 | 一次性、跑完就结束的初始化 | 需要长期存活的后台任务应挂在它的 loop 上 |
| 类比 | 临时点火烧开一壶水就熄火 | 常明的灶火,整晚不灭 |
补一个常被忽略的关系:asyncio.run(coro) ≈ 「新建 loop + run_until_complete(coro) + 最后 close()」。所以二者底层是同一类操作,区别只在跑的协程会不会很快结束——connect_websocket 是死循环(loop 长驻),on_load 很快结束(loop 短命)。
5.6 修复
把 start() 从 on_load 挪到 _on_bot_ready(OFFICIAL_STARTUP_EVENT 的回调,运行在长驻主 loop Loop ② 上):
async def _on_bot_ready(self, event):
"""Bot 连接成功后加载历史消息,并在主 loop 上启动定时任务调度器。"""
await self._load_recent_messages()
# 在 bot 主 event loop 上启动调度器(此回调即运行于该 loop)
self.scheduler.start()
而 on_load 里只保留「构造对象、注册回调」这类不依赖 loop 存活的同步初始化,并留下注释警示后人:
# 注意:on_load 由框架在独立线程的临时 event loop 上执行
# (loader.py: loop.run_until_complete(...) 后 loop.close()),
# 因此这里只构造调度器,绝不能在此 start()——否则扫描协程会被挂到
# 那个用完即弃的 loop 上,永远不会被调度。真正的启动放在 _on_bot_ready 中。
self.scheduler = TaskScheduler(...)
self.scheduler.set_callback(self._on_scheduled_trigger)
5.7 最小复现:一眼看穿
为了把这个机制钉死,写一个不依赖框架的十几行 demo(项目里 test/test_async.py),对比两种 loop:
async def background_loop(tag):
while True:
await asyncio.sleep(0.2)
fired.append(tag)
# 场景 A:临时 loop —— 等价于 on_load
def scenario_temp_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(fake_on_load()) # 内部 create_task(background_loop)
loop.close() # ← loop 关闭
time.sleep(1.0) # 等一秒,看后台循环跑没跑
# 场景 B:长驻 loop —— 等价于主 loop
def scenario_long_running_loop():
async def main():
asyncio.create_task(background_loop("B"))
await asyncio.sleep(1.0) # loop 持续运转 1 秒
asyncio.run(main())
运行结果:
【场景 A:临时 loop(用完即弃)】
>>> 结果:后台循环执行了 0 次(loop 已关闭,无人调度)
【场景 B:长驻 loop】
[B-长驻loop] 后台循环执行了一次 ... (×4)
>>> 结果:后台循环执行了 4 次(loop 持续运转,正常调度)
完全相同的 create_task(background_loop),差别只在「它绑定的 loop 是否持续运行」。 场景 A 的 0 次,就是线上「定时任务永不触发」的精确缩影;场景 B 的 4 次,就是修复后扫描循环每 20 秒正常扫一遍的样子。
5.8 经验法则
在「框架托管 event loop」的场景下,判断「后台任务该在哪 create_task」只需一条:
create_task必须在「那个会长期活着的 loop」正在运行时调用。
具体到这类机器人框架:
- ❌ 不要在
on_load/__init__这类「初始化完就结束」的地方起长驻后台任务。 - ✅ 在「bot 就绪 / 连接成功」的事件回调里起——那才是长驻主 loop。
- 快速自检:如果你的任务需要「一直跑」,但它所在的函数是「跑完就返回」的,那它大概率挂错了 loop。
六、延伸思考:一个程序能有几个 event loop?
bug 复盘里反复出现「临时 loop」「长驻 loop」,自然引出三个递进的问题。
6.1 异步只能有一个 event loop 吗?
不是。但有一条铁律:每个线程在同一时刻,最多只能有一个「正在运行」的 event loop。
「能创建几个」和「能同时运行几个」是两回事:
| 维度 | 答案 |
|---|---|
| 一个程序能创建几个 loop? | 任意多个,asyncio.new_event_loop() 想建几个建几个 |
| 一个线程能同时运行几个 loop? | 只能 1 个,loop 是阻塞式运行的,loop.run_xxx() 没返回前线程被它占着 |
| 多个 loop 能在不同线程里同时跑吗? | 可以,每个线程跑自己的 loop,互不干扰 |
| loop 能先后在同一线程跑吗? | 可以,跑完一个 close 掉,再建一个跑——这正是 NcatBot 干的事 |
关键限制来自一个线程局部的概念「当前线程的 event loop」:
asyncio.get_running_loop()返回此刻正在跑的那个 loop。asyncio.create_task(coro)把任务绑定到它返回的那个 loop——这就是定时任务 bug 的命门:你在哪个 loop 运行时调create_task,任务就绑给哪个 loop。
6.2 new_event_loop() 和 asyncio.run(main()) 的区别
这俩不在一个层级——一个是「底层手动挡」,一个是「高层自动挡」。
asyncio.new_event_loop():只「造」一个 loop,不运行任何东西,生命周期全靠你:
loop = asyncio.new_event_loop() # 1. 造
asyncio.set_event_loop(loop) # 2. 设为本线程当前 loop
try:
loop.run_until_complete(main()) # 3. 手动驱动
finally:
loop.close() # 4. 手动关(不关就泄漏)
asyncio.run(main()):造 + 跑 + 清理 + 关,一条龙。等价于(简化):
def run(main):
loop = asyncio.new_event_loop() # 造
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(main) # 跑到完成
finally:
loop.run_until_complete(loop.shutdown_asyncgens()) # 清理残留 task / async gen
asyncio.set_event_loop(None)
loop.close() # 关
new_event_loop() | asyncio.run(main()) | |
|---|---|---|
| 抽象层级 | 底层,手动挡 | 高层,自动挡 |
| 做什么 | 只创建 loop 对象 | 创建 + 运行 + 清理 + 关闭 |
| 谁负责 close | 你自己 | 自动 |
| 能否复用 loop | 能(建一次跑多次) | 不能(每次新建新关) |
| 能否嵌套 | —(你自己管) | 不能,运行中的 loop 里再调会 RuntimeError |
| 典型用途 | 框架/库内部精细控制、子线程建 loop | 应用程序入口,跑一个顶层协程 |
一句话:asyncio.run ≈ new_event_loop + run_until_complete + close 的安全封装。
6.3 机器人框架设计「两个 loop」是否合理?
回看 NcatBot 的 start(),它确实先后用了两个:
run_coroutine(self.plugin_loader.load_plugins) # 临时 loop ①:插件加载,跑完即弃
...
asyncio.run(self.adapter.connect_websocket()) # 长驻 loop ②:业务主循环
我的判断分两层:
✅ 合理的部分:生命周期不同的阶段用不同 loop 是正当的。「一次性初始化」和「长期运行」本就是两个阶段,只要两个 loop 不同时运行、而是先后接力,工程上完全站得住脚。
⚠️ 不优雅的部分:NcatBot 这个具体实现是历史包袱,而非精心设计。两个味道:
run_coroutine自己都标了「已废弃」——其源码注释直言「NcatBot 已重构为纯异步架构,此函数使用asyncio.run()会阻塞线程,违背异步并发原则」。即作者自己都认为「插件加载另起一个线程 + 临时 loop」不该保留。- 正是这个设计埋了定时任务的坑:
on_load跑在临时 loop ① 上,create_task起的扫描循环随 ① 关闭而冻结,而长驻的是 ②。统一一个 loop,这坑根本不存在。
更优雅的设计是「单一长驻 loop」:
asyncio.run(main())
└─ loop(唯一且长驻)
├─ await 初始化(加载插件、连数据库……) # 同一个 loop
├─ create_task(后台任务们) # 绑定到这个长驻 loop ✅
└─ await 主循环(while True 收消息) # 同一个 loop
一个 loop 从头跑到尾,所有初始化、后台任务、业务循环都挂在它身上,就不会「挂错 loop」。
那「多 loop」什么时候才是真正的合理设计、而非妥协?——多线程隔离(阻塞/CPU 密集活丢子线程跑独立 loop)、库与宿主集成(某库内部要跑自己的 loop,靠 run_coroutine_threadsafe 跨 loop 通信)、多进程隔离(每进程一个 loop,靠 IPC)。这些是目的明确的架构选择;而 NcatBot 的两个 loop 更像「重构没做完的残留」。
七、小结
- 异步用「等 I/O 时让出 CPU」换来单线程的高并发,天生契合 QQ 机器人这种 I/O 密集场景。
- 事件循环是异步的心脏;协程靠它驱动,Task 绑定它调度,loop 一停任务即冻结。
- JS/TS 因渲染需求采用宏/微任务双队列,Python 用更扁平的单就绪队列 + 定时器堆,但上层心智一致。
- 真实框架往往托管多个 loop,分清「临时初始化 loop」与「长驻业务 loop」,是把长驻后台任务挂对地方的前提——这次定时任务 bug 就是活生生的教训。
- 一个程序可以有多个 loop,但同线程同一刻只跑一个;
asyncio.run是new_event_loop + run + close的安全封装;「单一长驻 loop」通常是比多 loop 更省心的设计。
异步不难,难的是搞清楚「此刻我的代码,跑在哪个 loop 上,那个 loop 还活着吗」。