事件循环 — Python 文档
事件循环
前言
事件循环是每个 asyncio 应用程序的核心。 事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。
应用程序开发人员通常应该使用高级 asyncio 函数,例如 asyncio.run(),并且很少需要引用循环对象或调用其方法。 本节主要面向需要更好地控制事件循环行为的低级代码、库和框架的作者。
获取事件循环
以下低级函数可用于获取、设置或创建事件循环:
- asyncio.get_running_loop()
返回当前操作系统线程中正在运行的事件循环。
如果没有正在运行的事件循环,则会引发 RuntimeError。 此函数只能从协程或回调中调用。
3.7 版中的新功能。
- asyncio.get_event_loop()
获取当前事件循环。
如果当前操作系统线程中没有设置当前事件循环,操作系统线程是主线程,并且 set_event_loop() 尚未被调用,asyncio 将创建一个新的事件循环并将其设置为当前的.
由于该函数的行为相当复杂(尤其是在使用自定义事件循环策略时),因此在协程和回调中,使用 get_running_loop() 函数优于 get_event_loop() 函数。
还可以考虑使用 asyncio.run() 函数而不是使用较低级别的函数来手动创建和关闭事件循环。
- asyncio.set_event_loop(loop)
- 将 loop 设置为当前 OS 线程的当前事件循环。
- asyncio.new_event_loop()
- 创建一个新的事件循环对象。
请注意,get_event_loop()、set_event_loop() 和 new_event_loop() 函数的行为可以通过 设置自定义事件循环策略[ X177X]。
内容
此文档页面包含以下部分:
- Event Loop Methods 部分是事件循环 API 的参考文档;
- Callback Handles 部分记录了从 loop.call_soon() 和 等调度方法返回的 Handle 和 TimerHandle 实例]loop.call_later();
- Server Objects 部分记录了从事件循环方法返回的类型,如
loop.create_server()
; - Event Loop Implementations 部分记录了 SelectorEventLoop 和 ProactorEventLoop 类;
- Examples 部分展示了如何使用一些事件循环 API。
事件循环方法
事件循环具有用于以下内容的 低级 API:
- 运行和停止循环
- 调度回调
- 调度延迟回调
- 创建期货和任务
- 打开网络连接
- 创建网络服务器
- 传输文件
- TLS 升级
- 查看文件描述符
- 直接使用套接字对象
- 域名系统
- 使用管道
- Unix 信号
- 在线程或进程池中执行代码
- 错误处理 API
- 启用调试模式
- 运行子进程
运行和停止循环
- loop.run_until_complete(future)
运行直到 future(Future 的一个实例)完成。
如果参数是 协程对象 ,则它被隐式安排为作为 asyncio.Task 运行。
返回 Future 的结果或引发其异常。
- loop.run_forever()
运行事件循环直到 stop() 被调用。
如果在调用 run_forever() 之前调用了 stop(),则循环将轮询一次 I/O 选择器,超时为零,运行所有调度以响应 I/ O 事件(以及那些已经安排好的事件),然后退出。
如果在 run_forever() 运行时调用 stop(),则循环将运行当前批次的回调,然后退出。 请注意,在这种情况下,回调安排的新回调将不会运行; 相反,它们将在下次调用 run_forever() 或 run_until_complete() 时运行。
- loop.stop()
- 停止事件循环。
- loop.is_running()
- 如果事件循环当前正在运行,则返回
True
。
- loop.is_closed()
- 如果事件循环已关闭,则返回
True
。
- loop.close()
关闭事件循环。
调用此函数时,循环不能运行。 任何挂起的回调都将被丢弃。
此方法清除所有队列并关闭执行程序,但不等待执行程序完成。
这种方法是幂等的且不可逆的。 事件循环关闭后不应调用其他方法。
调度回调
- loop.call_soon(callback, *args, context=None)
安排 callback 在事件循环的下一次迭代中使用 args 参数调用。
回调按注册的顺序调用。 每个回调只会被调用一次。
一个可选的仅关键字 context 参数允许为 callback 指定自定义 contextvars.Context 以运行。 当没有提供 context 时使用当前上下文。
返回一个 asyncio.Handle 的实例,稍后可以使用它来取消回调。
此方法不是线程安全的。
- loop.call_soon_threadsafe(callback, *args, context=None)
call_soon() 的线程安全变体。 必须用于从另一个线程 调度回调 。
请参阅文档的 并发和多线程 部分。
3.7 版更改: 添加了 context 仅关键字参数。 有关更多详细信息,请参阅 PEP 567。
笔记
大多数 asyncio 调度函数不允许传递关键字参数。 为此,请使用 functools.partial():
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
使用部分对象通常比使用 lambda 更方便,因为 asyncio 可以在调试和错误消息中更好地呈现部分对象。
调度延迟回调
事件循环提供了安排回调函数在未来某个时间点调用的机制。 事件循环使用单调时钟来跟踪时间。
- loop.call_later(delay, callback, *args, context=None)
安排 callback 在给定的 delay 秒数(可以是 int 或 float)后调用。
返回 asyncio.TimerHandle 的实例,可用于取消回调。
callback 只会被调用一次。 如果两个回调被安排在完全相同的时间,则调用它们的顺序是不确定的。
可选的位置 args 将在调用时传递给回调。 如果您希望使用关键字参数调用回调,请使用 functools.partial()。
一个可选的仅关键字 context 参数允许为 callback 指定自定义 contextvars.Context 以运行。 当没有提供 context 时使用当前上下文。
3.7 版更改: 添加了 context 仅关键字参数。 有关更多详细信息,请参阅 PEP 567。
3.7.1 版更改: 在 Python 3.7.0 及更早版本中,使用默认事件循环实现, 延迟 不能超过 1 天。 这已在 Python 3.7.1 中修复。
- loop.call_at(when, callback, *args, context=None)
安排 callback 在给定的绝对时间戳 when(整数或浮点数)上调用,使用与 loop.time() 相同的时间参考。
此方法的行为与 call_later() 相同。
返回 asyncio.TimerHandle 的实例,可用于取消回调。
3.7 版更改: 添加了 context 仅关键字参数。 有关更多详细信息,请参阅 PEP 567。
3.7.1 版本更改: 在 Python 3.7.0 及更早版本中,默认事件循环实现,when 与当前时间之间的差异不能超过一天。 这已在 Python 3.7.1 中修复。
- loop.time()
- 根据事件循环的内部单调时钟返回当前时间,作为 float 值。
笔记
3.8 版更改: 在 Python 3.7 及更早版本中,超时(相对 延迟 或绝对 when )不应超过 1 天。 这已在 Python 3.8 中修复。
也可以看看
asyncio.sleep()
功能。
创建期货和任务
- loop.create_future()
创建一个附加到事件循环的 asyncio.Future 对象。
这是在 asyncio 中创建 Futures 的首选方式。 这让第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测)。
版本 3.5.2 中的新功能。
- loop.create_task(coro)
- loop.set_task_factory(factory)
设置一个由 loop.create_task() 使用的任务工厂。
如果 factory 是
None
将设置默认任务工厂。 否则,factory 必须是一个 callable,签名匹配(loop, coro)
,其中 loop 是对活动事件循环的引用,而 ]coro 是一个协程对象。 可调用对象必须返回 asyncio.Future 兼容对象。
- loop.get_task_factory()
- 如果使用默认工厂,则返回任务工厂或
None
。
打开网络连接
创建网络服务器
传输文件
TLS 升级
查看文件描述符
- loop.add_reader(fd, callback, \*args)
- 开始监视 fd 文件描述符的读取可用性,并在 fd 可供读取时使用指定的参数调用 callback。
- loop.remove_reader(fd)
- 停止监视 fd 文件描述符以获取读取可用性。
- loop.add_writer(fd, callback, \*args)
开始监视 fd 文件描述符的写入可用性,并在 fd 可用于写入时使用指定的参数调用 callback。
使用 functools.partial() 将关键字参数 传递给 callback。
- loop.remove_writer(fd)
- 停止监视 fd 文件描述符以获取写入可用性。
有关这些方法的一些限制,另请参阅 平台支持 部分。
直接使用套接字对象
通常,使用基于传输的 API(例如 loop.create_connection()
和 loop.create_server()
)的协议实现比直接使用套接字的实现要快。 但是,在某些情况下,性能并不重要,直接使用 socket 对象更方便。
域名系统
3.7 版更改: getaddrinfo 和 getnameinfo 方法总是被记录为返回协程,但在 Python 3.7 之前,它们实际上返回 ]asyncio.Future 对象。 从 Python 3.7 开始,这两种方法都是协程。
使用管道
也可以看看
loop.subprocess_exec()
和 loop.subprocess_shell()
方法。
Unix 信号
- loop.add_signal_handler(signum, callback, \*args)
将 callback 设置为 signum 信号的处理程序。
回调将由 loop 以及该事件循环的其他排队回调和可运行协程调用。 与使用 signal.signal() 注册的信号处理程序不同,使用此函数注册的回调允许与事件循环交互。
如果信号编号无效或无法捕获,则引发 ValueError。 如果设置处理程序出现问题,则引发 RuntimeError。
使用 functools.partial() 将关键字参数 传递给 callback。
和signal.signal()一样,这个函数必须在主线程中调用。
- loop.remove_signal_handler(sig)
删除 sig 信号的处理程序。
如果删除了信号处理程序,则返回
True
,如果没有为给定信号设置处理程序,则返回False
。
在线程或进程池中执行代码
- loop.set_default_executor(executor)
将 executor 设置为
run_in_executor()
使用的默认执行器。 executor 应该是 ThreadPoolExecutor 的一个实例。自 3.7 版起已弃用: 不推荐使用不是 ThreadPoolExecutor 实例的执行程序,并将在 Python 3.9 中触发错误。
executor 必须是 concurrent.futures.ThreadPoolExecutor 的实例。
错误处理 API
允许自定义如何在事件循环中处理异常。
- loop.set_exception_handler(handler)
将 handler 设置为新的事件循环异常处理程序。
如果 handler 是
None
,将设置默认异常处理程序。 否则,handler 必须是一个签名匹配(loop, context)
的可调用对象,其中loop
是对活动事件循环的引用,而context
是一个 [ X162X] 包含异常详细信息的对象(有关上下文的详细信息,请参阅 call_exception_handler() 文档)。
- loop.get_exception_handler()
返回当前异常处理程序,如果没有设置自定义异常处理程序,则返回
None
。版本 3.5.2 中的新功能。
- loop.default_exception_handler(context)
默认异常处理程序。
当发生异常且未设置异常处理程序时调用此方法。 这可以由想要遵循默认处理程序行为的自定义异常处理程序调用。
context 参数的含义与 call_exception_handler() 中的含义相同。
- loop.call_exception_handler(context)
调用当前事件循环异常处理程序。
context 是一个
dict
对象,包含以下键(新的键可能会在未来的 Python 版本中引入):'message':错误信息;
“异常”(可选):异常对象;
'future'(可选):asyncio.Future 实例;
'handle'(可选):asyncio.Handle 实例;
'protocol'(可选):Protocol 实例;
'transport'(可选):Transport 实例;
'socket'(可选):socket.socket 实例。
笔记
此方法不应在子类事件循环中重载。 对于自定义异常处理,请使用 set_exception_handler() 方法。
启用调试模式
- loop.get_debug()
获取事件循环的调试模式(bool)。
如果环境变量 PYTHONASYNCIODEBUG 设置为非空字符串,则默认值为
True
,否则为False
。
- loop.set_debug(enabled: bool)
设置事件循环的调试模式。
3.7 版更改: 新的
-X dev
命令行选项现在也可用于启用调试模式。
运行子进程
本小节中描述的方法是低级的。 在常规 async/await 代码中,请考虑使用高级 asyncio.create_subprocess_shell()
和 asyncio.create_subprocess_exec()
便利函数。
笔记
Windows 上的默认 asyncio 事件循环不支持子进程。 有关详细信息,请参阅 Windows 上的子进程支持 [X37X]。
回调句柄
- class asyncio.Handle
loop.call_soon(), loop.call_soon_threadsafe() 返回的回调包装对象。
- cancel()
取消回调。 如果回调已经被取消或执行,则此方法无效。
- cancelled()
如果回调被取消,则返回
True
。3.7 版中的新功能。
- class asyncio.TimerHandle
loop.call_later() 和 loop.call_at() 返回的回调包装对象。
此类是 Handle 的子类。
- when()
将预定的回调时间返回为 float 秒。
时间是绝对时间戳,使用与 loop.time() 相同的时间参考。
3.7 版中的新功能。
服务器对象
服务器对象由 loop.create_server()
、loop.create_unix_server()
、start_server()
和 start_unix_server()
函数创建。
不要直接实例化类。
- class asyncio.Server
Server 对象是异步上下文管理器。 在
async with
语句中使用时,可以保证在async with
语句完成时服务器对象已关闭且不接受新连接:srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
3.7 版更改:Server 对象是自 Python 3.7 以来的异步上下文管理器。
- close()
停止服务:关闭监听套接字并将 sockets 属性设置为
None
。代表现有传入客户端连接的套接字保持打开状态。
服务器异步关闭,使用
wait_closed()
协程等待服务器关闭。
- get_loop()
返回与服务器对象关联的事件循环。
3.7 版中的新功能。
- is_serving()
如果服务器正在接受新连接,则返回
True
。3.7 版中的新功能。
- sockets
服务器正在侦听的 socket.socket 对象列表,如果服务器关闭,则为
None
。3.7 版更改: Python 3.7 之前
Server.sockets
用于直接返回服务器套接字的内部列表。 在 3.7 中,会返回该列表的副本。
事件循环实现
asyncio 附带两种不同的事件循环实现:SelectorEventLoop 和 ProactorEventLoop。
默认情况下,asyncio 配置为在所有平台上使用 SelectorEventLoop。
- class asyncio.SelectorEventLoop
基于 选择器 模块的事件循环。
使用给定平台可用的最有效的 选择器 。 也可以手动配置要使用的确切选择器实现:
import asyncio import selectors selector = selectors.SelectSelector() loop = asyncio.SelectorEventLoop(selector) asyncio.set_event_loop(loop)
- class asyncio.ProactorEventLoop
使用“I/O 完成端口”(IOCP) 的 Windows 事件循环。
如何在 Windows 上使用 ProactorEventLoop 的示例:
import asyncio import sys if sys.platform == 'win32': loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop)
也可以看看
- class asyncio.AbstractEventLoop
异步兼容事件循环的抽象基类。
Event Loop Methods 部分列出了
AbstractEventLoop
的替代实现应该定义的所有方法。
例子
请注意,本节 有意 中的所有示例都展示了如何使用低级事件循环 API,例如 loop.run_forever() 和 loop.call_soon() ]。 现代 asyncio 应用程序很少需要以这种方式编写; 考虑使用像 asyncio.run() 这样的高级函数。
Hello World with call_soon()
使用 loop.call_soon() 方法安排回调的示例。 回调显示 "Hello World"
然后停止事件循环:
import asyncio
def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
使用 call_later() 显示当前日期
每秒显示当前日期的回调示例。 回调使用 loop.call_later() 方法在 5 秒后重新调度自身,然后停止事件循环:
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
观察文件描述符以获取读取事件
等到文件描述符使用 loop.add_reader() 方法接收到一些数据,然后关闭事件循环:
import asyncio
from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
def reader():
data = rsock.recv(100)
print("Received:", data.decode())
# We are done: unregister the file descriptor
loop.remove_reader(rsock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(rsock, reader)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
rsock.close()
wsock.close()
loop.close()
为 SIGINT 和 SIGTERM 设置信号处理程序
(此 signals
示例仅适用于 Unix。)
使用 loop.add_signal_handler() 方法为信号 SIGINT
和 SIGTERM
注册处理程序:
import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())