18.5.9. 使用 asyncio 进行开发 — Python 文档
18.5.9. 使用 asyncio 进行开发
异步编程不同于经典的“顺序”编程。 本页列出了常见的陷阱并解释了如何避免它们。
18.5.9.1。 asyncio 的调试模式
asyncio 的实现是为了性能而编写的。 为了方便异步代码的开发,不妨开启【X78X】调试模式【X92X】。
要为应用程序启用所有调试检查:
- 通过将环境变量 PYTHONASYNCIODEBUG 设置为
1
,或通过调用 AbstractEventLoop.set_debug() 来全局启用 asyncio 调试模式。 - 将asyncio logger的日志级别设置为
logging.DEBUG
。 例如,在启动时调用logging.basicConfig(level=logging.DEBUG)
。 - 配置 warnings 模块以显示 ResourceWarning 警告。 例如,使用 Python 的
-Wdefault
命令行选项来显示它们。
示例调试检查:
- 记录 已定义但从未“从” 产生的协程
- call_soon() 和 call_at() 方法如果从错误的线程调用,则会引发异常。
- 记录选择器的执行时间
- 执行时间超过 100 毫秒的日志回调。
AbstractEventLoop.slow_callback_duration
属性是“慢”回调的最短持续时间(以秒为单位)。 - ResourceWarning 警告在传输和事件循环 未明确关闭 时发出。
18.5.9.2. 消除
在经典编程中,取消任务并不常见。 在异步编程中,它不仅是常见的,而且您必须准备代码来处理它。
可以使用 Future.cancel() 方法显式取消期货和任务。 wait_for()
函数在超时发生时取消等待的任务。 还有许多其他情况可以间接取消任务。
如果未来被取消,不要调用 set_result() 或 set_exception() 方法的 Future:它会失败并出现异常。 例如,写:
if not fut.cancelled():
fut.set_result('done')
不要使用 AbstractEventLoop.call_soon() 直接安排对 set_result() 或 set_exception() 方法的调用:未来可以是在调用其方法之前取消。
如果你等待一个未来,你应该尽早检查未来是否被取消,以避免无用的操作。 例子:
@coroutine
def slow_operation(fut):
if fut.cancelled():
return
# ... slow computation ...
yield from fut
# ...
shield()
函数也可用于忽略取消。
18.5.9.3. 并发和多线程
事件循环在一个线程中运行,并在同一线程中执行所有回调和任务。 当一个任务在事件循环中运行时,没有其他任务在同一个线程中运行。 但是当任务使用yield from
时,任务被挂起,事件循环执行下一个任务。
要从不同的线程调度回调,应使用 AbstractEventLoop.call_soon_threadsafe() 方法。 例子:
loop.call_soon_threadsafe(callback, *args)
大多数 asyncio 对象都不是线程安全的。 如果您访问事件循环之外的对象,您应该只担心。 例如,要取消一个未来,不要直接调用它的 Future.cancel() 方法,而是:
loop.call_soon_threadsafe(fut.cancel)
为了处理信号和执行子进程,事件循环必须在主线程中运行。
要从不同的线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。 它返回一个 concurrent.futures.Future 来访问结果:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout) # Wait for the result with a timeout
AbstractEventLoop.run_in_executor()
方法可以与线程池执行器一起使用,在不同线程中执行回调,从而不阻塞事件循环的线程。
18.5.9.4. 正确处理阻塞函数
不应直接调用阻塞函数。 例如,如果功能阻塞 1 秒,其他任务将延迟 1 秒,这会对反应性产生重要影响。
对于网络和子进程,asyncio 模块提供了高级 API,如 protocols。
一个 executor 可以用来在不同的线程甚至不同的进程中运行任务,而不是阻塞事件循环的线程。 请参阅 AbstractEventLoop.run_in_executor()
方法。
18.5.9.5. 日志记录
asyncio 模块使用 logging 模块在记录器 'asyncio'
中记录信息。
asyncio 模块的默认日志级别是 logging.INFO
。 对于那些不希望 asyncio 如此冗长的人,可以更改日志级别。 例如,要将级别更改为 logging.WARNING
:
logging.getLogger('asyncio').setLevel(logging.WARNING)
18.5.9.6. 检测从未调度的协程对象
当协程函数被调用并且其结果没有传递给 ensure_future() 或 AbstractEventLoop.create_task() 方法时,协程对象的执行将永远不会被调度,即可能是一个错误。 启用 asyncio 的调试模式 记录警告 以检测它。
错误示例:
import asyncio
@asyncio.coroutine
def test():
print("never scheduled")
test()
调试模式下的输出:
Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
File "test.py", line 7, in <module>
test()
修复方法是使用协程对象调用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。
18.5.9.7。 检测从未消耗的异常
Python 通常在未处理的异常上调用 sys.excepthook()。 如果 Future.set_exception() 被调用,但异常从未被消耗,则 sys.excepthook() 不会被调用。 相反,当垃圾收集器删除未来时,会发出 日志 ,并带有引发异常的回溯。
未处理异常示例:
import asyncio
@asyncio.coroutine
def bug():
raise Exception("not consumed")
loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()
输出:
Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
File "asyncio/tasks.py", line 237, in _step
result = next(coro)
File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "test.py", line 5, in bug
raise Exception("not consumed")
Exception: not consumed
开启asyncio的调试模式,获取任务创建的回溯。 调试模式下的输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
File "test.py", line 8, in <module>
asyncio.ensure_future(bug())
Traceback (most recent call last):
File "asyncio/tasks.py", line 237, in _step
result = next(coro)
File "asyncio/coroutines.py", line 79, in __next__
return next(self.gen)
File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "test.py", line 5, in bug
raise Exception("not consumed")
Exception: not consumed
有不同的选项可以解决这个问题。 第一个选项是将协程链接到另一个协程中并使用经典的 try/except:
@asyncio.coroutine
def handle_exception():
try:
yield from bug()
except Exception:
print("exception consumed")
loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()
另一种选择是使用 AbstractEventLoop.run_until_complete() 函数:
task = asyncio.ensure_future(bug())
try:
loop.run_until_complete(task)
except Exception:
print("exception consumed")
18.5.9.8. 正确链协程
当一个协程函数调用其他协程函数和任务时,它们应该用 yield from
显式链接。 否则,不能保证执行是顺序的。
使用 asyncio.sleep()
模拟慢速操作的不同错误示例:
import asyncio
@asyncio.coroutine
def create():
yield from asyncio.sleep(3.0)
print("(1) create file")
@asyncio.coroutine
def write():
yield from asyncio.sleep(1.0)
print("(2) write into file")
@asyncio.coroutine
def close():
print("(3) close file")
@asyncio.coroutine
def test():
asyncio.ensure_future(create())
asyncio.ensure_future(write())
asyncio.ensure_future(close())
yield from asyncio.sleep(2.0)
loop.stop()
loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()
预期输出:
(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()
实际输出:
(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
循环在create()
结束前停止,close()
在write()
之前被调用,而协程函数的调用顺序是:create()
,[ X162X]、close()
。
为了修复这个例子,任务必须用 yield from
标记:
@asyncio.coroutine
def test():
yield from asyncio.ensure_future(create())
yield from asyncio.ensure_future(write())
yield from asyncio.ensure_future(close())
yield from asyncio.sleep(2.0)
loop.stop()
或不带 asyncio.ensure_future()
:
@asyncio.coroutine
def test():
yield from create()
yield from write()
yield from close()
yield from asyncio.sleep(2.0)
loop.stop()
18.5.9.9。 挂起的任务已销毁
如果一个挂起的任务被销毁,其包装的 协程 的执行没有完成。 这可能是一个错误,因此会记录警告。
日志示例:
Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>
开启asyncio的调试模式,获取任务创建的回溯。 登录调试模式示例:
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "test.py", line 15, in <module>
task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>
18.5.9.10。 关闭传输和事件循环
当不再需要传输时,调用其 close()
方法释放资源。 事件循环也必须明确关闭。
如果传输或事件循环未明确关闭,则会在其析构函数中发出 ResourceWarning 警告。 默认情况下,忽略 ResourceWarning 警告。 asyncio 部分的 调试模式解释了如何显示它们。