18.5.4. 传输和协议(基于回调的 API) — Python 文档

来自菜鸟教程
Python/docs/3.6/library/asyncio-protocol
跳转至:导航、​搜索

18.5.4. 传输和协议(基于回调的 API)

源代码: :source:`Lib/asyncio/transsports.py`

源代码: :source:`Lib/asyncio/protocols.py`

18.5.4.1. 运输

传输是 asyncio 提供的类,用于抽象各种通信通道。 您通常不会自己实例化传输; 相反,您将调用 AbstractEventLoop 方法,该方法将创建传输并尝试启动底层通信通道,并在成功时回调您。

一旦建立了通信通道,传输总是与 协议 实例配对。 然后,协议可以出于各种目的调用传输的方法。

asyncio 目前为 TCP、UDP、SSL 和子进程管道实现传输。 传输上可用的方法取决于传输的种类。

传输类 不是线程安全的

3.6 版更改: 插座选项 TCP_NODELAY 现在默认设置。


18.5.4.1.1。 基础传输

class asyncio.BaseTransport

传输的基类。

close()

关闭运输。 如果传输具有用于传出数据的缓冲区,则将异步刷新缓冲数据。 将不再接收数据。 刷新所有缓冲数据后,将使用 None 作为参数调用协议的 connection_lost() 方法。

is_closing()

如果传输正在关闭或已关闭,则返回 True

3.5.1 版中的新功能。

get_extra_info(name, default=None)

返回可选的运输信息。 name 是一个字符串,表示要获取的传输特定信息,default 是如果信息不存在则返回的值。

此方法允许传输实现轻松公开特定于通道的信息。

set_protocol(protocol)

设置新协议。 仅当两种协议都记录为支持切换时,才应执行切换协议。

3.5.3 版中的新功能。

get_protocol()

返回当前协议。

3.5.3 版中的新功能。

在 3.5.1 版更改:'ssl_object' 信息已添加到 SSL 套接字。


18.5.4.1.2. 读取传输

class asyncio.ReadTransport

只读传输接口。

pause_reading()

暂停传输的接收端。 在调用 resume_reading() 之前,不会将数据传递给协议的 data_received() 方法。

3.6.7版本变化:方法是幂等的,即 当传输已经暂停或关闭时可以调用它。

resume_reading()

恢复接收端。 如果有一些数据可供读取,协议的 data_received() 方法将被再次调用。

3.6.7版本变化:方法是幂等的,即 当传输已经读取时可以调用它。


18.5.4.1.3。 写传输

class asyncio.WriteTransport

只写传输接口。

abort()

立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的 connection_lost() 方法最终会以 None 作为参数被调用。

can_write_eof()

如果传输支持 write_eof(),则返回 True,否则返回 False

get_write_buffer_size()

返回传输使用的输出缓冲区的当前大小。

get_write_buffer_limits()

获取写入流量控制的 high- 和 low-water 限制。 返回一个元组 (low, high),其中 lowhigh 是正字节数。

使用 set_write_buffer_limits() 设置限制。

版本 3.4.2 中的新功能。

set_write_buffer_limits(high=None, low=None)

为写流量控制设置 high- 和 low-water 限制。

这两个值(以字节数衡量)控制何时调用协议的 pause_writing()resume_writing() 方法。 如果指定,低水位限制必须小于或等于高水位限制。 highlow 都不能为负数。

当缓冲区大小大于或等于 high 值时调用 pause_writing()。 如果写入已暂停,则在缓冲区大小小于或等于 low 值时调用 resume_writing()

默认值是特定于实现的。 如果仅给出高水位限制,则低水位限制默认为小于或等于高水位限制的特定于实现的值。 将 high 设置为零也会强制 low 为零,并导致每当缓冲区变为非空时调用 pause_writing()。 将 low 设置为零会导致仅在缓冲区为空时调用 resume_writing()。 对任一限制使用零通常是次优的,因为它减少了同时进行 I/O 和计算的机会。

使用 get_write_buffer_limits() 获取限制。

write(data)

将一些 data 字节写入传输。

这个方法不会阻塞; 它缓冲数据并安排它异步发送。

writelines(list_of_data)

将数据字节的列表(或任何可迭代的)写入传输。 这在功能上等同于对迭代产生的每个元素调用 write(),但可以更有效地实现。

write_eof()

刷新缓冲数据后关闭传输的写入端。 可能仍会收到数据。

如果传输(例如 SSL) 不支持半关闭。


18.5.4.1.4。 数据报传输

DatagramTransport.sendto(data, addr=None)

