18.5.3. 任务和协程 — Python 文档

来自菜鸟教程
Python/docs/3.6/library/asyncio-task
跳转至:导航、​搜索

18.5.3. 任务和协程

源代码: :source:`Lib/asyncio/tasks.py`

源代码: :source:`Lib/asyncio/coroutines.py`

18.5.3.1. 协程

asyncio 一起使用的协程可以使用 async def 语句或使用 generators 来实现。 async def 类型的协程是在 Python 3.5 中添加的,如果不需要支持旧的 Python 版本,建议使用。

基于生成器的协程应该用 @asyncio.coroutine 修饰,尽管这并没有严格执行。 装饰器能够与 async def 协程兼容,也可用作文档。 基于生成器的协程使用 PEP 380 中引入的 yield from 语法,而不是原始的 yield 语法。

“协程”这个词,就像“生成器”这个词一样,用于两个不同(虽然相关)的概念:

  • 定义协程的函数(使用 async def 或使用 @asyncio.coroutine 修饰的函数定义)。 如果需要消除歧义,我们将称其为 协程函数iscoroutinefunction() 返回 True)。
  • 通过调用协程函数获得的对象。 此对象表示最终将完成的计算或 I/O 操作(通常是组合)。 如果需要消除歧义,我们将称其为 协程对象iscoroutine() 返回 True)。

协程可以做的事情:

  • result = await futureresult = yield from future – 暂停协程直到未来完成,然后返回未来的结果,或引发异常,将被传播。 (如果未来被取消,它会引发一个 CancelledError 异常。)请注意,任务是未来,所有关于期货的说法也适用于任务。
  • result = await coroutineresult = yield from coroutine – 等待另一个协程产生结果(或引发异常,该异常将被传播)。 coroutine 表达式必须是对另一个协程的 call
  • return expression – 使用 awaityield from 向正在等待该协程的协程生成结果。
  • raise exception - 在协程中使用 awaityield from 在等待这个异常的协程中引发异常。

调用协程并不会启动其代码运行——调用返回的协程对象在您安排其执行之前不会执行任何操作。 有两种基本方法可以启动它运行:从另一个协程调用 await coroutineyield from coroutine(假设另一个协程已经在运行!),或者使用 ensure_future() 安排它的执行 函数或 AbstractEventLoop.create_task() 方法。

协程(和任务)只能在事件循环运行时运行。

@asyncio.coroutine

用于标记基于生成器的协程的装饰器。 这使生成器可以使用 yield from 调用 async def 协程,并且还使生成器能够被 async def 协程调用,例如使用 await 表达。

不需要自己修饰 async def 协程。

如果生成器在销毁之前没有产生,则会记录错误消息。 参见 检测从未调度的协程

笔记

在本文档中,一些方法被记录为协程,即使它们是返回 Future 的普通 Python 函数。 这是有意在未来调整这些功能的实现的自由。 如果需要在回调样式代码中使用这样的函数,请将其结果用 ensure_future() 包装起来。


18.5.3.1.1。 示例:Hello World 协程

协程显示 "Hello World" 示例:

import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()

也可以看看

Hello World with call_soon() 示例使用 AbstractEventLoop.call_soon() 方法来安排回调。


18.5.3.1.2. 示例:协程显示当前日期

使用 sleep() 函数在 5 秒内每秒显示当前日期的协程示例:

import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

也可以看看

显示当前日期与 call_later() 示例使用回调与 AbstractEventLoop.call_later() 方法。


18.5.3.1.3。 示例:链式协程

示例链协程:

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute() 链接到 print_sum()print_sum() 协程等待直到 compute() 完成,然后返回其结果。

例子的时序图:

class=align-center|../_images/tulip_coro.png “任务”由 AbstractEventLoop.run_until_complete() 方法在它获取协程对象而不是任务时创建。

该图显示了控制流,它没有准确描述内部的工作方式。 例如,睡眠协程创建一个内部未来,它使用 AbstractEventLoop.call_later() 在 1 秒内唤醒任务。


18.5.3.2. 无效状态错误

exception asyncio.InvalidStateError
该状态下不允许操作。


18.5.3.3. 超时错误

exception asyncio.TimeoutError
操作超出了给定的最后期限。

