队列 — Python 文档

来自菜鸟教程
Python/docs/3.9/library/asyncio-queue
跳转至:导航、​搜索

队列

源代码: :source:`Lib/asyncio/queues.py`



asyncio 队列被设计为类似于 queue 模块的类。 尽管 asyncio 队列不是线程安全的,但它们被设计为专门用于 async/await 代码。

请注意,异步队列的方法没有 timeout 参数; 使用 asyncio.wait_for() 函数进行超时队列操作。

另请参阅下面的 示例 部分。

队列

class asyncio.Queue(maxsize=0, \*, loop=None)

先进先出 (FIFO) 队列。

如果 maxsize 小于或等于 0,则队列大小为无限大。 如果它是一个大于 0 的整数,那么当队列达到 maxsize 时,await put() 阻塞,直到一个项目被 get() 删除。

与标准库线程 queue 不同,队列的大小始终是已知的,可以通过调用 qsize() 方法返回。

此类是 不是线程安全的

maxsize

队列中允许的项目数。

empty()

如果队列为空,则返回 True,否则返回 False

full()

如果队列中有 maxsize 个项目,则返回 True

如果队列是用 maxsize=0(默认值)初始化的,那么 full() 永远不会返回 True

get_nowait()

如果一项立即可用,则返回一项,否则引发 QueueEmpty

put_nowait(item)

将项目放入队列而不阻塞。

如果没有立即可用的空闲插槽,则引发 QueueFull

qsize()

返回队列中的项目数。

task_done()

指示以前排队的任务已完成。

由队列消费者使用。 对于用于获取任务的每个 get(),对 task_done() 的后续调用告诉队列该任务的处理已完成。

如果一个 join() 当前正在阻塞,它会在所有项目都被处理完后恢复(这意味着每个已经被 put() 进入的项目都收到了一个 task_done() 调用队列)。

如果调用次数多于放入队列的项目,则引发 ValueError


优先队列

class asyncio.PriorityQueue

Queue 的变体; 按优先级顺序检索条目(从低到高)。

条目通常是 (priority_number, data) 形式的元组。


后进先出队列

class asyncio.LifoQueue
Queue 的变体,首先检索最近添加的条目(后进先出)。


例外

exception asyncio.QueueEmpty
当在空队列上调用 get_nowait() 方法时会引发此异常。
exception asyncio.QueueFull
在已达到其 maxsize 的队列上调用 put_nowait() 方法时引发异常。


例子

队列可用于在多个并发任务之间分配工作负载:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())