流 — Python 文档
流
流是用于处理网络连接的高级异步/等待就绪原语。 流允许在不使用回调或低级协议和传输的情况下发送和接收数据。
下面是一个使用 asyncio 流编写的 TCP 回显客户端的示例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
另请参阅下面的 示例 部分。
流函数
以下顶级异步函数可用于创建和处理流:
Unix 套接字
流阅读器
- class asyncio.StreamReader
表示提供 API 以从 IO 流读取数据的读取器对象。
不建议直接实例化StreamReader对象; 改用
open_connection()
和start_server()
。- at_eof()
如果缓冲区为空并且
feed_eof()
被调用,则返回True
。
流写入器
- class asyncio.StreamWriter
表示提供 API 以将数据写入 IO 流的编写器对象。
不建议直接实例化StreamWriter对象; 改用
open_connection()
和start_server()
。- can_write_eof()
如果底层传输支持 write_eof() 方法,则返回
True
,否则返回False
。
- write_eof()
缓冲的写入数据刷新后关闭流的写入端。
- transport
返回底层异步传输。
- get_extra_info(name, default=None)
访问可选的交通信息; 有关详细信息,请参阅 BaseTransport.get_extra_info()。
- write(data)
将 data 写入流。
此方法不受流量控制。 对
write()
的调用应该跟在drain()
之后。
- writelines(data)
将字节列表(或任何可迭代的)写入流。
此方法不受流量控制。 对
writelines()
的调用应该跟在drain()
之后。
- close()
关闭流。
- is_closing()
如果流已关闭或正在关闭,则返回
True
。3.7 版中的新功能。
例子
使用流的 TCP 回显客户端
使用 asyncio.open_connection()
函数的 TCP 回显客户端:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
使用流的 TCP 回显服务器
使用 asyncio.start_server()
函数的 TCP 回显服务器:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
获取 HTTP 标头
查询通过命令行传递的 URL 的 HTTP 标头的简单示例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
注册一个打开的套接字以使用流等待数据
协程等待套接字使用 open_connection()
函数接收数据:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
也可以看看
注册一个打开的套接字以使用协议等待数据示例使用低级协议和loop.create_connection()
方法。
watch a file descriptor for read events 示例使用低级 loop.add_reader() 方法来监视文件描述符。