笔记

这个异常不同于内置的 TimeoutError 异常!


18.5.3.4. 未来

class asyncio.Future(\*, loop=None)

此类 几乎concurrent.futures.Future 兼容。

区别:

此类是 不是线程安全的

cancel()

取消未来并安排回调。

如果未来已经完成或取消,返回False。 否则,将未来的状态更改为取消,安排回调并返回 True

cancelled()

如果未来被取消,则返回 True

done()

如果未来完成,则返回 True

完成意味着结果/异常可用,或者未来被取消。

result()

返回这个未来代表的结果。

如果未来已被取消,则引发 CancelledError。 如果未来的结果尚不可用,则引发 InvalidStateError。 如果未来完成并设置了异常,则会引发此异常。

exception()

返回在此未来设置的异常。

只有在未来完成时才返回异常(或 None 如果没有设置异常)。 如果未来已被取消,则引发 CancelledError。 如果未来尚未完成,则引发 InvalidStateError

add_done_callback(fn)

添加要在未来完成时运行的回调。

使用单个参数调用回调 - 未来对象。 如果调用此函数时未来已经完成,则使用 call_soon() 安排回调。

使用functools.partial给回调传递参数。 例如,fut.add_done_callback(functools.partial(print, "Future:", flush=True)) 将调用 print("Future:", fut, flush=True)

remove_done_callback(fn)

从“完成后调用”列表中删除回调的所有实例。

返回移除的回调数。

set_result(result)

标记未来完成并设置其结果。

如果调用此方法时未来已经完成,则引发 InvalidStateError

set_exception(exception)

标记未来完成并设置例外。

如果调用此方法时未来已经完成,则引发 InvalidStateError

18.5.3.4.1。 示例:带有 run_until_complete() 的 Future

结合 Future协程函数 的示例:

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

协程函数负责计算(需要 1 秒)并将结果存储到未来。 run_until_complete() 方法等待未来的完成。

笔记

run_until_complete() 方法在内部使用 add_done_callback() 方法在未来完成时得到通知。


18.5.3.4.2。 示例:带有 run_forever() 的 Future

可以使用 Future.add_done_callback() 方法以不同方式编写前面的示例以明确描述控制流:

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

在这个例子中,future 用于将 slow_operation() 链接到 got_result():当 slow_operation() 完成时,got_result() 被调用并带有结果。


18.5.3.5. 任务

class asyncio.Task(coro, \*, loop=None)

安排 协程 的执行:将其包装在未来。 任务是 Future 的子类。

任务负责在事件循环中执行协程对象。 如果包装的协程从未来产生,则任务挂起包装的协程的执行并等待未来的完成。 当 future 完成后,被包装的协程的执行会以结果或 future 的异常重新开始。

事件循环使用协作调度:一个事件循环一次只运行一个任务。 如果其他事件循环在不同线程中运行,则其他任务可能会并行运行。 当一个任务等待未来完成时,事件循环执行一个新任务。

取消任务与取消未来不同。 调用 cancel() 将向包装的协程抛出 CancelledErrorcancelled() 仅在包装的协程未捕获 CancelledError 异常或引发 CancelledError 异常时才返回 True

如果一个挂起的任务被销毁,其包装的 协程 的执行没有完成。 这可能是一个错误并记录了警告:请参阅 挂起的任务已销毁

不要直接创建 Task 实例:使用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。

此类是 不是线程安全的

classmethod all_tasks(loop=None)

为事件循环返回一组所有任务。

默认情况下,返回当前事件循环的所有任务。

classmethod current_task(loop=None)

在事件循环或 None 中返回当前正在运行的任务。

默认情况下,返回当前事件循环的当前任务。

None 在不在 Task 的上下文中调用时返回。

cancel()

请求此任务自行取消。

这安排了 CancelledError 在事件循环的下一个循环中被抛出到包装的协程中。 然后协程有机会使用 try/except/finally 清理甚至拒绝请求。

Future.cancel() 不同,这并不能保证任务会被取消:异常可能会被捕获并采取行动,延迟取消任务或完全阻止取消。 任务也可能返回一个值或引发不同的异常。

调用此方法后, cancelled() 不会立即返回 True(除非任务已被取消)。 当包装的协程以 CancelledError 异常终止时,任务将被标记为已取消(即使 cancel() 未被调用)。

get_stack(\*, limit=None)

返回此任务的协程的堆栈帧列表。

如果协程没有完成,这将返回它挂起的堆栈。 如果协程已成功完成或被取消,则返回一个空列表。 如果协程因异常终止,则返回回溯帧列表。

帧总是从最旧到最新排序。

可选的限制给出了要返回的最大帧数; 默认情况下返回所有可用的帧。 它的含义取决于是返回堆栈还是回溯:返回堆栈的最新帧,但返回最旧的回溯帧。 (这与回溯模块的行为相匹配。)

由于我们无法控制的原因,挂起的协程只返回一个堆栈帧。

print_stack(\*, limit=None, file=None)

打印此任务的协程的堆栈或回溯。

对于 get_stack() 检索到的帧,这会产生类似于回溯模块的输出。 limit 参数传递给 get_stack()。 文件参数是输出写入的 I/O 流; 默认输出写入 sys.stderr。

18.5.3.5.1。 示例:并行执行任务

并行执行 3 个任务(A、B、C)的示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
loop.close()

输出:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

任务在创建时会自动安排执行。 当所有任务完成时,事件循环停止。


18.5.3.6. 任务功能

笔记

在下面的函数中,可选的 loop 参数允许显式设置底层任务或协程使用的事件循环对象。 如果未提供,则使用默认事件循环。


asyncio.as_completed(fs, \*, loop=None, timeout=None)

返回一个迭代器,其值在等待时为 Future 实例。

如果超时发生在所有期货完成之前,则引发 asyncio.TimeoutError

例子:

for f in as_completed(fs):
    result = yield from f  # The 'yield from' may raise
    # Use result

笔记

期货f不一定是fs的成员。

asyncio.ensure_future(coro_or_future, \*, loop=None)

安排 协程对象 的执行:将其包装在未来。 返回一个 Task 对象。

如果参数是Future,则直接返回。

版本 3.4.4 中的新功能。

在 3.5.1 版更改: 该函数接受任何 awaitable 对象。

也可以看看

AbstractEventLoop.create_task() 方法。

asyncio.async(coro_or_future, \*, loop=None)

ensure_future() 的已弃用别名。

自 3.4.4 版起已弃用。

asyncio.wrap_future(future, \*, loop=None)
concurrent.futures.Future 对象包裹在 Future 对象中。
asyncio.gather(\*coros_or_futures, loop=None, return_exceptions=False)

从给定的协程对象或期货返回未来聚合结果。

所有期货必须共享相同的事件循环。 如果所有任务都成功完成,返回的future的结果就是结果列表(按照原来的顺序,不一定是结果到达的先后顺序)。 如果 return_exceptions 为真,则任务中的异常与成功结果相同,并收集到结果列表中; 否则,第一个引发的异常将立即传播到返回的未来。

取消:如果外部 Future 被取消,则所有子级(尚未完成)也将被取消。 如果任何孩子被取消,这将被视为它引发了 CancelledError - 在这种情况下,外部 Future 是 不是 取消。 (这是为了防止取消一个孩子导致其他孩子被取消。)

在 3.6.6 版更改: 如果 gather 本身被取消,则无论 return_exceptions如何,都会传播取消。

asyncio.iscoroutine(obj)
如果 obj协程对象 ,则返回 True,它可能基于生成器或 async def 协程。
asyncio.iscoroutinefunction(func)
如果确定 func协程函数 ,则返回 True,该函数可能是修饰的生成器函数或 async def 函数。
asyncio.run_coroutine_threadsafe(coro, loop)

协程对象 提交给给定的事件循环。

返回 concurrent.futures.Future 以访问结果。

此函数旨在从与运行事件循环的线程不同的线程调用。 用法:

# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程中引发异常,将通知返回的未来。 它也可以用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print('The coroutine raised an exception: {!r}'.format(exc))
else:
    print('The coroutine returned: {!r}'.format(result))

请参阅文档的 并发和多线程 部分。

笔记

与模块中的其他函数不同,run_coroutine_threadsafe() 需要显式传递 loop 参数。

3.5.1 版中的新功能。