concurrent.futures — 启动并行任务 — Python 文档

来自菜鸟教程
Python/docs/3.10/library/concurrent.futures
跳转至:导航、​搜索

concurrent.futures — 启动并行任务

3.2 版中的新功能。


源代码: :source:`Lib/concurrent/futures/thread.py` and :source:`Lib/concurrent/futures/process.py`



concurrent.futures 模块为异步执行可调用对象提供了一个高级接口。

异步执行可以使用线程执行,使用 ThreadPoolExecutor,或单独的进程,使用 ProcessPoolExecutor。 两者都实现了相同的接口,该接口由抽象的 Executor 类定义。

执行器对象

class concurrent.futures.Executor

一个抽象类,提供异步执行调用的方法。 它不应直接使用,而应通过其具体的子类使用。

submit(fn, /, *args, **kwargs)

安排可调用对象 fnfn(*args **kwargs) 的形式执行,并返回一个表示可调用对象执行的 Future 对象。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

类似于 map(func, *iterables) 除了:

  • iterables 立即收集而不是懒惰;

  • func 是异步执行的,并且可以同时调用多个 func

如果调用了 __next__() 并且在 timeout 秒后结果不可用,则返回的迭代器会引发 concurrent.futures.TimeoutError X193X]Executor.map()。 timeout 可以是 int 或 float。 如果未指定 timeoutNone,则等待时间没有限制。

如果 func 调用引发异常,则在从迭代器检索其值时将引发该异常。

当使用 ProcessPoolExecutor 时,这个方法将 iterables 分成许多块,作为单独的任务提交到池中。 这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。 对于非常长的可迭代对象,与默认大小 1 相比,使用较大的 chunksize 值可以显着提高性能。 使用 ThreadPoolExecutorchunksize 无效。

3.5 版更改: 添加 chunksize 参数。

shutdown(wait=True, *, cancel_futures=False)

当当前挂起的期货完成执行时,通知执行器它应该释放它正在使用的任何资源。 关闭后调用 Executor.submit()Executor.map() 将引发 RuntimeError

如果 waitTrue 则此方法将不会返回,直到所有挂起的期货都执行完毕并且与执行程序关联的资源已被释放。 如果 waitFalse 则此方法将立即返回,并且当所有挂起的期货执行完毕后,与执行器关联的资源将被释放。 无论 wait 的值如何,在所有挂起的期货执行完毕之前,整个 Python 程序都不会退出。

如果 cancel_futuresTrue,则此方法将取消所有执行者尚未开始运行的挂起期货。 无论 cancel_futures 的值如何,任何已完成或正在运行的期货都不会被取消。

如果 cancel_futureswait 都是 True,则执行器已开始运行的所有期货将在此方法返回之前完成。 剩余的期货被取消。

如果您使用 with 语句,您可以避免显式调用此方法,这将关闭 Executor(等待好像 Executor.shutdown() 被调用wait 设置为 True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

3.9 版变更: 添加 cancel_futures



线程池执行器

ThreadPoolExecutor 是一个 Executor 子类,它使用线程池异步执行调用。

当与 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。 例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=, initializer=None, initargs=())

Executor 子类,最多使用 max_workers 个线程池异步执行调用。

initializer 是一个可选的可调用对象,在每个工作线程开始时调用; initargs 是传递给初始化程序的参数元组。 如果 initializer 引发异常,所有当前挂起的作业将引发 BrokenThreadPool,以及任何向池提交更多作业的尝试。

3.5版本变更:如果max_workersNone或不给,则默认为机器的处理器数,乘以5 , 假设 ThreadPoolExecutor 经常用于重叠 I/O 而不是 CPU 工作,并且工作人员的数量应该高于 ProcessPoolExecutor 的工作人员数量。

3.6 版新功能: 添加了 thread_name_prefix 参数以允许用户控制池创建的工作线程的 threading.Thread 名称,以便于调试。

3.7 版更改: 添加了 初始化器initargs 参数。

3.8 版本变更: max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。 此默认值至少为 I/O 绑定任务保留 5 个工作线程。 它最多使用 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定任务。 它避免在多核机器上隐式使用非常大的资源。

ThreadPoolExecutor 现在在启动 max_workers 工作线程之前重用空闲工作线程。

线程池执行器示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

进程池执行器

ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 模块,这允许它绕过 Global Interpreter Lock,但也意味着只能执行和返回可拾取的对象。

__main__ 模块必须可由工作子进程导入。 这意味着 ProcessPoolExecutor 在交互式解释器中不起作用。

从提交给 ProcessPoolExecutor 的可调用对象调用 ExecutorFuture 方法将导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

Executor 子类使用最多 max_workers 个进程池异步执行调用。 如果 max_workersNone 或未给出,则默认为机器上的处理器数量。 如果 max_workers 小于或等于 0,则会引发 ValueError。 在 Windows 上,max_workers 必须小于或等于 61。 如果不是,则将引发 ValueError。 如果 max_workersNone,则默认选择最多为 61,即使有更多处理器可用。 mp_context 可以是多处理上下文或无。 它将用于启动工作程序。 如果 mp_contextNone 或未给出,则使用默认的多处理上下文。

initializer 是一个可选的可调用对象,在每个工作进程开始时调用; initargs 是传递给初始化程序的参数元组。 如果 initializer 引发异常,所有当前挂起的作业将引发 BrokenProcessPool,以及任何向池提交更多作业的尝试。

