18.5.4. 传输和协议(基于回调的 API) — Python 文档
18.5.4. 传输和协议(基于回调的 API)
源代码: :source:`Lib/asyncio/transsports.py`
源代码: :source:`Lib/asyncio/protocols.py`
18.5.4.1. 运输
传输是 asyncio 提供的类,用于抽象各种通信通道。 您通常不会自己实例化传输; 相反,您将调用 AbstractEventLoop 方法,该方法将创建传输并尝试启动底层通信通道,并在成功时回调您。
一旦建立了通信通道,传输总是与 协议 实例配对。 然后,协议可以出于各种目的调用传输的方法。
asyncio 目前为 TCP、UDP、SSL 和子进程管道实现传输。 传输上可用的方法取决于传输的种类。
传输类 不是线程安全的 。
3.6 版更改: 插座选项 TCP_NODELAY
现在默认设置。
18.5.4.1.1。 基础传输
- class asyncio.BaseTransport
传输的基类。
- close()
关闭运输。 如果传输具有用于传出数据的缓冲区,则将异步刷新缓冲数据。 将不再接收数据。 刷新所有缓冲数据后,将使用 None 作为参数调用协议的
connection_lost()
方法。
- is_closing()
如果传输正在关闭或已关闭,则返回
True
。3.5.1 版中的新功能。
- get_extra_info(name, default=None)
返回可选的运输信息。 name 是一个字符串,表示要获取的传输特定信息,default 是如果信息不存在则返回的值。
此方法允许传输实现轻松公开特定于通道的信息。
插座:
'peername'
:socket连接的远程地址,socket.socket.getpeername()的结果(None
出错)'socket'
: socket.socket 实例'sockname'
:socket自己的地址,socket.socket.getsockname()的结果
SSL 套接字:
'compression'
:作为字符串使用的压缩算法,如果连接未压缩,则为None
; ssl.SSLSocket.compression() 的结果'cipher'
:一个三值元组,包含所用密码的名称、定义其用途的 SSL 协议版本以及所使用的秘密位数; ssl.SSLSocket.cipher() 的结果'peercert'
:对等证书; ssl.SSLSocket.getpeercert() 的结果'sslcontext'
:ssl.SSLContext 实例'ssl_object'
: ssl.SSLObject 或 ssl.SSLSocket 实例
管道:
'pipe'
:管道对象
子流程:
'subprocess'
: subprocess.Popen 实例
- set_protocol(protocol)
设置新协议。 仅当两种协议都记录为支持切换时,才应执行切换协议。
3.5.3 版中的新功能。
- get_protocol()
返回当前协议。
3.5.3 版中的新功能。
在 3.5.1 版更改:
'ssl_object'
信息已添加到 SSL 套接字。
18.5.4.1.2. 读取传输
- class asyncio.ReadTransport
只读传输接口。
- pause_reading()
暂停传输的接收端。 在调用 resume_reading() 之前,不会将数据传递给协议的
data_received()
方法。3.6.7版本变化:方法是幂等的,即 当传输已经暂停或关闭时可以调用它。
- resume_reading()
恢复接收端。 如果有一些数据可供读取,协议的
data_received()
方法将被再次调用。3.6.7版本变化:方法是幂等的,即 当传输已经读取时可以调用它。
18.5.4.1.3。 写传输
- class asyncio.WriteTransport
只写传输接口。
- abort()
立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的
connection_lost()
方法最终会以 None 作为参数被调用。
- can_write_eof()
如果传输支持 write_eof(),则返回 True,否则返回 False。
- get_write_buffer_size()
返回传输使用的输出缓冲区的当前大小。
- get_write_buffer_limits()
获取写入流量控制的 high- 和 low-water 限制。 返回一个元组
(low, high)
,其中 low 和 high 是正字节数。使用 set_write_buffer_limits() 设置限制。
版本 3.4.2 中的新功能。
- set_write_buffer_limits(high=None, low=None)
为写流量控制设置 high- 和 low-water 限制。
这两个值(以字节数衡量)控制何时调用协议的
pause_writing()
和resume_writing()
方法。 如果指定,低水位限制必须小于或等于高水位限制。 high 和 low 都不能为负数。当缓冲区大小大于或等于 high 值时调用
pause_writing()
。 如果写入已暂停,则在缓冲区大小小于或等于 low 值时调用resume_writing()
。默认值是特定于实现的。 如果仅给出高水位限制,则低水位限制默认为小于或等于高水位限制的特定于实现的值。 将 high 设置为零也会强制 low 为零,并导致每当缓冲区变为非空时调用
pause_writing()
。 将 low 设置为零会导致仅在缓冲区为空时调用resume_writing()
。 对任一限制使用零通常是次优的,因为它减少了同时进行 I/O 和计算的机会。使用 get_write_buffer_limits() 获取限制。
- write(data)
将一些 data 字节写入传输。
这个方法不会阻塞; 它缓冲数据并安排它异步发送。
- writelines(list_of_data)
将数据字节的列表(或任何可迭代的)写入传输。 这在功能上等同于对迭代产生的每个元素调用 write(),但可以更有效地实现。
- write_eof()
刷新缓冲数据后关闭传输的写入端。 可能仍会收到数据。
如果传输(例如 SSL) 不支持半关闭。
18.5.4.1.4。 数据报传输
- DatagramTransport.sendto(data, addr=None)
将 data 字节发送到由 addr(依赖于传输的目标地址)给出的远程对等方。 如果 addr 是 None,则数据将发送到传输创建时给定的目标地址。
这个方法不会阻塞; 它缓冲数据并安排它异步发送。
- DatagramTransport.abort()
- 立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的
connection_lost()
方法最终会以 None 作为参数被调用。
18.5.4.1.5。 基子进程传输
- class asyncio.BaseSubprocessTransport
- get_pid()
以整数形式返回子进程进程 ID。
- get_pipe_transport(fd)
返回与整数文件描述符 fd 对应的通信管道的传输:
- get_returncode()
将子进程返回码作为整数返回,如果没有返回,则返回 None,类似于 subprocess.Popen.returncode 属性。
- kill()
杀死子进程,如 subprocess.Popen.kill()。
在 POSIX 系统上,该函数将 SIGKILL 发送到子进程。 在 Windows 上,此方法是 terminate() 的别名。
- send_signal(signal)
将 信号 编号发送到子进程,如 subprocess.Popen.send_signal()。
- terminate()
要求子进程停止,如 subprocess.Popen.terminate()。 此方法是 close() 方法的别名。
在 POSIX 系统上,此方法将 SIGTERM 发送到子进程。 在 Windows 上,调用 Windows API 函数 TerminateProcess() 来停止子进程。
- close()
如果子进程尚未返回,则通过调用 terminate() 方法要求子进程停止,并关闭所有管道的传输(stdin、stdout 和stderr)。
18.5.4.2. 协议
asyncio 提供了基类,您可以将其子类化以实现您的网络协议。 这些类与 transsports(见下文)结合使用:协议解析传入数据并要求写入传出数据,而传输负责实际的 I/O 和缓冲。
在子类化协议类时,建议您覆盖某些方法。 这些方法是回调:它们将在某些事件上被传输调用(例如,当接收到一些数据时); 你不应该自己打电话给他们,除非你正在实施运输。
笔记
所有回调都有默认实现,它们是空的。 因此,您只需为您感兴趣的事件实现回调。
18.5.4.2.1。 协议类
- class asyncio.Protocol
- 用于实现流协议的基类(用于例如 TCP 和 SSL 传输)。
- class asyncio.DatagramProtocol
- 实现数据报协议的基类(用于例如 UDP 传输)。
- class asyncio.SubprocessProtocol
- 用于实现与子进程通信的协议的基类(通过一组单向管道)。
18.5.4.2.2. 连接回调
这些回调可以在 Protocol、DatagramProtocol 和 SubprocessProtocol 实例上调用:
- BaseProtocol.connection_made(transport)
建立连接时调用。
transport 参数是表示连接的传输。 您有责任将其存储在某处(例如 作为一个属性)如果你需要。
- BaseProtocol.connection_lost(exc)
当连接丢失或关闭时调用。
参数是异常对象或 None。 后者意味着接收到常规 EOF,或者连接被连接的这一侧中止或关闭。
connection_made() 和 connection_lost() 每次成功连接只调用一次。 所有其他回调将在这两个方法之间调用,这允许在您的协议实现中更轻松地管理资源。
以下回调只能在 SubprocessProtocol 实例上调用:
- SubprocessProtocol.pipe_data_received(fd, data)
- 当子进程将数据写入其 stdout 或 stderr 管道时调用。 fd 是管道的整数文件描述符。 data 是一个包含数据的非空字节对象。
- SubprocessProtocol.pipe_connection_lost(fd, exc)
- 当与子进程通信的管道之一关闭时调用。 fd 是已关闭的整数文件描述符。
- SubprocessProtocol.process_exited()
- 当子进程退出时调用。
18.5.4.2.3。 流媒体协议
在 Protocol 实例上调用以下回调:
- Protocol.data_received(data)
当接收到一些数据时调用。 data 是一个包含传入数据的非空字节对象。
笔记
数据是缓冲、分块还是重组取决于传输。 一般来说,您不应该依赖特定的语义,而应该让您的解析足够通用和灵活。 但是,数据总是以正确的顺序接收。
- Protocol.eof_received()
当另一端发出不再发送任何数据的信号时调用(例如通过调用
write_eof()
,如果另一端也使用 asyncio)。此方法可能会返回一个错误值(包括
None
),在这种情况下传输将自行关闭。 相反,如果此方法返回真值,则关闭传输取决于协议。 由于默认实现返回None
,它隐式关闭连接。笔记
某些传输(例如 SSL)不支持半关闭连接,在这种情况下,从此方法返回 true 不会阻止关闭连接。
data_received()
在连接期间可以被调用任意次数。 然而,eof_received()
最多被调用一次,如果被调用,data_received()
将不会在它之后被调用。
状态机:
开始 -> connection_made() [-> data_received() *] [-> eof_received() ?] -> connection_lost()[X123X ] -> 结束
18.5.4.2.4。 数据报协议
在 DatagramProtocol 实例上调用以下回调。
- DatagramProtocol.datagram_received(data, addr)
- 收到数据报时调用。 data 是一个包含传入数据的字节对象。 addr为发送数据的peer地址; 确切的格式取决于传输。
- DatagramProtocol.error_received(exc)
当先前的发送或接收操作引发 OSError 时调用。 exc 是 OSError 实例。
在极少数情况下调用此方法,当传输(例如 UDP) 检测到数据报无法传送到其接收者。 但在许多情况下,无法传递的数据报将被悄悄丢弃。
18.5.4.2.5。 流控回调
这些回调可以在 Protocol、DatagramProtocol 和 SubprocessProtocol 实例上调用:
- BaseProtocol.pause_writing()
- 当传输的缓冲区超过高水位线时调用。
- BaseProtocol.resume_writing()
- 当传输的缓冲区排放到低水位线以下时调用。
pause_writing()
和 resume_writing()
调用是成对的——当缓冲区严格超过高水位线时,pause_writing()
被调用一次(即使随后的写入增加了缓冲区大小),当缓冲区大小达到低水位线时,最终会调用一次 resume_writing()
。
笔记
如果缓冲区大小等于高水位线,则不调用 pause_writing()
- 它必须严格结束。 相反,当缓冲区大小等于或低于低水位线时,会调用 resume_writing()
。 当任一标记为零时,这些结束条件对于确保事情按预期进行非常重要。
笔记
在 BSD 系统(OS X、FreeBSD 等)上,DatagramProtocol 不支持流量控制,因为无法轻易检测到写入过多数据包导致的发送失败。 套接字始终显示为“就绪”,多余的数据包将被丢弃; OSError errno 设置为 errno.ENOBUFS 可能会也可能不会引发; 如果它被引发,它将被报告给 DatagramProtocol.error_received() 但否则会被忽略。
18.5.4.2.6。 协程和协议
可以使用 ensure_future() 在协议方法中调度协程,但不能保证执行顺序。 协议不知道在协议方法中创建的协程,因此不会等待它们。
要获得可靠的执行顺序,请在具有 yield from
的协程中使用 流对象 。 例如,StreamWriter.drain()
协程可用于等待直到写入缓冲区被刷新。
18.5.4.3. 协议示例
18.5.4.3.1。 TCP 回显客户端协议
TCP echo客户端使用AbstractEventLoop.create_connection()
方法,发送数据并等待连接关闭:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
'127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
事件循环运行两次。 run_until_complete() 方法在这个简短的例子中是首选,如果服务器没有侦听,则引发异常,而不是必须编写一个简短的协程来处理异常并停止运行循环。 在 run_until_complete() 退出时,循环不再运行,因此发生错误时无需停止循环。
18.5.4.3.2. TCP 回显服务器协议
TCP回显服务器使用AbstractEventLoop.create_server()
方法,发回接收到的数据并关闭连接:
import asyncio
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Transport.close()
可以在 WriteTransport.write() 之后立即调用,即使数据尚未在套接字上发送:这两种方法都是异步的。 yield from
不需要,因为这些传输方法不是协程。
18.5.4.3.3。 UDP 回显客户端协议
UDP 回显客户端使用 AbstractEventLoop.create_datagram_endpoint()
方法,当我们收到答案时发送数据并关闭传输:
import asyncio
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
loop = asyncio.get_event_loop()
loop.stop()
loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, loop),
remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()
18.5.4.3.4。 UDP 回显服务器协议
UDP echo server使用AbstractEventLoop.create_datagram_endpoint()
方法,发回接收到的数据:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
18.5.4.3.5。 使用协议注册一个打开的套接字以等待数据
等待socket通过协议使用AbstractEventLoop.create_connection()
方法接收到数据,然后关闭事件循环
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
class MyProtocol(asyncio.Protocol):
transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport (it will call connection_lost())
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed, stop the event loop
loop.stop()
# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Run the event loop
loop.run_forever()
# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()
也可以看看
watch a file descriptor for read events 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。
注册一个打开的套接字以使用流等待数据 示例使用由协程中的 open_connection()
函数创建的高级流。