data 字节发送到由 addr(依赖于传输的目标地址)给出的远程对等方。 如果 addrNone,则数据将发送到传输创建时给定的目标地址。

这个方法不会阻塞; 它缓冲数据并安排它异步发送。

DatagramTransport.abort()
立即关闭传输,无需等待挂起的操作完成。 缓冲的数据将丢失。 将不再接收数据。 协议的 connection_lost() 方法最终会以 None 作为参数被调用。


18.5.4.1.5。 基子进程传输

class asyncio.BaseSubprocessTransport
get_pid()

以整数形式返回子进程进程 ID。

get_pipe_transport(fd)

返回与整数文件描述符 fd 对应的通信管道的传输:

  • 0:标准输入的可读流传输 (stdin),或者 None 如果子进程不是用 stdin=PIPE 创建的

  • 1:标准输出的可写流传输 (stdout),或者 None 如果子进程不是用 stdout=PIPE 创建的

  • 2:标准错误的可写流传输 (stderr),或者 None 如果子进程不是用 stderr=PIPE 创建的

  • 其他 fd:

get_returncode()

将子进程返回码作为整数返回,如果没有返回,则返回 None,类似于 subprocess.Popen.returncode 属性。

kill()

杀死子进程,如 subprocess.Popen.kill()

在 POSIX 系统上,该函数将 SIGKILL 发送到子进程。 在 Windows 上,此方法是 terminate() 的别名。

send_signal(signal)

信号 编号发送到子进程,如 subprocess.Popen.send_signal()

terminate()

要求子进程停止,如 subprocess.Popen.terminate()。 此方法是 close() 方法的别名。

在 POSIX 系统上,此方法将 SIGTERM 发送到子进程。 在 Windows 上,调用 Windows API 函数 TerminateProcess() 来停止子进程。

close()

如果子进程尚未返回,则通过调用 terminate() 方法要求子进程停止,并关闭所有管道的传输(stdinstdoutstderr)。


18.5.4.2. 协议

asyncio 提供了基类,您可以将其子类化以实现您的网络协议。 这些类与 transsports(见下文)结合使用:协议解析传入数据并要求写入传出数据,而传输负责实际的 I/O 和缓冲。

在子类化协议类时,建议您覆盖某些方法。 这些方法是回调:它们将在某些事件上被传输调用(例如,当接收到一些数据时); 你不应该自己打电话给他们,除非你正在实施运输。

笔记

所有回调都有默认实现,它们是空的。 因此,您只需为您感兴趣的事件实现回调。


18.5.4.2.1。 协议类

class asyncio.Protocol
用于实现流协议的基类(用于例如 TCP 和 SSL 传输)。
class asyncio.DatagramProtocol
实现数据报协议的基类(用于例如 UDP 传输)。
class asyncio.SubprocessProtocol
用于实现与子进程通信的协议的基类(通过一组单向管道)。


18.5.4.2.2. 连接回调

这些回调可以在 ProtocolDatagramProtocolSubprocessProtocol 实例上调用:

BaseProtocol.connection_made(transport)

建立连接时调用。

transport 参数是表示连接的传输。 您有责任将其存储在某处(例如 作为一个属性)如果你需要。

BaseProtocol.connection_lost(exc)

当连接丢失或关闭时调用。

参数是异常对象或 None。 后者意味着接收到常规 EOF,或者连接被连接的这一侧中止或关闭。

connection_made()connection_lost() 每次成功连接只调用一次。 所有其他回调将在这两个方法之间调用,这允许在您的协议实现中更轻松地管理资源。

以下回调只能在 SubprocessProtocol 实例上调用:

SubprocessProtocol.pipe_data_received(fd, data)
当子进程将数据写入其 stdout 或 stderr 管道时调用。 fd 是管道的整数文件描述符。 data 是一个包含数据的非空字节对象。
SubprocessProtocol.pipe_connection_lost(fd, exc)
当与子进程通信的管道之一关闭时调用。 fd 是已关闭的整数文件描述符。
SubprocessProtocol.process_exited()
当子进程退出时调用。


18.5.4.2.3。 流媒体协议

Protocol 实例上调用以下回调:

Protocol.data_received(data)

当接收到一些数据时调用。 data 是一个包含传入数据的非空字节对象。

笔记

数据是缓冲、分块还是重组取决于传输。 一般来说,您不应该依赖特定的语义,而应该让您的解析足够通用和灵活。 但是,数据总是以正确的顺序接收。

Protocol.eof_received()

当另一端发出不再发送任何数据的信号时调用(例如通过调用 write_eof(),如果另一端也使用 asyncio)。

