传输和协议 — Python 文档
传输和协议
前言
传输和协议由 低级 事件循环 API 使用,例如 loop.create_connection()
。 它们使用基于回调的编程风格并支持网络或 IPC 协议的高性能实现(例如 HTTP)。
本质上,传输和协议应该只在库和框架中使用,而不要在高级异步应用程序中使用。
介绍
在最高级别,传输关注 如何 字节传输,而协议确定 传输哪些 字节(以及在某种程度上何时传输)。
说同一件事的另一种方式:从传输的角度来看,传输是套接字(或类似的 I/O 端点)的抽象,而协议是应用程序的抽象。
另一个观点是传输接口和协议接口一起定义了一个抽象接口,用于使用网络 I/O 和进程间 I/O。
传输和协议对象之间始终存在 1:1 的关系:协议调用传输方法发送数据,而传输调用协议方法将接收到的数据传递给它。
大多数面向连接的事件循环方法(例如 loop.create_connection()
)通常接受一个 protocol_factory 参数,用于为已接受的连接创建 Protocol 对象,由 表示]传输对象。 此类方法通常返回 (transport, protocol)
的元组。
内容
此文档页面包含以下部分:
- Transports 部分记录了 asyncio BaseTransport、ReadTransport、WriteTransport、Transport、X1Datagram4X] , 和 SubprocessTransport 类。
- Protocols 部分记录了 asyncio BaseProtocol、Protocol、BufferedProtocol、DatagramProtocol[X1384X] 和 [X1384X] 和 [X1384X] ] 类。
- 示例 部分展示了如何使用传输、协议和低级事件循环 API。
运输
源代码: :source:`Lib/asyncio/transsports.py`
传输是 asyncio 提供的类,用于抽象各种通信通道。
传输对象总是由 异步事件循环 实例化。
asyncio 为 TCP、UDP、SSL 和子进程管道实现传输。 传输上可用的方法取决于传输的种类。
传输类 不是线程安全的 。
传输层次结构
- class asyncio.BaseTransport
- 所有传输的基类。 包含所有 asyncio 传输共享的方法。
- class asyncio.WriteTransport(BaseTransport)
只写连接的基本传输。
WriteTransport 类的实例从
loop.connect_write_pipe()
事件循环方法返回,并且也被子进程相关方法使用,如loop.subprocess_exec()
。
- class asyncio.ReadTransport(BaseTransport)
只读连接的基本传输。
ReadTransport 类的实例从
loop.connect_read_pipe()
事件循环方法返回,也被子进程相关方法使用,如loop.subprocess_exec()
。
- class asyncio.Transport(WriteTransport, ReadTransport)
表示双向传输的接口,例如 TCP 连接。
用户不直接实例化传输; 他们调用一个实用函数,将一个协议工厂和其他创建传输和协议所需的信息传递给它。
Transport 类的实例由
loop.create_connection()
、loop.create_unix_connection()
、loop.create_server()
、loop.sendfile()
等事件循环方法返回或使用.
- class asyncio.DatagramTransport(BaseTransport)
数据报 (UDP) 连接的传输。
DatagramTransport 类的实例从
loop.create_datagram_endpoint()
事件循环方法返回。
- class asyncio.SubprocessTransport(BaseTransport)
表示父操作系统与其子操作系统进程之间的连接的抽象。
SubprocessTransport 类的实例从事件循环方法
loop.subprocess_shell()
和loop.subprocess_exec()
返回。
基地运输
- BaseTransport.close()
关闭运输。
如果传输具有用于传出数据的缓冲区,则将异步刷新缓冲数据。 将不再接收数据。 刷新所有缓冲数据后,将使用 None 作为参数调用协议的 protocol.connection_lost() 方法。
- BaseTransport.is_closing()
- 如果传输正在关闭或已关闭,则返回
True
。
- BaseTransport.get_extra_info(name, default=None)
返回有关它使用的传输或底层资源的信息。
name 是一个字符串,表示要获取的传输特定信息。
default 是在信息不可用时返回的值,或者如果传输不支持使用给定的第三方事件循环实现或在当前平台上查询它。
例如,以下代码尝试获取传输的底层套接字对象:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
可以在某些传输上查询的信息类别:
插座:
'peername'
:socket连接的远程地址,socket.socket.getpeername()的结果(None
出错)'socket'
: socket.socket 实例'sockname'
:socket自己的地址,socket.socket.getsockname()的结果
SSL 套接字:
'compression'
:作为字符串使用的压缩算法,如果连接未压缩,则为None
; ssl.SSLSocket.compression() 的结果'cipher'
:一个三值元组,包含所用密码的名称、定义其用途的 SSL 协议版本以及所使用的秘密位数; ssl.SSLSocket.cipher() 的结果'peercert'
:对等证书; ssl.SSLSocket.getpeercert() 的结果'sslcontext'
:ssl.SSLContext 实例'ssl_object'
: ssl.SSLObject 或 ssl.SSLSocket 实例
管道:
'pipe'
:管道对象
子流程:
'subprocess'
: subprocess.Popen 实例
- BaseTransport.set_protocol(protocol)
设置新协议。
仅当两种协议都记录为支持切换时,才应执行切换协议。
- BaseTransport.get_protocol()
- 返回当前协议。
只读传输
- ReadTransport.is_reading()
如果传输正在接收新数据,则返回
True
。3.7 版中的新功能。
- ReadTransport.pause_reading()
暂停传输的接收端。 在调用 resume_reading() 之前,不会将数据传递给协议的 protocol.data_received() 方法。
3.7版本变化:方法是幂等的,即 当传输已经暂停或关闭时可以调用它。
- ReadTransport.resume_reading()
恢复接收端。 如果某些数据可供读取,协议的 protocol.data_received() 方法将再次调用。
3.7版本变化:方法是幂等的,即 当传输已经读取时可以调用它。
只写传输
- WriteTransport.abort()
- 立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的 protocol.connection_lost() 方法最终将使用 None 作为参数调用。
- WriteTransport.can_write_eof()
- 如果传输支持 write_eof(),则返回 True,否则返回 False。
- WriteTransport.get_write_buffer_size()
- 返回传输使用的输出缓冲区的当前大小。
- WriteTransport.get_write_buffer_limits()
获取 high 和 low 水印用于写流控制。 返回一个元组
(low, high)
,其中 low 和 high 是正字节数。使用 set_write_buffer_limits() 设置限制。
版本 3.4.2 中的新功能。
- WriteTransport.set_write_buffer_limits(high=None, low=None)
设置 high 和 low 水印用于写流控制。
这两个值(以字节数衡量)控制何时调用协议的 protocol.pause_writing() 和 protocol.resume_writing() 方法。 如果指定,低水印必须小于或等于高水印。 high 和 low 都不能为负数。
pause_writing() 当缓冲区大小变得大于或等于 high 值时调用。 如果写入已暂停,则在缓冲区大小小于或等于 low 值时调用 resume_writing()。
默认值是特定于实现的。 如果仅给出高水印,则低水印默认为小于或等于高水印的特定于实现的值。 将 high 设置为零也会强制 low 为零,并导致在缓冲区变为非空时调用 pause_writing()。 将 low 设置为零会导致 resume_writing() 仅在缓冲区为空时调用。 对任一限制使用零通常是次优的,因为它减少了同时进行 I/O 和计算的机会。
使用 get_write_buffer_limits() 获取限制。
- WriteTransport.write(data)
将一些 data 字节写入传输。
这个方法不会阻塞; 它缓冲数据并安排它异步发送。
- WriteTransport.writelines(list_of_data)
- 将数据字节的列表(或任何可迭代的)写入传输。 这在功能上等同于对迭代产生的每个元素调用 write(),但可以更有效地实现。
- WriteTransport.write_eof()
刷新所有缓冲数据后关闭传输的写入端。 可能仍会收到数据。
如果传输(例如 SSL) 不支持半封闭连接。
数据报传输
- DatagramTransport.sendto(data, addr=None)
将 data 字节发送到由 addr(依赖于传输的目标地址)给出的远程对等方。 如果 addr 是 None,则数据将发送到传输创建时给定的目标地址。
这个方法不会阻塞; 它缓冲数据并安排它异步发送。
- DatagramTransport.abort()
- 立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的 protocol.connection_lost() 方法最终将使用 None 作为参数调用。
子流程传输
- SubprocessTransport.get_pid()
- 以整数形式返回子进程进程 ID。
- SubprocessTransport.get_pipe_transport(fd)
- 返回与整数文件描述符 fd 对应的通信管道的传输:
- SubprocessTransport.get_returncode()
- 将子进程返回码作为整数返回,如果没有返回,则返回 None,类似于 subprocess.Popen.returncode 属性。
- SubprocessTransport.kill()
杀死子进程。
在 POSIX 系统上,该函数将 SIGKILL 发送到子进程。 在 Windows 上,此方法是 terminate() 的别名。
- SubprocessTransport.send_signal(signal)
- 将 信号 编号发送到子进程,如 subprocess.Popen.send_signal()。
- SubprocessTransport.terminate()
停止子进程。
在 POSIX 系统上,此方法将 SIGTERM 发送到子进程。 在 Windows 上,调用 Windows API 函数 TerminateProcess() 来停止子进程。
- SubprocessTransport.close()
通过调用 kill() 方法终止子进程。
如果子进程尚未返回,则关闭 stdin、stdout 和 stderr 管道的传输。
协议
源代码: :source:`Lib/asyncio/protocols.py`
asyncio 提供了一组用于实现网络协议的抽象基类。 这些类旨在与 传输 一起使用。
抽象基协议类的子类可以实现一些或所有方法。 所有这些方法都是回调:它们在某些事件上被传输调用,例如当接收到一些数据时。 相应的传输应调用基本协议方法。
基本协议
- class asyncio.BaseProtocol
- 具有所有协议共享的方法的基本协议。
- class asyncio.Protocol(BaseProtocol)
- 用于实现流协议(TCP、Unix 套接字等)的基类。
- class asyncio.BufferedProtocol(BaseProtocol)
- 用于通过手动控制接收缓冲区来实现流协议的基类。
- class asyncio.DatagramProtocol(BaseProtocol)
- 用于实现数据报 (UDP) 协议的基类。
- class asyncio.SubprocessProtocol(BaseProtocol)
- 用于实现与子进程(单向管道)通信的协议的基类。
基础协议
所有 asyncio 协议都可以实现 Base Protocol 回调。
连接回调
对所有协议调用连接回调,每次成功连接仅调用一次。 所有其他协议回调只能在这两种方法之间调用。
- BaseProtocol.connection_made(transport)
建立连接时调用。
transport 参数是表示连接的传输。 该协议负责存储对其传输的引用。
- BaseProtocol.connection_lost(exc)
当连接丢失或关闭时调用。
参数是异常对象或 None。 后者意味着接收到常规 EOF,或者连接被连接的这一侧中止或关闭。
流控制回调
传输可以调用流控制回调来暂停或恢复协议执行的写入。
有关更多详细信息,请参阅 set_write_buffer_limits() 方法的文档。
- BaseProtocol.pause_writing()
- 当传输的缓冲区超过高水位线时调用。
- BaseProtocol.resume_writing()
- 当传输的缓冲区耗尽低于低水位线时调用。
如果缓冲区大小等于高水印,则不调用 pause_writing():缓冲区大小必须严格超过。
相反,当缓冲区大小等于或小于低水印时,会调用 resume_writing()。 当任一标记为零时,这些结束条件对于确保事情按预期进行非常重要。
流媒体协议
事件方法,例如 loop.create_server()
、loop.create_unix_server()
、loop.create_connection()
、loop.create_unix_connection()
、loop.connect_accepted_socket()
、loop.connect_read_pipe()
和 loop.connect_write_pipe()
接受返回流协议的工厂。
- Protocol.data_received(data)
当接收到一些数据时调用。 data 是一个包含传入数据的非空字节对象。
数据是缓冲、分块还是重组取决于传输。 通常,您不应依赖特定的语义,而应使解析通用且灵活。 但是,数据总是以正确的顺序接收。
该方法可以在连接打开时调用任意次数。
然而, protocol.eof_received() 最多被调用一次。 一旦 eof_received() 被调用,
data_received()
就不再被调用。
- Protocol.eof_received()
当另一端发出信号不会再发送任何数据时调用(例如通过调用 transport.write_eof(),如果另一端也使用 asyncio)。
此方法可能会返回一个错误值(包括
None
),在这种情况下传输将自行关闭。 相反,如果此方法返回真值,则使用的协议确定是否关闭传输。 由于默认实现返回None
,它隐式关闭连接。某些传输(包括 SSL)不支持半关闭连接,在这种情况下,从此方法返回 true 将导致连接被关闭。
状态机:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
缓冲流协议
3.7 版的新功能:重要提示: 这已临时添加到 Python 3.7 中的 asyncio! 这是一个实验性的 API,可能会在 Python 3.8 中完全更改或删除。
缓冲协议可以与任何支持 流协议 的事件循环方法一起使用。
BufferedProtocol
实现允许显式手动分配和控制接收缓冲区。 然后事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。 这可以显着提高接收大量数据的协议的性能。 复杂的协议实现可以显着减少缓冲区分配的数量。
在 BufferedProtocol 实例上调用以下回调:
- BufferedProtocol.get_buffer(sizehint)
调用以分配新的接收缓冲区。
sizehint 是推荐的返回缓冲区的最小大小。 返回比 sizehint 建议的更小或更大的缓冲区是可以接受的。 当设置为 -1 时,缓冲区大小可以是任意的。 返回大小为零的缓冲区是错误的。
get_buffer()
必须返回一个实现 缓冲协议 的对象。
- BufferedProtocol.buffer_updated(nbytes)
在使用接收到的数据更新缓冲区时调用。
nbytes 是写入缓冲区的总字节数。
- BufferedProtocol.eof_received()
- 请参阅 protocol.eof_received() 方法的文档。
get_buffer() 在连接期间可以被调用任意次数。 然而, protocol.eof_received() 最多被调用一次,如果被调用,get_buffer() 和 buffer_updated() 将不会被调用。
状态机:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
数据报协议
数据报协议实例应该由传递给 loop.create_datagram_endpoint()
方法的协议工厂构造。
- DatagramProtocol.datagram_received(data, addr)
- 收到数据报时调用。 data 是一个包含传入数据的字节对象。 addr为发送数据的peer地址; 确切的格式取决于传输。
- DatagramProtocol.error_received(exc)
当先前的发送或接收操作引发 OSError 时调用。 exc 是 OSError 实例。
在极少数情况下调用此方法,当传输(例如 UDP) 检测到数据报无法传送到其接收者。 但在许多情况下,无法传递的数据报将被悄悄丢弃。
笔记
在 BSD 系统(macOS、FreeBSD 等)上,数据报协议不支持流量控制,因为没有可靠的方法来检测由于写入过多数据包而导致的发送失败。
套接字始终显示为“就绪”,多余的数据包将被丢弃。 OSError errno
设置为 errno.ENOBUFS 可能会也可能不会引发; 如果它被引发,它将被报告给 DatagramProtocol.error_received() 但否则会被忽略。
子进程协议
数据报协议实例应该由传递给 loop.subprocess_exec()
和 loop.subprocess_shell()
方法的协议工厂构造。
- SubprocessProtocol.pipe_data_received(fd, data)
当子进程将数据写入其 stdout 或 stderr 管道时调用。
fd 是管道的整数文件描述符。
data 是一个包含接收数据的非空字节对象。
- SubprocessProtocol.pipe_connection_lost(fd, exc)
当与子进程通信的管道之一关闭时调用。
fd 是已关闭的整数文件描述符。
- SubprocessProtocol.process_exited()
- 当子进程退出时调用。
例子
TCP 回声服务器
使用 loop.create_server()
方法创建一个 TCP 回显服务器,发送回接收到的数据,并关闭连接:
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
TCP 回显客户端
使用 loop.create_connection()
方法的 TCP 回显客户端发送数据,并等待连接关闭:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
UDP 回波服务器
UDP 回显服务器使用 loop.create_datagram_endpoint()
方法发回接收到的数据:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDP 回声客户端
UDP 回显客户端使用 loop.create_datagram_endpoint()
方法发送数据并在收到答复时关闭传输:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
连接现有的套接字
等待套接字使用带有协议的 loop.create_connection()
方法接收数据:
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
也可以看看
watch a file descriptor for read events 示例使用低级 loop.add_reader() 方法注册 FD。
注册一个打开的套接字以使用流等待数据 示例使用由 open_connection()
函数在协程中创建的高级流。
loop.subprocess_exec() 和 SubprocessProtocol
用于获取子进程输出并等待子进程退出的子进程协议示例。
子进程由 loop.subprocess_exec()
方法创建:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
另请参阅使用高级 API 编写的 相同示例 。