itertools — 为高效循环创建迭代器的函数 — Python 文档
itertools — 为高效循环创建迭代器的函数
该模块实现了许多受 APL、Haskell 和 SML 构造启发的 iterator 构建块。 每个都以适合 Python 的形式重新编写。
该模块标准化了一组快速、内存高效的核心工具,这些工具可以单独使用或组合使用。 它们共同构成了一个“迭代器代数”,使得在纯 Python 中简洁高效地构建专用工具成为可能。
例如,SML 提供了一个制表工具:tabulate(f)
,它产生一个序列 f(0), f(1), ...
。 在Python中也可以通过将map()和count()组合成map(f, count())
来达到同样的效果。
这些工具及其内置工具也与 operator 模块中的高速功能配合良好。 例如,乘法运算符可以映射到两个向量上以形成有效的点积:sum(map(operator.mul, vector1, vector2))
。
无限迭代器:
迭代器 | 参数 | 结果 | 例子 |
---|---|---|---|
count()
|
开始,[步骤] | 开始,开始+步,开始+2*步,… | count(10) --> 10 11 12 13 14 ...
|
cycle()
|
p | p0, p1, ... plast, p0, p1, … | cycle('ABCD') --> A B C D A B C D ...
|
repeat()
|
元素 [,n] | elem, elem, elem, ... 无休止地或最多 n 次 | repeat(10, 3) --> 10 10 10
|
在最短输入序列上终止的迭代器:
迭代器 | 参数 | 结果 | 例子 |
---|---|---|---|
accumulate()
|
p [,func] | p0, p0+p1, p0+p1+p2, … | accumulate([1,2,3,4,5]) --> 1 3 6 10 15
|
chain()
|
p, q, … | p0, p1, ... plast, q0, q1, … | chain('ABC', 'DEF') --> A B C D E F
|
chain.from_iterable()
|
可迭代的 | p0, p1, ... plast, q0, q1, … | chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
|
compress()
|
数据,选择器 | (d[0] 如果 s[0]), (d[1] 如果 s[1]), … | compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
|
dropwhile()
|
预测,序列 | seq[n], seq[n+1], 当 pred 失败时开始 | dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1
|
filterfalse()
|
预测,序列 | seq 的元素,其中 pred(elem) 为 false | filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8
|
groupby()
|
可迭代[,键] | 按 key(v) 的值分组的子迭代器 | |
islice()
|
seq, [开始,] 停止 [, 步骤] | 来自 seq[start:stop:step] 的元素 | islice('ABCDEFG', 2, None) --> C D E F G
|
starmap()
|
功能,序列 | func(*seq[0]), func(*seq[1]), … | starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
|
takewhile()
|
预测,序列 | seq[0], seq[1], 直到 pred 失败 | takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4
|
tee()
|
它,n | it1, it2, ... itn 将一个迭代器拆分为 n | |
zip_longest()
|
p, q, … | (p[0], q[0]), (p[1], q[1]), … | zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
|
组合迭代器:
迭代器 | 参数 | 结果 |
---|---|---|
product()
|
p, q, ... [重复=1] | 笛卡尔积,相当于嵌套的 for 循环 |
permutations()
|
p[, r] | r-length 元组,所有可能的排序,没有重复的元素 |
combinations()
|
p, r | r 长度的元组,按排序顺序,没有重复的元素 |
combinations_with_replacement()
|
p, r | r 长度元组,按排序顺序,具有重复元素 |
例子 | 结果 |
---|---|
product('ABCD', repeat=2)
|
AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD
|
permutations('ABCD', 2)
|
AB AC AD BA BC BD CA CB CD DA DB DC
|
combinations('ABCD', 2)
|
AB AC AD BC BD CD
|
combinations_with_replacement('ABCD', 2)
|
AA AB AC AD BB BC BD CC CD DD
|
迭代工具函数
以下模块函数都构造和返回迭代器。 有些提供无限长度的流,因此它们只能由截断流的函数或循环访问。
- itertools.accumulate(iterable[, func, *, initial=None])
创建一个迭代器,返回累积的总和,或其他二元函数的累积结果(通过可选的 func 参数指定)。
如果提供了 func,它应该是一个有两个参数的函数。 输入 iterable 的元素可以是任何可以作为 func 参数接受的类型。 (例如,对于加法的默认操作,元素可以是任何可加类型,包括 Decimal 或 Fraction。)
通常,元素输出的数量与输入迭代匹配。 但是,如果提供关键字参数 initial,则累积以 initial 值开始,因此输出比输入可迭代元素多一个元素。
大致相当于:
def accumulate(iterable, func=operator.add, *, initial=None): 'Return running totals' # accumulate([1,2,3,4,5]) --> 1 3 6 10 15 # accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115 # accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120 it = iter(iterable) total = initial if initial is None: try: total = next(it) except StopIteration: return yield total for element in it: total = func(total, element) yield total
func 参数有多种用途。 它可以设置为 min() 用于运行最小值,max() 用于运行最大值,或 operator.mul() 用于运行产品。 可以通过累积利息和应用付款来构建摊销表。 一阶 递归关系 可以通过提供迭代中的初始值并仅使用 func 参数中的累计总数来建模:
>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8] >>> list(accumulate(data, operator.mul)) # running product [3, 12, 72, 144, 144, 1296, 0, 0, 0, 0] >>> list(accumulate(data, max)) # running maximum [3, 4, 6, 6, 6, 9, 9, 9, 9, 9] # Amortize a 5% loan of 1000 with 4 annual payments of 90 >>> cashflows = [1000, -90, -90, -90, -90] >>> list(accumulate(cashflows, lambda bal, pmt: bal*1.05 + pmt)) [1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001] # Chaotic recurrence relation https://en.wikipedia.org/wiki/Logistic_map >>> logistic_map = lambda x, _: r * x * (1 - x) >>> r = 3.8 >>> x0 = 0.4 >>> inputs = repeat(x0, 36) # only the initial value is used >>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)] ['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63', '0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57', '0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32', '0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60']
有关仅返回最终累加值的类似函数,请参阅 functools.reduce()。
3.2 版中的新功能。
3.3 版更改: 增加了可选的 func 参数。
3.8 版更改: 增加了可选的 initial 参数。
- itertools.chain(*iterables)
制作一个迭代器,从第一个迭代器返回元素,直到它耗尽,然后继续下一个迭代器,直到所有迭代器都耗尽。 用于将连续序列视为单个序列。 大致相当于:
def chain(*iterables): # chain('ABC', 'DEF') --> A B C D E F for it in iterables: for element in it: yield element
- classmethod chain.from_iterable(iterable)
chain() 的替代构造函数。 从延迟计算的单个可迭代参数获取链接输入。 大致相当于:
def from_iterable(iterables): # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F for it in iterables: for element in it: yield element
- itertools.combinations(iterable, r)
从输入 iterable 返回元素的 r 长度子序列。
组合元组根据输入 iterable 的顺序以字典顺序发出。 因此,如果输入 iterable 已排序,则组合元组将按排序顺序生成。
元素被视为唯一基于它们的位置,而不是它们的值。 因此,如果输入元素是唯一的,则每个组合中都不会出现重复值。
大致相当于:
def combinations(iterable, r): # combinations('ABCD', 2) --> AB AC AD BC BD CD # combinations(range(4), 3) --> 012 013 023 123 pool = tuple(iterable) n = len(pool) if r > n: return indices = list(range(r)) yield tuple(pool[i] for i in indices) while True: for i in reversed(range(r)): if indices[i] != i + n - r: break else: return indices[i] += 1 for j in range(i+1, r): indices[j] = indices[j-1] + 1 yield tuple(pool[i] for i in indices)
combinations() 的代码也可以表示为 permutations() 的子序列,在过滤元素未按顺序排序的条目后(根据它们在输入池中的位置) :
def combinations(iterable, r): pool = tuple(iterable) n = len(pool) for indices in permutations(range(n), r): if sorted(indices) == list(indices): yield tuple(pool[i] for i in indices)
0 <= r <= n
时返回的项目数为n! / r! / (n-r)!
,r > n
时为0。
- itertools.combinations_with_replacement(iterable, r)
从输入 iterable 返回元素的 r 长度子序列,允许单个元素重复多次。
组合元组根据输入 iterable 的顺序以字典顺序发出。 因此,如果输入 iterable 已排序,则组合元组将按排序顺序生成。
元素被视为唯一基于它们的位置,而不是它们的值。 因此,如果输入元素是唯一的,则生成的组合也将是唯一的。
大致相当于:
def combinations_with_replacement(iterable, r): # combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CC pool = tuple(iterable) n = len(pool) if not n and r: return indices = [0] * r yield tuple(pool[i] for i in indices) while True: for i in reversed(range(r)): if indices[i] != n - 1: break else: return indices[i:] = [indices[i] + 1] * (r - i) yield tuple(pool[i] for i in indices)
combinations_with_replacement() 的代码也可以表示为 product() 过滤条目后的子序列,其中元素未按排序顺序(根据它们在输入池中的位置) :
def combinations_with_replacement(iterable, r): pool = tuple(iterable) n = len(pool) for indices in product(range(n), repeat=r): if sorted(indices) == list(indices): yield tuple(pool[i] for i in indices)
n > 0
时返回的项目数为(n+r-1)! / r! / (n-1)!
。3.1 版中的新功能。
- itertools.compress(data, selectors)
制作一个迭代器,过滤来自 data 的元素,只返回那些在 selectors 中具有相应元素的元素,该元素的计算结果为
True
。 当 data 或 selectors 迭代用完时停止。 大致相当于:def compress(data, selectors): # compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F return (d for d, s in zip(data, selectors) if s)
3.1 版中的新功能。
- itertools.count(start=0, step=1)
制作一个迭代器,返回以数字 start 开头的均匀间隔值。 通常用作 map() 的参数来生成连续的数据点。 此外,与 zip() 一起使用以添加序列号。 大致相当于:
def count(start=0, step=1): # count(10) --> 10 11 12 13 14 ... # count(2.5, 0.5) -> 2.5 3.0 3.5 ... n = start while True: yield n n += step
使用浮点数进行计数时,有时可以通过替换乘法代码来获得更好的精度,例如:
(start + step * i for i in count())
。3.1 版更改: 添加 step 参数并允许非整数参数。
- itertools.cycle(iterable)
使迭代器从可迭代对象返回元素并保存每个元素的副本。 当迭代用完时,从保存的副本中返回元素。 无限重复。 大致相当于:
def cycle(iterable): # cycle('ABCD') --> A B C D A B C D A B C D ... saved = [] for element in iterable: yield element saved.append(element) while saved: for element in saved: yield element
注意,工具包的这个成员可能需要大量的辅助存储(取决于迭代的长度)。
- itertools.dropwhile(predicate, iterable)
制作一个迭代器,只要谓词为真,它就会从可迭代对象中删除元素; 之后,返回每个元素。 请注意,迭代器在谓词第一次变为假之前不会产生 any 输出,因此它可能需要很长的启动时间。 大致相当于:
def dropwhile(predicate, iterable): # dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1 iterable = iter(iterable) for x in iterable: if not predicate(x): yield x break for x in iterable: yield x
- itertools.filterfalse(predicate, iterable)
制作一个迭代器,从 iterable 中过滤元素,只返回那些谓词为
False
的元素。 如果 predicate 是None
,则返回为假的项。 大致相当于:def filterfalse(predicate, iterable): # filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8 if predicate is None: predicate = bool for x in iterable: if not predicate(x): yield x
- itertools.groupby(iterable, key=None)
制作一个迭代器,从 iterable 返回连续的键和组。 key 是计算每个元素的键值的函数。 如果未指定或为
None
,则 key 默认为恒等函数并返回元素不变。 通常,迭代需要已经在同一个键函数上排序。groupby()的操作类似于Unix中的
uniq
过滤器。 每次键函数的值发生变化时,它都会生成一个中断或新组(这就是为什么通常需要使用相同的键函数对数据进行排序的原因)。 该行为与 SQL 的 GROUP BY 不同,后者聚合公共元素而不管它们的输入顺序。返回的组本身是一个迭代器,它与 groupby() 共享底层可迭代对象。 因为源是共享的,当 groupby() 对象前进时,之前的组不再可见。 因此,如果稍后需要该数据,则应将其存储为列表:
groups = [] uniquekeys = [] data = sorted(data, key=keyfunc) for k, g in groupby(data, keyfunc): groups.append(list(g)) # Store group iterator as a list uniquekeys.append(k)
groupby() 大致相当于:
class groupby: # [k for k, g in groupby('AAAABBBCCDAABBB')] --> A B C D A B # [list(g) for k, g in groupby('AAAABBBCCD')] --> AAAA BBB CC D def __init__(self, iterable, key=None): if key is None: key = lambda x: x self.keyfunc = key self.it = iter(iterable) self.tgtkey = self.currkey = self.currvalue = object() def __iter__(self): return self def __next__(self): self.id = object() while self.currkey == self.tgtkey: self.currvalue = next(self.it) # Exit on StopIteration self.currkey = self.keyfunc(self.currvalue) self.tgtkey = self.currkey return (self.currkey, self._grouper(self.tgtkey, self.id)) def _grouper(self, tgtkey, id): while self.id is id and self.currkey == tgtkey: yield self.currvalue try: self.currvalue = next(self.it) except StopIteration: return self.currkey = self.keyfunc(self.currvalue)
- itertools.islice(iterable, stop)
itertools.islice(iterable, start, stop[, step]) 制作一个迭代器,从可迭代对象中返回选定的元素。 如果 start 非零,则可迭代对象中的元素将被跳过,直到到达 start 为止。 之后,元素将连续返回,除非 step 设置为高于 1 导致项目被跳过。 如果 stop 是
None
,则迭代继续直到迭代器耗尽,如果有的话; 否则,它在指定位置停止。 与常规切片不同,islice() 不支持 start、stop 或 step 的负值。 可用于从内部结构已扁平化的数据中提取相关字段(例如,多行报告可能会在每三行列出一个名称字段)。 大致相当于:def islice(iterable, *args): # islice('ABCDEFG', 2) --> A B # islice('ABCDEFG', 2, 4) --> C D # islice('ABCDEFG', 2, None) --> C D E F G # islice('ABCDEFG', 0, None, 2) --> A C E G s = slice(*args) start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1 it = iter(range(start, stop, step)) try: nexti = next(it) except StopIteration: # Consume *iterable* up to the *start* position. for i, element in zip(range(start), iterable): pass return try: for i, element in enumerate(iterable): if i == nexti: yield element nexti = next(it) except StopIteration: # Consume to *stop*. for i, element in zip(range(i + 1, stop), iterable): pass
如果 start 是
None
,则迭代从零开始。 如果 step 为None
,则 step 默认为 1。
- itertools.permutations(iterable, r=None)
返回 iterable 中元素的连续 r 长度排列。
如果未指定 r 或
None
,则 r 默认为 iterable 的长度,并生成所有可能的全长排列.排列元组根据输入 iterable 的顺序以字典顺序发出。 因此,如果输入 iterable 已排序,则组合元组将按排序顺序生成。
元素被视为唯一基于它们的位置,而不是它们的值。 因此,如果输入元素是唯一的,则每个排列中都不会出现重复值。
大致相当于:
def permutations(iterable, r=None): # permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC # permutations(range(3)) --> 012 021 102 120 201 210 pool = tuple(iterable) n = len(pool) r = n if r is None else r if r > n: return indices = list(range(n)) cycles = list(range(n, n-r, -1)) yield tuple(pool[i] for i in indices[:r]) while n: for i in reversed(range(r)): cycles[i] -= 1 if cycles[i] == 0: indices[i:] = indices[i+1:] + indices[i:i+1] cycles[i] = n - i else: j = cycles[i] indices[i], indices[-j] = indices[-j], indices[i] yield tuple(pool[i] for i in indices[:r]) break else: return
permutations() 的代码也可以表示为 product() 的子序列,过滤以排除具有重复元素的条目(来自输入池中相同位置的条目):
def permutations(iterable, r=None): pool = tuple(iterable) n = len(pool) r = n if r is None else r for indices in product(range(n), repeat=r): if len(set(indices)) == r: yield tuple(pool[i] for i in indices)
0 <= r <= n
时返回的项目数为n! / (n-r)!
,r > n
时为0。
- itertools.product(*iterables, repeat=1)
输入迭代的笛卡尔积。
大致相当于生成器表达式中的嵌套 for 循环。 例如,
product(A, B)
返回与((x,y) for x in A for y in B)
相同。嵌套循环就像里程表一样循环,最右边的元素在每次迭代中前进。 此模式创建字典顺序,以便如果输入的可迭代对象已排序,则产品元组按排序顺序发出。
要计算可迭代对象与自身的乘积,请使用可选的 repeat 关键字参数指定重复次数。 例如,
product(A, repeat=4)
与product(A, A, A, A)
的含义相同。这个函数大致相当于下面的代码,只不过实际的实现不会在内存中建立中间结果:
def product(*args, repeat=1): # product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy # product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111 pools = [tuple(pool) for pool in args] * repeat result = [[../]] for pool in pools: result = [x+[y] for x in result for y in pool] for prod in result: yield tuple(prod)
在 product() 运行之前,它会完全消耗输入的可迭代对象,将值池保存在内存中以生成产品。 因此,它仅对有限输入有用。
- itertools.repeat(object[, times])
制作一个迭代器,一遍又一遍地返回 object。 无限期运行,除非指定了 times 参数。 用作 map() 的参数,用于被调用函数的不变参数。 还与 zip() 一起使用来创建元组记录的不变部分。
大致相当于:
def repeat(object, times=None): # repeat(10, 3) --> 10 10 10 if times is None: while True: yield object else: for i in range(times): yield object
repeat 的常见用途是向 map 或 zip 提供常量值流:
>>> list(map(pow, range(10), repeat(2))) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- itertools.starmap(function, iterable)
制作一个迭代器,使用从可迭代对象获得的参数来计算函数。 当参数参数已经从单个可迭代对象分组到元组中时(数据已经“预压缩”),用于代替 map()。 map() 和 starmap() 之间的区别与
function(a,b)
和function(*c)
之间的区别相似。 大致相当于:def starmap(function, iterable): # starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000 for args in iterable: yield function(*args)
- itertools.takewhile(predicate, iterable)
制作一个迭代器,只要谓词为真,它就会从可迭代对象中返回元素。 大致相当于:
def takewhile(predicate, iterable): # takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4 for x in iterable: if predicate(x): yield x else: break
- itertools.tee(iterable, n=2)
从单个迭代器返回 n 个独立迭代器。
以下 Python 代码有助于解释 tee 的作用(尽管实际实现更复杂,并且仅使用单个底层 FIFO 队列)。
大致相当于:
def tee(iterable, n=2): it = iter(iterable) deques = [collections.deque() for i in range(n)] def gen(mydeque): while True: if not mydeque: # when the local deque is empty try: newval = next(it) # fetch a new value and except StopIteration: return for d in deques: # load it to all the deques d.append(newval) yield mydeque.popleft() return tuple(gen(d) for d in deques)
一旦 tee() 进行了拆分,则不应在其他任何地方使用原始的 iterable; 否则,iterable 可以在不通知 tee 对象的情况下提前。
tee
迭代器不是线程安全的。 当同时使用由相同的 tee() 调用返回的迭代器时,可能会引发 RuntimeError,即使原始的 iterable 是线程安全的。这个 itertool 可能需要大量的辅助存储(取决于需要存储多少临时数据)。 通常,如果一个迭代器在另一个迭代器启动之前使用了大部分或全部数据,则使用 list() 而不是 tee() 会更快。
- itertools.zip_longest(*iterables, fillvalue=None)
制作一个迭代器,聚合来自每个可迭代对象的元素。 如果可迭代对象的长度不均匀,则用 fillvalue 填充缺失值。 迭代一直持续到最长的迭代用完为止。 大致相当于:
def zip_longest(*args, fillvalue=None): # zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D- iterators = [iter(it) for it in args] num_active = len(iterators) if not num_active: return while True: values = [] for i, it in enumerate(iterators): try: value = next(it) except StopIteration: num_active -= 1 if not num_active: return iterators[i] = repeat(fillvalue) value = fillvalue values.append(value) yield tuple(values)
如果可迭代对象之一可能是无限的,那么 zip_longest() 函数应该用限制调用次数的东西包装(例如 islice() 或 takewhile( ))。 如果未指定,fillvalue 默认为
None
。
Itertools 食谱
本节展示了使用现有 itertools 作为构建块创建扩展工具集的方法。
基本上所有这些配方以及许多其他配方都可以从 Python 包索引上的 more-itertools 项目 中安装:
pip install more-itertools
扩展工具提供与底层工具集相同的高性能。 一次处理一个元素,而不是一次将整个迭代器全部放入内存中,从而保持了卓越的内存性能。 通过以有助于消除临时变量的功能风格将工具链接在一起,代码量保持较小。 通过优先使用“矢量化”构建块而不是使用 for 循环和 生成器 来保持高速,这会导致解释器开销。
def take(n, iterable):
"Return first n items of the iterable as a list"
return list(islice(iterable, n))
def prepend(value, iterator):
"Prepend a single value in front of an iterator"
# prepend(1, [2, 3, 4]) -> 1 2 3 4
return chain([value], iterator)
def tabulate(function, start=0):
"Return function(0), function(1), ..."
return map(function, count(start))
def tail(n, iterable):
"Return an iterator over the last n items"
# tail(3, 'ABCDEFG') --> E F G
return iter(collections.deque(iterable, maxlen=n))
def consume(iterator, n=None):
"Advance the iterator n-steps ahead. If n is None, consume entirely."
# Use functions that consume iterators at C speed.
if n is None:
# feed the entire iterator into a zero-length deque
collections.deque(iterator, maxlen=0)
else:
# advance to the empty slice starting at position n
next(islice(iterator, n, n), None)
def nth(iterable, n, default=None):
"Returns the nth item or a default value"
return next(islice(iterable, n, None), default)
def all_equal(iterable):
"Returns True if all the elements are equal to each other"
g = groupby(iterable)
return next(g, True) and not next(g, False)
def quantify(iterable, pred=bool):
"Count how many times the predicate is true"
return sum(map(pred, iterable))
def pad_none(iterable):
"""Returns the sequence elements and then returns None indefinitely.
Useful for emulating the behavior of the built-in map() function.
"""
return chain(iterable, repeat(None))
def ncycles(iterable, n):
"Returns the sequence elements n times"
return chain.from_iterable(repeat(tuple(iterable), n))
def dotproduct(vec1, vec2):
return sum(map(operator.mul, vec1, vec2))
def convolve(signal, kernel):
# See: https://betterexplained.com/articles/intuitive-convolution/
# convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
# convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
# convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
kernel = tuple(kernel)[::-1]
n = len(kernel)
window = collections.deque([0], maxlen=n) * n
for x in chain(signal, repeat(0, n-1)):
window.append(x)
yield sum(map(operator.mul, kernel, window))
def flatten(list_of_lists):
"Flatten one level of nesting"
return chain.from_iterable(list_of_lists)
def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
Example: repeatfunc(random.random)
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def roundrobin(*iterables):
"roundrobin('ABC', 'D', 'EF') --> A D E B F C"
# Recipe credited to George Sakkis
num_active = len(iterables)
nexts = cycle(iter(it).__next__ for it in iterables)
while num_active:
try:
for next in nexts:
yield next()
except StopIteration:
# Remove the iterator we just exhausted from the cycle.
num_active -= 1
nexts = cycle(islice(nexts, num_active))
def partition(pred, iterable):
"Use a predicate to partition entries into false entries and true entries"
# partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
t1, t2 = tee(iterable)
return filterfalse(pred, t1), filter(pred, t2)
def powerset(iterable):
"powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_justseen(iterable, key=None):
"List unique elements, preserving order. Remember only the element just seen."
# unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
# unique_justseen('ABBCcAD', str.lower) --> A B C A D
return map(next, map(operator.itemgetter(1), groupby(iterable, key)))
def iter_except(func, exception, first=None):
""" Call a function repeatedly until an exception is raised.
Converts a call-until-exception interface to an iterator interface.
Like builtins.iter(func, sentinel) but uses an exception instead
of a sentinel to end the loop.
Examples:
iter_except(functools.partial(heappop, h), IndexError) # priority queue iterator
iter_except(d.popitem, KeyError) # non-blocking dict iterator
iter_except(d.popleft, IndexError) # non-blocking deque iterator
iter_except(q.get_nowait, Queue.Empty) # loop over a producer Queue
iter_except(s.pop, KeyError) # non-blocking set iterator
"""
try:
if first is not None:
yield first() # For database APIs needing an initial cast to db.first()
while True:
yield func()
except exception:
pass
def first_true(iterable, default=False, pred=None):
"""Returns the first true value in the iterable.
If no true value is found, returns *default*
If *pred* is not None, returns the first item
for which pred(item) is true.
"""
# first_true([a,b,c], x) --> a or b or c or x
# first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
return next(filter(pred, iterable), default)
def random_product(*args, repeat=1):
"Random selection from itertools.product(*args, **kwds)"
pools = [tuple(pool) for pool in args] * repeat
return tuple(map(random.choice, pools))
def random_permutation(iterable, r=None):
"Random selection from itertools.permutations(iterable, r)"
pool = tuple(iterable)
r = len(pool) if r is None else r
return tuple(random.sample(pool, r))
def random_combination(iterable, r):
"Random selection from itertools.combinations(iterable, r)"
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.sample(range(n), r))
return tuple(pool[i] for i in indices)
def random_combination_with_replacement(iterable, r):
"Random selection from itertools.combinations_with_replacement(iterable, r)"
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.choices(range(n), k=r))
return tuple(pool[i] for i in indices)
def nth_combination(iterable, r, index):
"Equivalent to list(combinations(iterable, r))[index]"
pool = tuple(iterable)
n = len(pool)
if r < 0 or r > n:
raise ValueError
c = 1
k = min(r, n-r)
for i in range(1, k+1):
c = c * (n - k + i) // i
if index < 0:
index += c
if index < 0 or index >= c:
raise IndexError
result = []
while r:
c, n, r = c*r//n, n-1, r-1
while index >= c:
index -= c
c, n = c*(n-r)//n, n-1
result.append(pool[-1-n])
return tuple(result)