使用 asyncio 进行开发 — Python 文档

来自菜鸟教程
Python/docs/3.10/library/asyncio-dev
跳转至:导航、​搜索

使用 asyncio 开发

异步编程不同于经典的“顺序”编程。

此页面列出了常见的错误和陷阱,并说明了如何避免它们。

调试模式

默认情况下 asyncio 在生产模式下运行。 为了方便开发asyncio有一个调试模式

有几种方法可以启用 asyncio 调试模式:

除了启用调试模式外,还要考虑:

  • asyncio logger 的日志级别设置为 logging.DEBUG,例如可以在应用程序启动时运行以下代码片段:

    logging.basicConfig(level=logging.DEBUG)
  • 配置 warnings 模块以显示 ResourceWarning 警告。 一种方法是使用 -W default 命令行选项。

启用调试模式时:

  • asyncio 检查未等待的 协程 并记录它们; 这减轻了“忘记等待”的陷阱。
  • 许多非线程安全的异步 API(例如 loop.call_soon()loop.call_at() 方法)如果从错误的线程调用它们会引发异常。
  • 如果执行 I/O 操作所需的时间过长,则会记录 I/O 选择器的执行时间。
  • 记录超过 100 毫秒的回调。 loop.slow_callback_duration 属性可用于设置被视为“慢”的最短执行持续时间(以秒为单位)。


并发和多线程

事件循环在一个线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。 当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。 当 Task 执行 await 表达式时,正在运行的 Task 被挂起,事件循环执行下一个 Task。

要从另一个操作系统线程调度 callback,应使用 loop.call_soon_threadsafe() 方法。 例子:

loop.call_soon_threadsafe(callback, *args)

几乎所有的 asyncio 对象都不是线程安全的,这通常不是问题,除非有代码从 Task 或回调外部使用它们。 如果需要这样的代码来调用低级异步 API,则应使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要从不同的 OS 线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。 它返回一个 concurrent.futures.Future 来访问结果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

为了处理信号和执行子进程,事件循环必须在主线程中运行。

loop.run_in_executor() 方法可以与 concurrent.futures.ThreadPoolExecutor 一起使用,以在不同的操作系统线程中执行阻塞代码,而不阻塞运行事件循环的操作系统线程。

目前没有办法直接从不同的进程(例如以 multiprocessing 启动的进程)调度协程或回调。 事件循环方法 部分列出了可以从管道读取和观察文件描述符而不阻塞事件循环的 API。 此外,asyncio 的 Subprocess API 提供了一种从事件循环启动进程并与其通信的方法。 最后,前面提到的 loop.run_in_executor() 方法也可以与 concurrent.futures.ProcessPoolExecutor 一起使用,以在不同的进程中执行代码。


运行阻塞代码

不应直接调用阻塞(CPU 绑定)代码。 例如,如果一个函数执行 CPU 密集型计算 1 秒,则所有并发 asyncio 任务和 IO 操作都会延迟 1 秒。

执行器可用于在不同线程甚至不同进程中运行任务,以避免使用事件循环阻塞操作系统线程。 有关更多详细信息,请参阅 loop.run_in_executor() 方法。


日志记录

asyncio 使用 logging 模块,所有日志记录都通过 "asyncio" 记录器执行。

默认日志级别为logging.INFO,可以轻松调整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

检测从未等待的协程

当调用协程函数但未等待时(例如 coro() 而不是 await coro()) 或者协程没有用 asyncio.create_task() 调度,asyncio 将发出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式下的输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的解决方法是等待协程或调用 asyncio.create_task() 函数:

async def main():
    await test()

检测从未检索到的异常

如果调用了 Future.set_exception() 但从未等待 Future 对象,则异常将永远不会传播到用户代码。 在这种情况下,当 Future 对象被垃圾回收时,asyncio 会发出一条日志消息。

未处理异常的示例:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

启用调试模式以获取创建任务的回溯:

asyncio.run(main(), debug=True)

调试模式下的输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed