18.5.3. 任务和协程 — Python 文档
18.5.3. 任务和协程
源代码: :source:`Lib/asyncio/tasks.py`
源代码: :source:`Lib/asyncio/coroutines.py`
18.5.3.1. 协程
与 asyncio 一起使用的协程可以使用 async def 语句或使用 generators 来实现。 async def 类型的协程是在 Python 3.5 中添加的,如果不需要支持旧的 Python 版本,建议使用。
基于生成器的协程应该用 @asyncio.coroutine 修饰,尽管这并没有严格执行。 装饰器能够与 async def 协程兼容,也可用作文档。 基于生成器的协程使用 PEP 380 中引入的 yield from
语法,而不是原始的 yield
语法。
“协程”这个词,就像“生成器”这个词一样,用于两个不同(虽然相关)的概念:
- 定义协程的函数(使用 async def 或使用
@asyncio.coroutine
修饰的函数定义)。 如果需要消除歧义,我们将称其为 协程函数 (iscoroutinefunction() 返回True
)。 - 通过调用协程函数获得的对象。 此对象表示最终将完成的计算或 I/O 操作(通常是组合)。 如果需要消除歧义,我们将称其为 协程对象 (iscoroutine() 返回
True
)。
协程可以做的事情:
result = await future
或result = yield from future
– 暂停协程直到未来完成,然后返回未来的结果,或引发异常,将被传播。 (如果未来被取消,它会引发一个CancelledError
异常。)请注意,任务是未来,所有关于期货的说法也适用于任务。result = await coroutine
或result = yield from coroutine
– 等待另一个协程产生结果(或引发异常,该异常将被传播)。coroutine
表达式必须是对另一个协程的 call。return expression
– 使用 await 或yield from
向正在等待该协程的协程生成结果。raise exception
- 在协程中使用 await 或yield from
在等待这个异常的协程中引发异常。
调用协程并不会启动其代码运行——调用返回的协程对象在您安排其执行之前不会执行任何操作。 有两种基本方法可以启动它运行:从另一个协程调用 await coroutine
或 yield from coroutine
(假设另一个协程已经在运行!),或者使用 ensure_future() 安排它的执行 函数或 AbstractEventLoop.create_task() 方法。
协程(和任务)只能在事件循环运行时运行。
- @asyncio.coroutine
用于标记基于生成器的协程的装饰器。 这使生成器可以使用
yield from
调用 async def 协程,并且还使生成器能够被 async def 协程调用,例如使用 await 表达。不需要自己修饰 async def 协程。
如果生成器在销毁之前没有产生,则会记录错误消息。 参见 检测从未调度的协程 。
笔记
在本文档中,一些方法被记录为协程,即使它们是返回 Future 的普通 Python 函数。 这是有意在未来调整这些功能的实现的自由。 如果需要在回调样式代码中使用这样的函数,请将其结果用 ensure_future() 包装起来。
18.5.3.1.1。 示例:Hello World 协程
协程显示 "Hello World"
示例:
import asyncio
async def hello_world():
print("Hello World!")
loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()
18.5.3.1.2. 示例:协程显示当前日期
使用 sleep()
函数在 5 秒内每秒显示当前日期的协程示例:
import asyncio
import datetime
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
18.5.3.1.3。 示例:链式协程
示例链协程:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
compute()
链接到 print_sum()
:print_sum()
协程等待直到 compute()
完成,然后返回其结果。
例子的时序图:
class=align-center|../_images/tulip_coro.png “任务”由 AbstractEventLoop.run_until_complete() 方法在它获取协程对象而不是任务时创建。
该图显示了控制流,它没有准确描述内部的工作方式。 例如,睡眠协程创建一个内部未来,它使用 AbstractEventLoop.call_later() 在 1 秒内唤醒任务。
18.5.3.2. 无效状态错误
- exception asyncio.InvalidStateError
- 该状态下不允许操作。
18.5.3.4. 未来
- class asyncio.Future(\*, loop=None)
此类 几乎 与 concurrent.futures.Future 兼容。
区别:
result() 和 exception() 不接受超时参数并在未来尚未完成时引发异常。
使用 add_done_callback() 注册的回调总是通过事件循环的 call_soon() 调用。
此类与 concurrent.futures 包中的 wait() 和 as_completed() 函数不兼容。
此类是 不是线程安全的 。
- cancel()
取消未来并安排回调。
如果未来已经完成或取消,返回
False
。 否则,将未来的状态更改为取消,安排回调并返回True
。
- cancelled()
如果未来被取消,则返回
True
。
- done()
如果未来完成,则返回
True
。完成意味着结果/异常可用,或者未来被取消。
- result()
返回这个未来代表的结果。
如果未来已被取消,则引发
CancelledError
。 如果未来的结果尚不可用,则引发 InvalidStateError。 如果未来完成并设置了异常,则会引发此异常。
- exception()
返回在此未来设置的异常。
只有在未来完成时才返回异常(或
None
如果没有设置异常)。 如果未来已被取消,则引发CancelledError
。 如果未来尚未完成,则引发 InvalidStateError。
- add_done_callback(fn)
添加要在未来完成时运行的回调。
使用单个参数调用回调 - 未来对象。 如果调用此函数时未来已经完成,则使用 call_soon() 安排回调。
使用functools.partial给回调传递参数。 例如,
fut.add_done_callback(functools.partial(print, "Future:", flush=True))
将调用print("Future:", fut, flush=True)
。
- remove_done_callback(fn)
从“完成后调用”列表中删除回调的所有实例。
返回移除的回调数。
- set_result(result)
标记未来完成并设置其结果。
如果调用此方法时未来已经完成,则引发 InvalidStateError。
- set_exception(exception)
标记未来完成并设置例外。
如果调用此方法时未来已经完成,则引发 InvalidStateError。
18.5.3.4.1。 示例:带有 run_until_complete() 的 Future
import asyncio
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
协程函数负责计算(需要 1 秒)并将结果存储到未来。 run_until_complete() 方法等待未来的完成。
18.5.3.4.2。 示例:带有 run_forever() 的 Future
可以使用 Future.add_done_callback() 方法以不同方式编写前面的示例以明确描述控制流:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop()
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
loop.run_forever()
finally:
loop.close()
在这个例子中,future 用于将 slow_operation()
链接到 got_result()
:当 slow_operation()
完成时,got_result()
被调用并带有结果。
18.5.3.5. 任务
- class asyncio.Task(coro, \*, loop=None)
安排 协程 的执行:将其包装在未来。 任务是 Future 的子类。
任务负责在事件循环中执行协程对象。 如果包装的协程从未来产生,则任务挂起包装的协程的执行并等待未来的完成。 当 future 完成后,被包装的协程的执行会以结果或 future 的异常重新开始。
事件循环使用协作调度:一个事件循环一次只运行一个任务。 如果其他事件循环在不同线程中运行,则其他任务可能会并行运行。 当一个任务等待未来完成时,事件循环执行一个新任务。
取消任务与取消未来不同。 调用 cancel() 将向包装的协程抛出 CancelledError。 cancelled() 仅在包装的协程未捕获 CancelledError 异常或引发 CancelledError 异常时才返回
True
。如果一个挂起的任务被销毁,其包装的 协程 的执行没有完成。 这可能是一个错误并记录了警告:请参阅 挂起的任务已销毁 。
不要直接创建 Task 实例:使用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。
此类是 不是线程安全的 。
- classmethod all_tasks(loop=None)
为事件循环返回一组所有任务。
默认情况下,返回当前事件循环的所有任务。
- classmethod current_task(loop=None)
在事件循环或
None
中返回当前正在运行的任务。默认情况下,返回当前事件循环的当前任务。
None
在不在 Task 的上下文中调用时返回。
- cancel()
请求此任务自行取消。
这安排了 CancelledError 在事件循环的下一个循环中被抛出到包装的协程中。 然后协程有机会使用 try/except/finally 清理甚至拒绝请求。
与 Future.cancel() 不同,这并不能保证任务会被取消:异常可能会被捕获并采取行动,延迟取消任务或完全阻止取消。 任务也可能返回一个值或引发不同的异常。
调用此方法后, cancelled() 不会立即返回
True
(除非任务已被取消)。 当包装的协程以 CancelledError 异常终止时,任务将被标记为已取消(即使 cancel() 未被调用)。
- get_stack(\*, limit=None)
返回此任务的协程的堆栈帧列表。
如果协程没有完成,这将返回它挂起的堆栈。 如果协程已成功完成或被取消,则返回一个空列表。 如果协程因异常终止,则返回回溯帧列表。
帧总是从最旧到最新排序。
可选的限制给出了要返回的最大帧数; 默认情况下返回所有可用的帧。 它的含义取决于是返回堆栈还是回溯:返回堆栈的最新帧,但返回最旧的回溯帧。 (这与回溯模块的行为相匹配。)
由于我们无法控制的原因,挂起的协程只返回一个堆栈帧。
- print_stack(\*, limit=None, file=None)
打印此任务的协程的堆栈或回溯。
对于 get_stack() 检索到的帧,这会产生类似于回溯模块的输出。 limit 参数传递给 get_stack()。 文件参数是输出写入的 I/O 流; 默认输出写入 sys.stderr。
18.5.3.5.1。 示例:并行执行任务
并行执行 3 个任务(A、B、C)的示例:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
loop.close()
输出:
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
任务在创建时会自动安排执行。 当所有任务完成时,事件循环停止。
18.5.3.6. 任务功能
笔记
在下面的函数中,可选的 loop 参数允许显式设置底层任务或协程使用的事件循环对象。 如果未提供,则使用默认事件循环。
- asyncio.as_completed(fs, \*, loop=None, timeout=None)
返回一个迭代器,其值在等待时为 Future 实例。
如果超时发生在所有期货完成之前,则引发 asyncio.TimeoutError。
例子:
for f in as_completed(fs): result = yield from f # The 'yield from' may raise # Use result
笔记
期货
f
不一定是fs的成员。
- asyncio.ensure_future(coro_or_future, \*, loop=None)
安排 协程对象 的执行:将其包装在未来。 返回一个 Task 对象。
如果参数是Future,则直接返回。
版本 3.4.4 中的新功能。
在 3.5.1 版更改: 该函数接受任何 awaitable 对象。
也可以看看
- asyncio.async(coro_or_future, \*, loop=None)
ensure_future() 的已弃用别名。
自 3.4.4 版起已弃用。
- asyncio.wrap_future(future, \*, loop=None)
- 将 concurrent.futures.Future 对象包裹在 Future 对象中。
- asyncio.gather(\*coros_or_futures, loop=None, return_exceptions=False)
从给定的协程对象或期货返回未来聚合结果。
所有期货必须共享相同的事件循环。 如果所有任务都成功完成,返回的future的结果就是结果列表(按照原来的顺序,不一定是结果到达的先后顺序)。 如果 return_exceptions 为真,则任务中的异常与成功结果相同,并收集到结果列表中; 否则,第一个引发的异常将立即传播到返回的未来。
取消:如果外部 Future 被取消,则所有子级(尚未完成)也将被取消。 如果任何孩子被取消,这将被视为它引发了 CancelledError - 在这种情况下,外部 Future 是 不是 取消。 (这是为了防止取消一个孩子导致其他孩子被取消。)
在 3.6.6 版更改: 如果 gather 本身被取消,则无论 return_exceptions如何,都会传播取消。
- asyncio.run_coroutine_threadsafe(coro, loop)
将 协程对象 提交给给定的事件循环。
返回 concurrent.futures.Future 以访问结果。
此函数旨在从与运行事件循环的线程不同的线程调用。 用法:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在协程中引发异常,将通知返回的未来。 它也可以用于取消事件循环中的任务:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print('The coroutine raised an exception: {!r}'.format(exc)) else: print('The coroutine returned: {!r}'.format(result))
请参阅文档的 并发和多线程 部分。
笔记
与模块中的其他函数不同,run_coroutine_threadsafe() 需要显式传递 loop 参数。
3.5.1 版中的新功能。