协程和任务 — Python 文档

来自菜鸟教程
Python/docs/3.9/library/asyncio-task
跳转至:导航、​搜索

协程和任务

本节概述了用于协同程序和任务的高级异步 API。

协程

使用 async/await 语法声明的 Coroutines 是编写 asyncio 应用程序的首选方式。 例如,以下代码片段(需要 Python 3.7+)打印“hello”,等待 1 秒,然后打印“world”:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

请注意,简单地调用协程不会安排它执行:

>>> main()
<coroutine object main at 0x1053bb7c8>

为了实际运行协程,asyncio 提供了三种主要机制:

  • asyncio.run() 函数运行顶级入口点“main()”函数(见上面的例子。)

  • 等待协程。 以下代码片段将在等待 1 秒后打印“hello”,然后在等待 another 2 秒后打印“world”:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())

    预期输出:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
  • asyncio.create_task() 函数作为 asyncio Tasks 同时运行协程。

    我们修改上面的例子,同时运行两个say_after协程'

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")

    请注意,现在的预期输出显示代码段的运行速度比以前快 1 秒:

    started at 17:14:32
    hello
    world
    finished at 17:14:34


待办事项

如果一个对象可以用在 await 表达式中,我们就说它是一个 awaitable 对象。 许多 asyncio API 被设计为接受等待。

awaitable 对象主要有三种类型:coroutinesTasksFutures

协程

Python 协程是 awaitables,因此可以从其他协程等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要的

在本文档中,术语“协程”可用于两个密切相关的概念:

  • 协程函数async def函数;
  • 协程对象:调用协程函数返回的对象。


asyncio 还支持基于 生成器的 协程。

任务

Tasks用于同时调度协同程序'

当一个协程被包装到一个 Task 中,函数类似于 asyncio.create_task() 时,协程会自动调度为很快运行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

期货

Future 是一个特殊的 低级 可等待对象,表示异步操作的 最终结果

当 Future 对象是 awaited 时,这意味着协程将等到 Future 在其他地方解析。

需要 asyncio 中的未来对象来允许基于回调的代码与 async/await 一起使用。

通常不需要在应用程序级代码创建Future对象。

可以等待未来对象,有时由库和一些 asyncio API 公开:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回 Future 对象的低级函数的一个很好的例子是 loop.run_in_executor()


运行 asyncio 程序

asyncio.run(coro, *, debug=False)

执行 coroutine coro 并返回结果。

该函数运行传递的协程,负责管理 asyncio 事件循环、 完成异步生成器 并关闭线程池。

当另一个异步事件循环在同一线程中运行时,无法调用此函数。

如果 debugTrue,事件循环将在调试模式下运行。

这个函数总是创建一个新的事件循环并在最后关闭它。 它应该用作 asyncio 程序的主要入口点,并且最好只调用一次。

例子:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

3.7 版中的新功能。

3.9 版更改: 更新为使用 loop.shutdown_default_executor()

笔记

asyncio.run() 的源代码可以在 :source:`Lib/asyncio/runners.py` 中找到。


创建任务

asyncio.create_task(coro, *, name=None)

coro coroutine 包装成 Task 并安排其执行。 返回任务对象。

如果 name 不是 None,则使用 Task.set_name() 将其设置为任务的名称。

任务在 get_running_loop() 返回的循环中执行,如果当前线程中没有运行循环,则会引发 RuntimeError

这个函数已经在 Python 3.7 中增加了 。 在 Python 3.7 之前,可以使用低级 asyncio.ensure_future() 函数代替:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

重要的

保存对该函数结果的引用,以避免任务在执行过程中消失。

3.7 版中的新功能。

3.8 版变更: 增加了 name 参数。


睡眠

同时运行任务

避免取消

超时

等待原语

asyncio.as_completed(aws, *, loop=None, timeout=None)

aws 可迭代对象中同时运行 awaitable objects。 返回协同程序的迭代器。 可以等待返回的每个协程,以从剩余的可等待对象的可迭代对象中获取最早的下一个结果。

如果超时发生在所有期货完成之前,则引发 asyncio.TimeoutError

例子:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...


在线程中运行

从其他线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

将协程提交给给定的事件循环。 线程安全。

返回 concurrent.futures.Future 以等待来自另一个 OS 线程的结果。

此函数旨在从与运行事件循环的操作系统线程不同的操作系统线程调用。 例子:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果协程中出现异常,将通知返回的 Future。 它也可以用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

请参阅文档的 并发和多线程 部分。

与其他 asyncio 函数不同,此函数需要显式传递 loop 参数。

3.5.1 版中的新功能。


内省

asyncio.current_task(loop=None)

返回当前运行的 Task 实例,如果没有正在运行的任务,则返回 None

如果 loopNone get_running_loop() 用于获取当前循环。

3.7 版中的新功能。

asyncio.all_tasks(loop=None)

返回循环运行的一组尚未完成的 Task 对象。

如果 loopNone,则 get_running_loop() 用于获取当前回路。

3.7 版中的新功能。


任务对象

class asyncio.Task(coro, *, loop=None, name=None)

一个运行 Python 协程Future-like 对象。 不是线程安全的。