3.3 版更改: 当其中一个工作进程突然终止时,现在会引发 BrokenProcessPool 错误。 以前,行为是未定义的,但对 executor 或其期货的操作经常会冻结或死锁。

3.7 版更改: 添加了 mp_context 参数以允许用户控制池创建的工作进程的 start_method。

添加了 initializerinitargs 参数。

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

未来对象

Future 类封装了可调用的异步执行。 Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

封装可调用的异步执行。 Future 实例由 Executor.submit() 创建,除测试外不应直接创建。

cancel()

尝试取消呼叫。 如果调用当前正在执行或运行结束且无法取消,则该方法将返回 False,否则该调用将被取消,该方法将返回 True

cancelled()

如果呼叫成功取消,则返回 True

running()

如果调用当前正在执行且无法取消,则返回 True

done()

如果调用成功取消或完成运行,则返回 True

result(timeout=None)

返回调用返回的值。 如果调用尚未完成,则此方法将等待 timeout 秒。 如果调用未在 timeout 秒内完成,则会引发 concurrent.futures.TimeoutErrortimeout 可以是 int 或 float。 如果未指定 timeoutNone,则等待时间没有限制。

如果未来在完成之前被取消,那么 CancelledError 将被引发。

如果调用引发异常,则此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。 如果调用尚未完成,则此方法将等待 timeout 秒。 如果调用未在 timeout 秒内完成,则会引发 concurrent.futures.TimeoutErrortimeout 可以是 int 或 float。 如果未指定 timeoutNone,则等待时间没有限制。

如果未来在完成之前被取消,那么 CancelledError 将被引发。

如果调用完成而未引发,则返回 None

add_done_callback(fn)

将可调用的 fn 附加到未来。 fn 将在未来被取消或完成运行时被调用,未来作为其唯一参数。

添加的可调用对象按照添加的顺序调用,并且始终在属于添加它们的进程的线程中调用。 如果可调用对象引发 Exception 子类,它将被记录并忽略。 如果可调用对象引发 BaseException 子类,则行为未定义。

如果future已经完成或被取消,会立即调用fn


以下 Future 方法用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

在执行与 Future 相关的工作和单元测试之前,该方法应该只被 Executor 实现调用。

如果该方法返回 False 则取消 Future,即 Future.cancel() 被调用并返回 True。 任何等待 Future 完成的线程(即 通过 as_completed()wait()) 将被唤醒。

如果方法返回 True 则表示 Future 没有被取消并且已经进入运行状态,即 调用 Future.running() 将返回 True。

该方法只能调用一次,不能在调用 Future.set_result()Future.set_exception() 后调用。

set_result(result)

将与 Future 相关的工作结果设置为 result

此方法仅应由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已经完成,此方法会引发 concurrent.futures.InvalidStateError

set_exception(exception)

将与 Future 相关的工作结果设置为 Exception exception

此方法仅应由 Executor 实现和单元测试使用。

在 3.8 版更改: 如果 Future 已经完成,此方法会引发 concurrent.futures.InvalidStateError



模块功能

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)完成。 返回一个命名的 2 元组集合。 第一组名为 done,包含在等待完成之前完成的期货(已完成或取消的期货)。 第二组名为 not_done,包含未完成的期货(待定或正在运行的期货)。

timeout 可用于控制返回前等待的最大秒数。 timeout 可以是 int 或 float。 如果未指定 timeoutNone,则等待时间没有限制。

return_when 指示此函数应何时返回。 它必须是以下常量之一:

持续的

描述

FIRST_COMPLETED

当任何未来完成或取消时,该函数将返回。

FIRST_EXCEPTION

当任何未来通过引发异常完成时,该函数将返回。 如果没有 future 引发异常,则它相当于 ALL_COMPLETED

ALL_COMPLETED

当所有期货完成或被取消时,该函数将返回。

concurrent.futures.as_completed(fs, timeout=None)
返回由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)的迭代器,它在完成时产生期货(已完成或取消的期货)。 fs 给出的任何重复的期货将返回一次。 在调用 as_completed() 之前完成的任何期货将首先产生。 如果调用了 __next__() 并且在 timeout 秒后结果不可用,则返回的迭代器会引发 concurrent.futures.TimeoutError X193X]as_completed()。 timeout 可以是 int 或 float。 如果未指定 timeoutNone,则等待时间没有限制。

也可以看看

PEP 3148 – 期货 – 异步执行计算
描述此功能以包含在 Python 标准库中的提案。


异常类

exception concurrent.futures.CancelledError
在取消未来时引发。
exception concurrent.futures.TimeoutError
当未来的操作超过给定的超时时引发。
exception concurrent.futures.BrokenExecutor

派生自 RuntimeError,该异常类在 executor 因某种原因损坏时引发,不能用于提交或执行新任务。

3.7 版中的新功能。

exception concurrent.futures.InvalidStateError

在当前状态不允许的未来上执行操作时引发。

3.8 版中的新功能。

exception concurrent.futures.thread.BrokenThreadPool

派生自 BrokenExecutor,当 ThreadPoolExecutor 的工作人员之一初始化失败时,会引发此异常类。

3.7 版中的新功能。

exception concurrent.futures.process.BrokenProcessPool

派生自 BrokenExecutor(以前称为 RuntimeError),当 ProcessPoolExecutor 的一个工作程序以非干净方式终止时(例如,如果它是从外面杀死的)。

3.3 版中的新功能。