18.5.9. 使用 asyncio 进行开发 — Python 文档

来自菜鸟教程
Python/docs/3.6/library/asyncio-dev
跳转至:导航、​搜索

18.5.9. 使用 asyncio 进行开发

异步编程不同于经典的“顺序”编程。 本页列出了常见的陷阱并解释了如何避免它们。

18.5.9.1。 asyncio 的调试模式

asyncio 的实现是为了性能而编写的。 为了方便异步代码的开发,不妨开启【X78X】调试模式【X92X】。

要为应用程序启用所有调试检查:

示例调试检查:

  • 记录 已定义但从未“从” 产生的协程
  • call_soon()call_at() 方法如果从错误的线程调用,则会引发异常。
  • 记录选择器的执行时间
  • 执行时间超过 100 毫秒的日志回调。 AbstractEventLoop.slow_callback_duration 属性是“慢”回调的最短持续时间(以秒为单位)。
  • ResourceWarning 警告在传输和事件循环 未明确关闭 时发出。

也可以看看

AbstractEventLoop.set_debug() 方法和 异步记录器


18.5.9.2. 消除

在经典编程中,取消任务并不常见。 在异步编程中,它不仅是常见的,而且您必须准备代码来处理它。

可以使用 Future.cancel() 方法显式取消期货和任务。 wait_for()函数在超时发生时取消等待的任务。 还有许多其他情况可以间接取消任务。

如果未来被取消,不要调用 set_result()set_exception() 方法的 Future:它会失败并出现异常。 例如,写:

if not fut.cancelled():
    fut.set_result('done')

不要使用 AbstractEventLoop.call_soon() 直接安排对 set_result()set_exception() 方法的调用:未来可以是在调用其方法之前取消。

如果你等待一个未来,你应该尽早检查未来是否被取消,以避免无用的操作。 例子:

@coroutine
def slow_operation(fut):
    if fut.cancelled():
        return
    # ... slow computation ...
    yield from fut
    # ...

shield() 函数也可用于忽略取消。


18.5.9.3. 并发和多线程

事件循环在一个线程中运行,并在同一线程中执行所有回调和任务。 当一个任务在事件循环中运行时,没有其他任务在同一个线程中运行。 但是当任务使用yield from时,任务被挂起,事件循环执行下一个任务。

要从不同的线程调度回调,应使用 AbstractEventLoop.call_soon_threadsafe() 方法。 例子:

loop.call_soon_threadsafe(callback, *args)

大多数 asyncio 对象都不是线程安全的。 如果您访问事件循环之外的对象,您应该只担心。 例如,要取消一个未来,不要直接调用它的 Future.cancel() 方法,而是:

loop.call_soon_threadsafe(fut.cancel)

为了处理信号和执行子进程,事件循环必须在主线程中运行。

要从不同的线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。 它返回一个 concurrent.futures.Future 来访问结果:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout)  # Wait for the result with a timeout

AbstractEventLoop.run_in_executor() 方法可以与线程池执行器一起使用,在不同线程中执行回调,从而不阻塞事件循环的线程。

也可以看看

同步原语部分描述了同步任务的方法。

子进程和线程 部分列出了从不同线程运行子进程的异步限制。


18.5.9.4. 正确处理阻塞函数

不应直接调用阻塞函数。 例如,如果功能阻塞 1 秒,其他任务将延迟 1 秒,这会对反应性产生重要影响。

对于网络和子进程,asyncio 模块提供了高级 API,如 protocols

一个 executor 可以用来在不同的线程甚至不同的进程中运行任务,而不是阻塞事件循环的线程。 请参阅 AbstractEventLoop.run_in_executor() 方法。

也可以看看

Delayed calls 部分详细说明了事件循环如何处理时间。


18.5.9.5. 日志记录

asyncio 模块使用 logging 模块在记录器 'asyncio' 中记录信息。

