8.5. heapq — 堆队列算法 — Python 文档
8.5. 堆 — 堆队列算法
该模块提供了堆队列算法的实现,也称为优先队列算法。
堆是二叉树,其每个父节点的值都小于或等于其任何子节点。 此实现使用数组,其中 heap[k] <= heap[2*k+1]
和 heap[k] <= heap[2*k+2]
用于所有 k,从零开始计算元素。 为了比较,不存在的元素被认为是无限的。 堆的有趣特性是它的最小元素总是根,heap[0]
。
下面的 API 在两个方面不同于教科书的堆算法: (a) 我们使用从零开始的索引。 这使得节点的索引与其子节点的索引之间的关系稍微不那么明显,但更合适,因为 Python 使用从零开始的索引。 (b) 我们的 pop 方法返回最小的项目,而不是最大的项目(在教科书中称为“最小堆”;“最大堆”在文本中更常见,因为它适用于就地排序)。
这两个使得可以毫无意外地将堆视为常规 Python 列表:heap[0]
是最小的项,而 heap.sort()
保持堆不变!
要创建堆,请使用初始化为 []
的列表,或者您可以通过函数 heapify() 将填充列表转换为堆。
提供以下功能:
- heapq.heappush(heap, item)
- 将值 item 推送到 heap,保持堆不变。
- heapq.heappop(heap)
- 从 堆 弹出并返回最小的项目,保持堆不变。 如果堆为空,则会引发 IndexError。 要访问最小的项目而不弹出它,请使用
heap[0]
。
- heapq.heappushpop(heap, item)
- 将项推入堆,然后从堆中弹出并返回最小的项。 组合操作比 heappush() 更有效地运行,然后单独调用 heappop()。
- heapq.heapify(x)
- 将列表 x 在线性时间内就地转换为堆。
- heapq.heapreplace(heap, item)
从堆中弹出并返回最小的项,同时推送新的项。 堆大小不会改变。 如果堆为空,则会引发 IndexError。
这一步操作比 heappop() 后接 heappush() 更有效,并且在使用固定大小的堆时更合适。 pop/push 组合总是从堆中返回一个元素并将其替换为 item。
返回的值可能大于添加的 项 。 如果不需要,请考虑使用 heappushpop() 代替。 它的 push/pop 组合返回两个值中较小的一个,将较大的值留在堆上。
该模块还提供了三个基于堆的通用函数。
- heapq.merge(*iterables, key=None, reverse=False)
将多个排序的输入合并为一个排序的输出(例如,合并来自多个日志文件的带时间戳的条目)。 在排序后的值上返回一个 迭代器 。
与
sorted(itertools.chain(*iterables))
类似,但返回一个可迭代对象,不会一次将数据全部拉入内存,并假设每个输入流都已经排序(从小到大)。有两个可选参数,必须指定为关键字参数。
key 指定一个参数的 key 函数 ,用于从每个输入元素中提取比较键。 默认值为
None
(直接比较元素)。reverse 是一个布尔值。 如果设置为
True
,则输入元素将被合并,就像每个比较都被反转一样。3.5 版本更改: 增加了可选的 key 和 reverse 参数。
- heapq.nlargest(n, iterable, key=None)
- 从 iterable 定义的数据集中返回一个包含 n 个最大元素的列表。 key,如果提供,指定一个参数的函数,用于从 iterable 中的每个元素提取比较键(例如,
key=str.lower
)。 相当于:sorted(iterable, key=key, reverse=True)[:n]
。
- heapq.nsmallest(n, iterable, key=None)
- 从 iterable 定义的数据集中返回一个包含 n 个最小元素的列表。 key,如果提供,指定一个参数的函数,用于从 iterable 中的每个元素提取比较键(例如,
key=str.lower
)。 相当于:sorted(iterable, key=key)[:n]
。
后两个函数在 n 的较小值下表现最佳。 对于较大的值,使用 sorted() 函数更有效。 此外,当 n==1
时,使用内置的 min() 和 max() 函数效率更高。 如果需要重复使用这些函数,请考虑将可迭代对象转换为实际的堆。
8.5.1. 基本示例
heapsort 可以通过将所有值推送到堆上,然后一次弹出一个最小值来实现:
>>> def heapsort(iterable):
... h = []
... for value in iterable:
... heappush(h, value)
... return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
这与 sorted(iterable)
类似,但与 sorted() 不同的是,此实现并不稳定。
堆元素可以是元组。 这对于在被跟踪的主记录旁边分配比较值(例如任务优先级)很有用:
>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')
8.5.2. 优先队列实施注意事项
优先级队列 是堆的常见用途,它提出了几个实现挑战:
- 排序稳定性:如何让两个优先级相同的任务按照最初添加的顺序返回?
- 如果优先级相等并且任务没有默认的比较顺序,则元组比较会中断(优先级,任务)对。
- 如果任务的优先级发生变化,如何将其移动到堆中的新位置?
- 或者如果一个挂起的任务需要删除,你如何找到它并将其从队列中删除?
前两个挑战的解决方案是将条目存储为 3 元素列表,包括优先级、条目计数和任务。 条目计数用作决胜局,以便具有相同优先级的两个任务按添加顺序返回。 并且由于没有两个条目计数相同,元组比较永远不会尝试直接比较两个任务。
剩下的挑战围绕着找到一个待处理的任务并更改其优先级或完全删除它。 可以使用指向队列中条目的字典来查找任务。
删除条目或更改其优先级更加困难,因为它会破坏堆结构不变量。 因此,一个可能的解决方案是将条目标记为已删除并添加一个具有修改后的优先级的新条目:
pq = [] # list of entries arranged in a heap
entry_finder = {} # mapping of tasks to entries
REMOVED = '<removed-task>' # placeholder for a removed task
counter = itertools.count() # unique sequence count
def add_task(task, priority=0):
'Add a new task or update the priority of an existing task'
if task in entry_finder:
remove_task(task)
count = next(counter)
entry = [priority, count, task]
entry_finder[task] = entry
heappush(pq, entry)
def remove_task(task):
'Mark an existing task as REMOVED. Raise KeyError if not found.'
entry = entry_finder.pop(task)
entry[-1] = REMOVED
def pop_task():
'Remove and return the lowest priority task. Raise KeyError if empty.'
while pq:
priority, count, task = heappop(pq)
if task is not REMOVED:
del entry_finder[task]
return task
raise KeyError('pop from an empty priority queue')
8.5.3. 理论
堆是数组,其中 a[k] <= a[2*k+1]
和 a[k] <= a[2*k+2]
用于所有 k,从 0 开始计数元素。 为了比较,不存在的元素被认为是无限的。 堆的有趣特性是 a[0]
总是它的最小元素。
上面奇怪的不变量旨在成为锦标赛的有效记忆表示。 下面的数字是 k,而不是 a[k]
:
0
1 2
3 4 5 6
7 8 9 10 11 12 13 14
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
在上面的树中,每个单元格 k 位于 2*k+1
和 2*k+2
的顶部。 在我们在体育运动中看到的通常的二元锦标赛中,每个单元格都是它顶部的两个单元格的获胜者,我们可以沿着树追踪获胜者以查看他/她的所有对手。 然而,在此类锦标赛的许多计算机应用程序中,我们不需要追踪获胜者的历史。 为了提高内存效率,当获胜者被提升时,我们尝试用较低级别的其他东西替换它,规则变成一个单元格和它顶部的两个单元格包含三个不同的项目,但顶部单元格“获胜”在两个顶部的单元格上。
如果此堆不变量始终受到保护,则索引 0 显然是总体赢家。 删除它并找到“下一个”赢家的最简单算法方法是将一些输家(假设上图中的单元格 30)移动到 0 位置,然后将这个新的 0 沿树向下渗透,交换值,直到不变量被重新建立。 这显然是树中项目总数的对数。 通过迭代所有项目,你得到一个 O(n log n) 排序。
这种排序的一个很好的功能是,您可以在排序进行时有效地插入新项目,前提是插入的项目不比您提取的最后一个第 0 元素“更好”。 这在模拟上下文中特别有用,其中树包含所有传入事件,并且“获胜”条件意味着最小的预定时间。 当一个事件调度其他事件执行时,它们被调度到未来,所以它们可以很容易地进入堆。 因此,堆是实现调度程序的良好结构(这是我用于 MIDI 音序器的 :-)。
已经广泛研究了用于实现调度器的各种结构,堆对此有好处,因为它们相当快,速度几乎恒定,最坏情况与平均情况没有太大区别。 但是,还有其他表示总体上更有效,但最坏的情况可能会很糟糕。
堆在大磁盘排序中也非常有用。 你很可能都知道大排序意味着产生“运行”(这是预先排序的序列,其大小通常与 CPU 内存量有关),然后是这些运行的合并通道,这种合并通常非常巧妙有组织的 1。 初始排序产生尽可能长的运行是非常重要的。 锦标赛是实现这一目标的好方法。 如果使用所有可用内存来举办锦标赛,您替换和渗透恰好适合当前运行的项目,您将产生两倍于随机输入内存大小的运行,并且对于模糊排序的输入要好得多。
此外,如果您在磁盘上输出第 0 个项目并获得一个可能不适合当前锦标赛的输入(因为该值“胜”于最后一个输出值),则它无法放入堆中,因此堆减少。 释放的内存可以立即巧妙地重用,以逐步构建第二个堆,其增长速度与第一个堆融化的速度完全相同。 当第一个堆完全消失时,您切换堆并开始新的运行。 聪明而且相当有效!
总之,堆是需要了解的有用内存结构。 我在一些应用程序中使用它们,我认为保留一个“堆”模块很好。 :-)
脚注
- 1
- 现在的磁盘平衡算法比智能更烦人,这是磁盘搜索能力的结果。 在无法搜索的设备上,如大型磁带驱动器,情况就大不相同了,必须非常聪明地确保(提前)每个磁带移动都是最有效的(即,将最好地参与“进行”合并)。 有些磁带甚至可以倒读,这也是为了避免倒带时间。 相信我,真正好的磁带种类非常值得观看! 从古至今,排序一直是一门伟大的艺术! :-)