此方法可能会返回一个错误值(包括 None),在这种情况下传输将自行关闭。 相反,如果此方法返回真值,则关闭传输取决于协议。 由于默认实现返回 None,它隐式关闭连接。

笔记

某些传输(例如 SSL)不支持半关闭连接,在这种情况下,从此方法返回 true 不会阻止关闭连接。

data_received() 在连接期间可以被调用任意次数。 然而,eof_received() 最多被调用一次,如果被调用,data_received() 将不会在它之后被调用。

状态机:

开始 -> connection_made() [-> data_received() *] [-> eof_received() ?] -> connection_lost()[X123X ] -> 结束


18.5.4.2.4。 数据报协议

DatagramProtocol 实例上调用以下回调。

DatagramProtocol.datagram_received(data, addr)
收到数据报时调用。 data 是一个包含传入数据的字节对象。 addr为发送数据的peer地址; 确切的格式取决于传输。
DatagramProtocol.error_received(exc)

当先前的发送或接收操作引发 OSError 时调用。 excOSError 实例。

在极少数情况下调用此方法,当传输(例如 UDP) 检测到数据报无法传送到其接收者。 但在许多情况下,无法传递的数据报将被悄悄丢弃。


18.5.4.2.5。 流控回调

这些回调可以在 ProtocolDatagramProtocolSubprocessProtocol 实例上调用:

BaseProtocol.pause_writing()
当传输的缓冲区超过高水位线时调用。
BaseProtocol.resume_writing()
当传输的缓冲区排放到低水位线以下时调用。

pause_writing()resume_writing() 调用是成对的——当缓冲区严格超过高水位线时,pause_writing() 被调用一次(即使随后的写入增加了缓冲区大小),当缓冲区大小达到低水位线时,最终会调用一次 resume_writing()

笔记

如果缓冲区大小等于高水位线,则不调用 pause_writing() - 它必须严格结束。 相反,当缓冲区大小等于或低于低水位线时,会调用 resume_writing()。 当任一标记为零时,这些结束条件对于确保事情按预期进行非常重要。


笔记

在 BSD 系统(OS X、FreeBSD 等)上,DatagramProtocol 不支持流量控制,因为无法轻易检测到写入过多数据包导致的发送失败。 套接字始终显示为“就绪”,多余的数据包将被丢弃; OSError errno 设置为 errno.ENOBUFS 可能会也可能不会引发; 如果它被引发,它将被报告给 DatagramProtocol.error_received() 但否则会被忽略。


18.5.4.2.6。 协程和协议

可以使用 ensure_future() 在协议方法中调度协程,但不能保证执行顺序。 协议不知道在协议方法中创建的协程,因此不会等待它们。

要获得可靠的执行顺序,请在具有 yield from 的协程中使用 流对象 。 例如,StreamWriter.drain() 协程可用于等待直到写入缓冲区被刷新。


18.5.4.3. 协议示例

18.5.4.3.1。 TCP 回显客户端协议

TCP echo客户端使用AbstractEventLoop.create_connection()方法,发送数据并等待连接关闭:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

事件循环运行两次。 run_until_complete() 方法在这个简短的例子中是首选,如果服务器没有侦听,则引发异常,而不是必须编写一个简短的协程来处理异常并停止运行循环。 在 run_until_complete() 退出时,循环不再运行,因此发生错误时无需停止循环。

也可以看看

TCP 回显客户端使用流 示例使用 asyncio.open_connection() 函数。


18.5.4.3.2. TCP 回显服务器协议

TCP回显服务器使用AbstractEventLoop.create_server()方法,发回接收到的数据并关闭连接:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close() 可以在 WriteTransport.write() 之后立即调用,即使数据尚未在套接字上发送:这两种方法都是异步的。 yield from 不需要,因为这些传输方法不是协程。

也可以看看

TCP 回显服务器使用流 示例使用 asyncio.start_server() 函数。


18.5.4.3.3。 UDP 回显客户端协议

UDP 回显客户端使用 AbstractEventLoop.create_datagram_endpoint() 方法,当我们收到答案时发送数据并关闭传输:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4。 UDP 回显服务器协议

UDP echo server使用AbstractEventLoop.create_datagram_endpoint()方法,发回接收到的数据:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5。 使用协议注册一个打开的套接字以等待数据

等待socket通过协议使用AbstractEventLoop.create_connection()方法接收到数据,然后关闭事件循环

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

也可以看看

watch a file descriptor for read events 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。

注册一个打开的套接字以使用流等待数据 示例使用由协程中的 open_connection() 函数创建的高级流。