queue — 同步队列类 — Python 文档

来自菜鸟教程
Python/docs/3.7/library/queue
跳转至:导航、​搜索

queue — 同步队列类

源代码: :source:`Lib/queue.py`



queue 模块实现了多生产者、多消费者队列。 当信息必须在多个线程之间安全交换时,它在线程编程中特别有用。 该模块中的 Queue 类实现了所有必需的锁定语义。 这取决于 Python 中线程支持的可用性; 请参阅 线程 模块。

该模块实现了三种类型的队列,它们仅在检索条目的顺序上有所不同。 在 FIFO 队列中,添加的第一个任务是第一个检索的任务。 在 LIFO 队列中,最近添加的条目是第一个检索到的条目(像堆栈一样操作)。 使用优先级队列,条目保持排序(使用 heapq 模块)并首先检索最低值的条目。

在内部,这三种类型的队列使用锁来临时阻塞竞争线程; 然而,它们并不是为处理线程内的可重入性而设计的。

此外,该模块实现了一个“简单”的FIFO队列类型,SimpleQueue,它的具体实现提供了额外的保证,以换取更小的功能。

queue 模块定义了以下类和异常:

class queue.Queue(maxsize=0)
FIFO 队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。
class queue.LifoQueue(maxsize=0)
LIFO 队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。
class queue.PriorityQueue(maxsize=0)

优先队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。

首先检索最低值的条目(最低值的条目是 sorted(list(entries))[0] 返回的条目)。 条目的典型模式是以下形式的元组:(priority_number, data)

如果 data 元素不可比较,则可以将数据包装在忽略数据项而只比较优先级编号的类中:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

无界 FIFO 队列的构造函数。 简单队列缺乏任务跟踪等高级功能。

3.7 版中的新功能。

exception queue.Empty
在空的 Queue 对象上调用非阻塞 get()(或 get_nowait())时引发异常。
exception queue.Full
在已满的 Queue 对象上调用非阻塞 put()(或 put_nowait())时引发异常。

队列对象

队列对象(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

Queue.qsize()
返回队列的大致大小。 注意, qsize() > 0 不保证后续的 get() 不会阻塞, qsize() < maxsize 也不会保证 put() 不会阻塞。
Queue.empty()
如果队列为空,则返回 True,否则返回 False。 如果 empty() 返回 True 它不能保证对 put() 的后续调用不会阻塞。 类似地,如果 empty() 返回 False,它不能保证对 get() 的后续调用不会阻塞。
Queue.full()
如果队列已满,则返回 True,否则返回 False。 如果 full() 返回 True,则不能保证对 get() 的后续调用不会阻塞。 类似地,如果 full() 返回 False,它不能保证对 put() 的后续调用不会阻塞。
Queue.put(item, block=True, timeout=None)
item 放入队列。 如果可选参数 block 为 true 且 timeoutNone(默认值),则在必要时阻止,直到有空闲插槽可用。 如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在这段时间内没有空闲插槽可用,则会引发 Full 异常。 否则(block 为假),如果空闲插槽立即可用,则将项目放入队列,否则引发 Full 异常(timeout 被忽略)案件)。
Queue.put_nowait(item)
相当于 put(item, False)
Queue.get(block=True, timeout=None)

从队列中移除并返回一个项目。 如果可选参数 block 为真且 timeoutNone(默认值),则在必要时阻止,直到项目可用。 如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在那段时间内没有可用的项目,则会引发 Empty 异常。 否则(block 为假),如果一个项目立即可用则返回一个项目,否则引发 Empty 异常(timeout 在这种情况下被忽略)。

在 POSIX 系统上的 3.0 之前,对于 Windows 上的所有版本,如果 block 为真且 timeoutNone,则此操作会在底层上进入不间断等待锁。 这意味着不会发生异常,特别是 SIGINT 不会触发 KeyboardInterrupt

Queue.get_nowait()
相当于 get(False)

提供了两种方法来支持跟踪排队任务是否已被守护程序消费者线程完全处理。

Queue.task_done()

指示以前排队的任务已完成。 由队列消费者线程使用。 对于用于获取任务的每个 get(),随后对 task_done() 的调用告诉队列该任务的处理已完成。

如果 join() 当前正在阻塞,它会在所有项目都处理完毕后恢复(这意味着每个已经被 放置的项目都收到了一个 task_done() 调用() 进入队列)。

如果调用次数多于放置在队列中的项目,则引发 ValueError

Queue.join()

阻塞直到队列中的所有项目都被获取和处理。

每当将项目添加到队列时,未完成任务的计数就会增加。 每当消费者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时,计数就会下降。 当未完成任务的数量降为零时,join() 解除阻塞。

如何等待排队任务完成的示例:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

简单队列对象

SimpleQueue 对象提供下面描述的公共方法。

SimpleQueue.qsize()
返回队列的大致大小。 注意, qsize() > 0 并不能保证后续的 get() 不会阻塞。
SimpleQueue.empty()
如果队列为空,则返回 True,否则返回 False。 如果empty() 返回False,它不能保证对get() 的后续调用不会阻塞。
SimpleQueue.put(item, block=True, timeout=None)
item 放入队列。 该方法从不阻塞并且总是成功(除了潜在的低级错误,例如分配内存失败)。 可选参数 blocktimeout 被忽略,仅用于与 Queue.put() 兼容。
SimpleQueue.put_nowait(item)
等效于 put(item),提供与 Queue.put_nowait() 兼容。
SimpleQueue.get(block=True, timeout=None)
从队列中移除并返回一个项目。 如果可选参数 block 为真且 timeoutNone(默认值),则在必要时阻止,直到项目可用。 如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在那段时间内没有可用的项目,则会引发 Empty 异常。 否则(block 为假),如果一个项目立即可用则返回一个项目,否则引发 Empty 异常(timeout 在这种情况下被忽略)。
SimpleQueue.get_nowait()
相当于 get(False)

也可以看看

multiprocessing.Queue
用于多处理(而不是多线程)上下文的队列类。

collections.deque 是无界队列的另一种实现,具有不需要锁定的快速原子 append()popleft() 操作。