如何在Python3中使用ThreadPoolExecutor

来自菜鸟教程
跳转至:导航、​搜索

作者选择了 COVID-19 Relief Fund 作为 Write for DOnations 计划的一部分来接受捐赠。

介绍

Python 线程 are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).

示例 I/O 绑定操作 包括发出 Web 请求和从文件中读取数据。 与 I/O 绑定操作相比,CPU 绑定操作(如使用 Python 标准库执行数学运算)不会从 Python 线程中获得太多好处。

Python 3 包含 ThreadPoolExecutor 实用程序,用于在线程中执行代码。

在本教程中,我们将使用 ThreadPoolExecutor 方便地发出网络请求。 我们将定义一个非常适合在线程中调用的函数,使用 ThreadPoolExecutor 来执行该函数,并处理这些执行的结果。

在本教程中,我们将发出网络请求以检查 Wikipedia 页面是否存在。

注意: I/O 绑定操作比 CPU 绑定操作更受益于线程的事实是由 Python 中称为 全局解释器锁 的特性引起的。 如果您愿意,可以在 Python 官方文档 中了解更多关于 Python 的全局解释器锁


先决条件

为了充分利用本教程,建议您熟悉 Python 编程和安装了 requests 的本地 Python 编程环境。

您可以查看这些教程以获取必要的背景信息:

pip install --user requests==2.23.0

第 1 步 — 定义要在线程中执行的函数

让我们首先定义一个我们想在线程的帮助下执行的函数。

使用 nano 或您喜欢的文本编辑器/开发环境,您可以打开此文件:

nano wiki_page_function.py

在本教程中,我们将编写一个函数来确定维基百科页面是否存在:

wiki_page_function.py

import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

get_wiki_page_existence 函数 接受两个参数:维基百科页面的 URL (wiki_page_url) 和 timeout 等待响应的秒数那个网址。

get_wiki_page_existence 使用 requests 包向该 URL 发出 Web 请求。 根据 HTTP response状态码 ,返回一个描述页面是否存在的字符串。 不同的状态码代表一个 HTTP 请求的不同结果。 此过程假定 200“成功”状态码表示维基百科页面存在,而 404“未找到”状态码表示维基百科页面不存在。

如先决条件部分所述,您需要安装 requests 软件包才能运行此功能。

让我们尝试通过在 get_wiki_page_existence 函数之后添加 url 和函数调用来运行该函数:

wiki_page_function.py

. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

添加代码后,保存并关闭文件。

如果我们运行这段代码:

python wiki_page_function.py

我们将看到如下输出:

Outputhttps://en.wikipedia.org/wiki/Ocean - exists

使用有效的 Wikipedia 页面调用 get_wiki_page_existence 函数会返回一个字符串,以确认该页面确实存在。

警告: 一般来说,如果不特别注意避免并发错误,在线程之间共享 Python 对象或状态是不安全的。 在定义要在线程中执行的函数时,最好定义一个执行单个作业且不向其他线程共享或发布状态的函数。 get_wiki_page_existence 是这种函数的一个例子。


第 2 步 — 使用 ThreadPoolExecutor 在线程中执行函数

现在我们有了一个非常适合用线程调用的函数,我们可以使用 ThreadPoolExecutor 方便地执行该函数的多次调用。

让我们在 wiki_page_function.py 中将以下突出显示的代码添加到您的程序中:

wiki_page_function.py

import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

让我们看看这段代码是如何工作的:

  • 导入 concurrent.futures 以使我们能够访问 ThreadPoolExecutor
  • with 语句用于创建 ThreadPoolExecutor 实例 executor,该实例将在完成后立即清理线程。
  • submittedexecutor 有四个作业:wiki_page_urls 列表中的每个 URL 一个。
  • submit 的每次调用都会返回一个 Future 实例,该实例存储在 futures 列表中。
  • as_completed 函数等待每个 Future get_wiki_page_existence 调用完成,以便我们可以打印其结果。

如果我们再次运行这个程序,使用以下命令:

python wiki_page_function.py

我们将看到如下输出:

Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists

这个输出是有道理的:其中 3 个 URL 是有效的 Wikipedia 页面,其中一个 this_page_does_not_exist 不是。 请注意,您的输出的排序可能与此输出不同。 此示例中的 concurrent.futures.as_completed 函数会在结果可用时立即返回结果,无论作业提交的顺序如何。

第 3 步 — 处理线程中运行的函数的异常

在上一步中,get_wiki_page_existence 成功地为我们所有的调用返回了一个值。 在这一步中,我们将看到 ThreadPoolExecutor 也可以引发线程函数调用中生成的异常。

让我们考虑以下示例代码块:

wiki_page_function.py

import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

此代码块与我们在步骤 2 中使用的代码块几乎相同,但它有两个关键区别:

  • 我们现在将 timeout=0.00001 传递给 get_wiki_page_existence。 由于 requests 包将无法在 0.00001 秒内完成其对 Wikipedia 的 Web 请求,因此它将引发 ConnectTimeout 异常。
  • 我们捕获由 future.result() 引发的 ConnectTimeout 异常,并在每次这样做时打印出一个字符串。

如果我们再次运行程序,我们将看到以下输出:

OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.

打印了四个 ConnectTimeout 消息——我们的四个 wiki_page_urls 中的每一个都有一个,因为它们都无法在 0.00001 秒内完成,并且四个 get_wiki_page_existence 中的每一个都无法完成] 调用引发了 ConnectTimeout 异常。

您现在已经看到,如果提交给 ThreadPoolExecutor 的函数调用引发异常,则可以通过调用 Future.result 正常引发异常。 在所有提交的调用上调用 Future.result 可确保您的程序不会错过线程函数引发的任何异常。

第 4 步 — 比较有线程和无线程的执行时间

现在让我们验证使用 ThreadPoolExecutor 是否确实使您的程序更快。

首先,如果我们在没有线程的情况下运行它,让我们计时 get_wiki_page_existence

wiki_page_function.py

import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

在代码示例中,我们将 get_wiki_page_existence 函数与 50 个不同的 Wikipedia 页面 URL 逐一调用。 我们使用 time.time() 函数 打印出运行程序所需的秒数。

如果我们像以前一样再次运行此代码,我们将看到如下输出:

OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182

为简洁起见,此输出中的条目 2-47 已被省略。

当您在机器上运行时,在 Without threads time 之后打印的秒数会有所不同——这没关系,您只是得到一个基线数字来与使用 ThreadPoolExecutor 的解决方案进行比较。 在这种情况下,它是 ~5.803 秒。

让我们通过 get_wiki_page_existence 运行相同的 50 个 Wikipedia URL,但这次使用 ThreadPoolExecutor

wiki_page_function.py

import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

该代码与我们在第 2 步中创建的代码相同,只是添加了一些打印语句,向我们显示执行代码所需的秒数。

如果我们再次运行该程序,我们将看到以下内容:

OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543

同样,在 Threaded time 之后打印的秒数在您的计算机上会有所不同(输出顺序也会有所不同)。

您现在可以比较使用和不使用线程获取 50 个 Wikipedia 页面 URL 的执行时间。

在本教程中使用的机器上,没有线程需要 ~5.803 秒,有线程需要 ~1.220 秒。 我们的程序使用线程运行得更快。

结论

在本教程中,您学习了如何使用 Python 3 中的 ThreadPoolExecutor 实用程序来有效地运行受 I/O 限制的代码。 您创建了一个非常适合在线程中调用的函数,学习了如何从该函数的线程执行中检索输出和异常,并观察到使用线程获得的性能提升。

从这里您可以了解更多关于 concurrent.futures 模块 提供的其他并发功能。