队列 — Python 文档
来自菜鸟教程
Python/docs/3.9/library/asyncio-queue
队列
源代码: :source:`Lib/asyncio/queues.py`
asyncio 队列被设计为类似于 queue 模块的类。 尽管 asyncio 队列不是线程安全的,但它们被设计为专门用于 async/await 代码。
请注意,异步队列的方法没有 timeout 参数; 使用 asyncio.wait_for()
函数进行超时队列操作。
另请参阅下面的 示例 部分。
队列
- class asyncio.Queue(maxsize=0, \*, loop=None)
先进先出 (FIFO) 队列。
如果 maxsize 小于或等于 0,则队列大小为无限大。 如果它是一个大于
0
的整数,那么当队列达到 maxsize 时,await put()
阻塞,直到一个项目被get()
删除。与标准库线程 queue 不同,队列的大小始终是已知的,可以通过调用 qsize() 方法返回。
此类是 不是线程安全的 。
- maxsize
队列中允许的项目数。
- empty()
如果队列为空,则返回
True
,否则返回False
。
- get_nowait()
如果一项立即可用,则返回一项,否则引发 QueueEmpty。
- put_nowait(item)
将项目放入队列而不阻塞。
如果没有立即可用的空闲插槽,则引发 QueueFull。
- qsize()
返回队列中的项目数。
- task_done()
指示以前排队的任务已完成。
由队列消费者使用。 对于用于获取任务的每个
get()
,对 task_done() 的后续调用告诉队列该任务的处理已完成。如果一个
join()
当前正在阻塞,它会在所有项目都被处理完后恢复(这意味着每个已经被put()
进入的项目都收到了一个 task_done() 调用队列)。如果调用次数多于放入队列的项目,则引发 ValueError。
例外
- exception asyncio.QueueEmpty
- 当在空队列上调用 get_nowait() 方法时会引发此异常。
- exception asyncio.QueueFull
- 在已达到其 maxsize 的队列上调用 put_nowait() 方法时引发异常。
例子
队列可用于在多个并发任务之间分配工作负载:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())