18.5.5. Streams(基于协程的 API) — Python 文档
18.5.5. Streams(基于协程的 API)
源代码: :source:`Lib/asyncio/streams.py`
18.5.5.1. 流函数
笔记
此模块中的顶级函数仅用作便利包装器; 那里真的没有什么特别的,如果他们没有完全按照您的意愿行事,请随意复制他们的代码。
18.5.5.2. 流阅读器
- class asyncio.StreamReader(limit=_DEFAULT_LIMIT, loop=None)
此类是 不是线程安全的 。
limit 参数的默认值设置为 _DEFAULT_LIMIT,即 2**16 (64 KiB)
- exception()
获取异常。
- feed_eof()
确认EOF。
- feed_data(data)
在内部缓冲区中输入 data 字节。 任何等待数据的操作都将恢复。
- set_exception(exc)
设置例外。
- set_transport(transport)
设置传输。
- at_eof()
如果缓冲区为空并且 feed_eof() 被调用,则返回
True
。
18.5.5.3. 流写入器
- class asyncio.StreamWriter(transport, protocol, reader, loop)
包装运输。
这暴露了 write()、writelines()、can_write_eof()、write_eof()、get_extra_info() ] 和 close()。 它添加了
drain()
,它返回一个可选的 Future,您可以在其上等待流量控制。 它还添加了一个直接引用Transport
的传输属性。此类是 不是线程安全的 。
- transport
运输。
- can_write_eof()
如果传输支持 write_eof(),则返回 True,否则返回 False。 参见 WriteTransport.can_write_eof()。
- close()
关闭传输:参见 BaseTransport.close()。
- get_extra_info(name, default=None)
返回可选的传输信息:参见 BaseTransport.get_extra_info()。
- write(data)
将一些 data 字节写入传输:参见 WriteTransport.write()。
- writelines(data)
将数据字节的列表(或任何可迭代的)写入传输:参见 WriteTransport.writelines()。
- write_eof()
刷新缓冲数据后关闭传输的写端:参见 WriteTransport.write_eof()。
18.5.5.4. 流读取器协议
- class asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)
在 Protocol 和 StreamReader 之间进行调整的简单帮助器类。 协议的子类。
stream_reader 是一个 StreamReader 实例,client_connected_cb 是一个可选函数,在建立连接时使用 (stream_reader, stream_writer) 调用,loop 是要使用的事件循环实例。
(这是一个辅助类,而不是使 StreamReader 本身成为 Protocol 子类,因为 StreamReader 有其他潜在用途,并防止 的用户]StreamReader 意外调用了协议的不当方法。)
18.5.5.5. 不完整读取错误
- exception asyncio.IncompleteReadError
不完整读取错误,EOFError 的子类。
- expected
预期字节总数 (int)。
- partial
在到达流末尾之前读取字节字符串 (bytes)。
18.5.5.6. 限制超限错误
- exception asyncio.LimitOverrunError
- 查找分隔符时达到缓冲区限制。
- consumed
- 要消耗的字节总数。
18.5.5.7. 流示例
18.5.5.7.1。 使用流的 TCP 回显客户端
使用 asyncio.open_connection()
函数的 TCP 回显客户端:
import asyncio
@asyncio.coroutine
def tcp_echo_client(message, loop):
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = yield from reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()
18.5.5.7.2。 使用流的 TCP 回显服务器
使用 asyncio.start_server()
函数的 TCP 回显服务器:
import asyncio
@asyncio.coroutine
def handle_echo(reader, writer):
data = yield from reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
yield from writer.drain()
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
18.5.5.7.3。 获取 HTTP 标头
查询通过命令行传递的 URL 的 HTTP 标头的简单示例:
import asyncio
import urllib.parse
import sys
@asyncio.coroutine
def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
connect = asyncio.open_connection(url.hostname, 443, ssl=True)
else:
connect = asyncio.open_connection(url.hostname, 80)
reader, writer = yield from connect
query = ('HEAD {path} HTTP/1.0\r\n'
'Host: {hostname}\r\n'
'\r\n').format(path=url.path or '/', hostname=url.hostname)
writer.write(query.encode('latin-1'))
while True:
line = yield from reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print('HTTP header> %s' % line)
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()
用法:
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
18.5.5.7.4。 注册一个打开的套接字以使用流等待数据
协程等待套接字使用 open_connection()
函数接收数据:
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
@asyncio.coroutine
def wait_for_data(loop):
# Create a pair of connected sockets
rsock, wsock = socketpair()
# Register the open socket to wait for data
reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = yield from reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()
也可以看看
使用协议注册一个打开的套接字以等待数据 示例使用由 AbstractEventLoop.create_connection()
方法创建的低级协议。
watch a file descriptor for read events 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。