使用 asyncio 进行开发 — Python 文档
使用 asyncio 开发
异步编程不同于经典的“顺序”编程。
此页面列出了常见的错误和陷阱,并说明了如何避免它们。
调试模式
默认情况下 asyncio 在生产模式下运行。 为了方便开发asyncio有一个调试模式。
有几种方法可以启用 asyncio 调试模式:
- 将 PYTHONASYNCIODEBUG 环境变量设置为
1
。 - 使用Python开发模式。
- 将
debug=True
传递给 asyncio.run()。 - 调用 loop.set_debug()。
除了启用调试模式外,还要考虑:
将 asyncio logger 的日志级别设置为
logging.DEBUG
,例如可以在应用程序启动时运行以下代码片段:logging.basicConfig(level=logging.DEBUG)
配置 warnings 模块以显示 ResourceWarning 警告。 一种方法是使用 -W
default
命令行选项。
启用调试模式时:
- asyncio 检查未等待的 协程 并记录它们; 这减轻了“忘记等待”的陷阱。
- 许多非线程安全的异步 API(例如 loop.call_soon() 和 loop.call_at() 方法)如果从错误的线程调用它们会引发异常。
- 如果执行 I/O 操作所需的时间过长,则会记录 I/O 选择器的执行时间。
- 记录超过 100 毫秒的回调。
loop.slow_callback_duration
属性可用于设置被视为“慢”的最短执行持续时间(以秒为单位)。
并发和多线程
事件循环在一个线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。 当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。 当 Task 执行 await
表达式时,正在运行的 Task 被挂起,事件循环执行下一个 Task。
要从另一个操作系统线程调度 callback,应使用 loop.call_soon_threadsafe() 方法。 例子:
loop.call_soon_threadsafe(callback, *args)
几乎所有的 asyncio 对象都不是线程安全的,这通常不是问题,除非有代码从 Task 或回调外部使用它们。 如果需要这样的代码来调用低级异步 API,则应使用 loop.call_soon_threadsafe() 方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要从不同的 OS 线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。 它返回一个 concurrent.futures.Future 来访问结果:
async def coro_func():
return await asyncio.sleep(1, 42)
# Later in another OS thread:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
为了处理信号和执行子进程,事件循环必须在主线程中运行。
loop.run_in_executor()
方法可以与 concurrent.futures.ThreadPoolExecutor 一起使用,以在不同的操作系统线程中执行阻塞代码,而不阻塞运行事件循环的操作系统线程。
目前没有办法直接从不同的进程(例如以 multiprocessing 启动的进程)调度协程或回调。 事件循环方法 部分列出了可以从管道读取和观察文件描述符而不阻塞事件循环的 API。 此外,asyncio 的 Subprocess API 提供了一种从事件循环启动进程并与其通信的方法。 最后,前面提到的 loop.run_in_executor()
方法也可以与 concurrent.futures.ProcessPoolExecutor 一起使用,以在不同的进程中执行代码。
运行阻塞代码
不应直接调用阻塞(CPU 绑定)代码。 例如,如果一个函数执行 CPU 密集型计算 1 秒,则所有并发 asyncio 任务和 IO 操作都会延迟 1 秒。
执行器可用于在不同线程甚至不同进程中运行任务,以避免使用事件循环阻塞操作系统线程。 有关更多详细信息,请参阅 loop.run_in_executor()
方法。
日志记录
asyncio 使用 logging 模块,所有日志记录都通过 "asyncio"
记录器执行。
默认日志级别为logging.INFO
,可以轻松调整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
检测从未等待的协程
当调用协程函数但未等待时(例如 coro()
而不是 await coro()
) 或者协程没有用 asyncio.create_task() 调度,asyncio 将发出 RuntimeWarning:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
输出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
调试模式下的输出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
通常的解决方法是等待协程或调用 asyncio.create_task() 函数:
async def main():
await test()
检测从未检索到的异常
如果调用了 Future.set_exception() 但从未等待 Future 对象,则异常将永远不会传播到用户代码。 在这种情况下,当 Future 对象被垃圾回收时,asyncio 会发出一条日志消息。
未处理异常的示例:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
启用调试模式以获取创建任务的回溯:
asyncio.run(main(), debug=True)
调试模式下的输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed