17.7. queue — 同步队列类 — Python 文档

来自菜鸟教程
Python/docs/3.6/library/queue
跳转至:导航、​搜索

17.7. 队列 — 一个同步队列类

源代码: :source:`Lib/queue.py`



queue 模块实现了多生产者、多消费者队列。 当信息必须在多个线程之间安全交换时,它在线程编程中特别有用。 该模块中的 Queue 类实现了所有必需的锁定语义。 这取决于 Python 中线程支持的可用性; 请参阅 线程 模块。

该模块实现了三种类型的队列,它们仅在检索条目的顺序上有所不同。 在 FIFO 队列中,添加的第一个任务是第一个检索的任务。 在 LIFO 队列中,最近添加的条目是第一个检索到的条目(像堆栈一样操作)。 使用优先级队列,条目保持排序(使用 heapq 模块)并首先检索最低值的条目。

在内部,模块使用锁来临时阻塞竞争线程; 然而,它并不是为处理线程内的可重入性而设计的。

queue 模块定义了以下类和异常:

class queue.Queue(maxsize=0)
FIFO 队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。
class queue.LifoQueue(maxsize=0)
LIFO 队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。
class queue.PriorityQueue(maxsize=0)

优先队列的构造函数。 maxsize 是一个整数,用于设置可以放入队列的项目数量的上限。 一旦达到此大小,插入将阻塞,直到队列项被消耗。 如果 maxsize 小于或等于 0,则队列大小为无限大。

首先检索最低值的条目(最低值的条目是 sorted(list(entries))[0] 返回的条目)。 条目的典型模式是以下形式的元组:(priority_number, data)

exception queue.Empty
在空的 Queue 对象上调用非阻塞 get()(或 get_nowait())时引发异常。
exception queue.Full
在已满的 Queue 对象上调用非阻塞 put()(或 put_nowait())时引发异常。

17.7.1. 队列对象

队列对象(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

Queue.qsize()
返回队列的大致大小。 注意, qsize() > 0 不保证后续的 get() 不会阻塞, qsize() < maxsize 也不会保证 put() 不会阻塞。
Queue.empty()
如果队列为空,则返回 True,否则返回 False。 如果 empty() 返回 True 它不能保证对 put() 的后续调用不会阻塞。 类似地,如果 empty() 返回 False,它不能保证对 get() 的后续调用不会阻塞。
Queue.full()
如果队列已满,则返回 True,否则返回 False。 如果 full() 返回 True,则不能保证对 get() 的后续调用不会阻塞。 类似地,如果 full() 返回 False,它不能保证对 put() 的后续调用不会阻塞。
Queue.put(item, block=True, timeout=None)
item 放入队列。 如果可选参数 block 为真且 timeoutNone(默认值),则在必要时阻止,直到有空闲插槽可用。 如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在这段时间内没有空闲插槽可用,则会引发 Full 异常。 否则(block 为假),如果空闲插槽立即可用,则将项目放入队列,否则引发 Full 异常(timeout 被忽略)案件)。
Queue.put_nowait(item)
相当于 put(item, False)
Queue.get(block=True, timeout=None)
从队列中移除并返回一个项目。 如果可选参数 block 为真且 timeoutNone(默认值),则在必要时阻止,直到项目可用。 如果 timeout 是一个正数,它最多会阻塞 timeout 秒,如果在那段时间内没有可用的项目,则会引发 Empty 异常。 否则(block 为假),如果一个项目立即可用则返回一个项目,否则引发 Empty 异常(timeout 在这种情况下被忽略)。
Queue.get_nowait()
相当于 get(False)

提供了两种方法来支持跟踪排队任务是否已被守护程序消费者线程完全处理。

Queue.task_done()

指示以前排队的任务已完成。 由队列消费者线程使用。 对于用于获取任务的每个 get(),随后对 task_done() 的调用告诉队列该任务的处理已完成。

如果 join() 当前正在阻塞,它会在所有项目都处理完毕后恢复(意味着每个已经被 放置的项目都收到了一个 task_done() 调用() 进入队列)。

如果调用次数多于放入队列的项目,则引发 ValueError

Queue.join()

阻塞直到队列中的所有项目都被获取和处理。

每当将项目添加到队列时,未完成任务的计数就会增加。 每当消费者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时,计数就会下降。 当未完成任务的数量降为零时,join() 解除阻塞。

如何等待排队任务完成的示例:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

也可以看看

multiprocessing.Queue
用于多处理(而不是多线程)上下文的队列类。

collections.deque 是无界队列的另一种实现,具有不需要锁定的快速原子 append()popleft() 操作。