任务用于在事件循环中运行协程。 如果协程在等待 Future,则 Task 会暂停协程的执行并等待 Future 的完成。 当 Future 为 done 时,包装的协程恢复执行。

事件循环使用协作调度:一个事件循环一次运行一个任务。 当 Task 等待 Future 完成时,事件循环会运行其他 Task、回调或执行 IO 操作。

使用高级 asyncio.create_task() 函数创建任务,或使用低级 loop.create_task()ensure_future() 函数。 不鼓励手动实例化任务。

要取消正在运行的任务,请使用 cancel() 方法。 调用它会导致任务将 CancelledError 异常抛出到包装的协程中。 如果协程在取消期间等待 Future 对象,则 Future 对象将被取消。

cancelled() 可用于检查任务是否被取消。 如果包装的协程没有抑制 CancelledError 异常并且实际上被取消,则该方法返回 True

asyncio.Task 继承了 Future 的所有 API,除了 Future.set_result()Future.set_exception()

任务支持 contextvars 模块。 创建 Task 时,它会复制当前上下文,然后在复制的上下文中运行其协程。

3.7 版更改: 添加了对 contextvars 模块的支持。

3.8 版变更: 增加了 name 参数。

cancel(msg=None)

请求取消任务。

这安排在事件循环的下一个循环中将 CancelledError 异常抛出到包装的协程中。

然后协程有机会通过使用 try … … except CancelledErrorfinally 块抑制异常来清理甚至拒绝请求。 因此,与 Future.cancel() 不同,Task.cancel() 并不能保证 Task 会被取消,尽管完全抑制取消并不常见并且被积极劝阻。

3.9 版本变化: 增加了 msg 参数。

以下示例说明了协程如何拦截取消请求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

如果任务被取消,则返回True

当使用 cancel() 请求取消并且包装的协程传播了抛出的 CancelledError 异常时,任务被 canceled

done()

如果任务 完成 ,则返回 True

当包装的协程返回值、引发异常或任务被取消时,任务是 done

result()

返回任务的结果。

如果 Task 是 done,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)

如果任务已被 取消 ,则此方法会引发 CancelledError 异常。

如果任务的结果尚不可用,此方法将引发 InvalidStateError 异常。

exception()

返回任务的异常。

如果包装的协程引发异常,则返回异常。 如果包装的协程正常返回,则此方法返回 None

如果任务已被 取消 ,则此方法会引发 CancelledError 异常。

如果 Task 还没有 done,这个方法会引发一个 InvalidStateError 异常。

add_done_callback(callback, *, context=None)

添加要在任务 完成 时运行的回调。

此方法只应用于基于回调的低级代码。

有关更多详细信息,请参阅 Future.add_done_callback() 的文档。

remove_done_callback(callback)

从回调列表中删除 callback

此方法只应用于基于回调的低级代码。

有关更多详细信息,请参阅 Future.remove_done_callback() 的文档。

get_stack(*, limit=None)

返回此任务的堆栈帧列表。

如果包装的协程没有完成,这将返回它挂起的堆栈。 如果协程已成功完成或被取消,则返回一个空列表。 如果协程因异常终止,则返回回溯帧列表。

帧总是从最旧到最新排序。

挂起的协程只返回一个堆栈帧。

可选的 limit 参数设置要返回的最大帧数; 默认情况下返回所有可用的帧。 返回列表的顺序取决于是返回堆栈还是回溯:返回堆栈的最新帧,但返回最旧的回溯帧。 (这与回溯模块的行为相匹配。)

print_stack(*, limit=None, file=None)

打印此任务的堆栈或回溯。

这会为 get_stack() 检索到的帧生成类似于回溯模块的输出。

limit 参数直接传递给 get_stack()

file 参数是输出写入的 I/O 流; 默认输出写入 sys.stderr

get_coro()

返回 Task 包裹的协程对象。

3.8 版中的新功能。

get_name()

返回任务的名称。

如果没有为 Task 显式分配名称,则默认的 asyncio Task 实现会在实例化期间生成一个默认名称。

3.8 版中的新功能。

set_name(value)

设置任务的名称。

value 参数可以是任何对象,然后将其转换为字符串。

在默认任务实现中,名称将在任务对象的 repr() 输出中可见。

3.8 版中的新功能。


基于生成器的协程

笔记

对基于生成器的协程的支持已被 弃用 ,并计划在 Python 3.10 中移除。


基于生成器的协程早于 async/await 语法。 它们是 Python 生成器,使用 yield from 表达式来等待 Futures 和其他协程。

基于生成器的协程应该用 @asyncio.coroutine 修饰,尽管这不是强制执行的。

@asyncio.coroutine

标记基于生成器的协程的装饰器。

这个装饰器使传统的基于生成器的协程与 async/await 代码兼容:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

此装饰器不应用于 async def 协程。

asyncio.iscoroutine(obj)

如果 obj协程对象 ,则返回 True

此方法与 inspect.iscoroutine() 不同,因为它为基于生成器的协程返回 True

asyncio.iscoroutinefunction(func)

如果 func协程函数 ,则返回 True

此方法与 inspect.iscoroutinefunction() 不同,因为它为使用 @coroutine 修饰的基于生成器的协程函数返回 True