asyncio 模块的默认日志级别是 logging.INFO。 对于那些不希望 asyncio 如此冗长的人,可以更改日志级别。 例如,要将级别更改为 logging.WARNING

logging.getLogger('asyncio').setLevel(logging.WARNING)

18.5.9.6. 检测从未调度的协程对象

当协程函数被调用并且其结果没有传递给 ensure_future()AbstractEventLoop.create_task() 方法时,协程对象的执行将永远不会被调度,即可能是一个错误。 启用 asyncio 的调试模式 记录警告 以检测它。

错误示例:

import asyncio

@asyncio.coroutine
def test():
    print("never scheduled")

test()

调试模式下的输出:

Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
  File "test.py", line 7, in <module>
    test()

修复方法是使用协程对象调用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。

也可以看看

挂起的任务已销毁


18.5.9.7。 检测从未消耗的异常

Python 通常在未处理的异常上调用 sys.excepthook()。 如果 Future.set_exception() 被调用,但异常从未被消耗,则 sys.excepthook() 不会被调用。 相反,当垃圾收集器删除未来时,会发出 日志 ,并带有引发异常的回溯。

未处理异常示例:

import asyncio

@asyncio.coroutine
def bug():
    raise Exception("not consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()

输出:

Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

开启asyncio的调试模式,获取任务创建的回溯。 调试模式下的输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
  File "test.py", line 8, in <module>
    asyncio.ensure_future(bug())
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 79, in __next__
    return next(self.gen)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

有不同的选项可以解决这个问题。 第一个选项是将协程链接到另一个协程中并使用经典的 try/except:

@asyncio.coroutine
def handle_exception():
    try:
        yield from bug()
    except Exception:
        print("exception consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()

另一种选择是使用 AbstractEventLoop.run_until_complete() 函数:

task = asyncio.ensure_future(bug())
try:
    loop.run_until_complete(task)
except Exception:
    print("exception consumed")

也可以看看

Future.exception() 方法。


18.5.9.8. 正确链协程

当一个协程函数调用其他协程函数和任务时,它们应该用 yield from 显式链接。 否则,不能保证执行是顺序的。

使用 asyncio.sleep() 模拟慢速操作的不同错误示例:

import asyncio

@asyncio.coroutine
def create():
    yield from asyncio.sleep(3.0)
    print("(1) create file")

@asyncio.coroutine
def write():
    yield from asyncio.sleep(1.0)
    print("(2) write into file")

@asyncio.coroutine
def close():
    print("(3) close file")

@asyncio.coroutine
def test():
    asyncio.ensure_future(create())
    asyncio.ensure_future(write())
    asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()

预期输出:

(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()

实际输出:

(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>

循环在create()结束前停止,close()write()之前被调用,而协程函数的调用顺序是:create(),[ X162X]、close()

为了修复这个例子,任务必须用 yield from 标记:

@asyncio.coroutine
def test():
    yield from asyncio.ensure_future(create())
    yield from asyncio.ensure_future(write())
    yield from asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

或不带 asyncio.ensure_future()

@asyncio.coroutine
def test():
    yield from create()
    yield from write()
    yield from close()
    yield from asyncio.sleep(2.0)
    loop.stop()

18.5.9.9。 挂起的任务已销毁

如果一个挂起的任务被销毁,其包装的 协程 的执行没有完成。 这可能是一个错误,因此会记录警告。

日志示例:

Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>

开启asyncio的调试模式,获取任务创建的回溯。 登录调试模式示例:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "test.py", line 15, in <module>
    task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>

18.5.9.10。 关闭传输和事件循环

当不再需要传输时,调用其 close() 方法释放资源。 事件循环也必须明确关闭。

如果传输或事件循环未明确关闭,则会在其析构函数中发出 ResourceWarning 警告。 默认情况下,忽略 ResourceWarning 警告。 asyncio 部分的 调试模式解释了如何显示它们。