如何在Python3中使用ThreadPoolExecutor
作者选择了 COVID-19 Relief Fund 作为 Write for DOnations 计划的一部分来接受捐赠。
介绍
Python 线程 are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).
示例 I/O 绑定操作 包括发出 Web 请求和从文件中读取数据。 与 I/O 绑定操作相比,CPU 绑定操作(如使用 Python 标准库执行数学运算)不会从 Python 线程中获得太多好处。
Python 3 包含 ThreadPoolExecutor
实用程序,用于在线程中执行代码。
在本教程中,我们将使用 ThreadPoolExecutor
方便地发出网络请求。 我们将定义一个非常适合在线程中调用的函数,使用 ThreadPoolExecutor
来执行该函数,并处理这些执行的结果。
在本教程中,我们将发出网络请求以检查 Wikipedia 页面是否存在。
注意: I/O 绑定操作比 CPU 绑定操作更受益于线程的事实是由 Python 中称为 全局解释器锁 的特性引起的。 如果您愿意,可以在 Python 官方文档 中了解更多关于 Python 的全局解释器锁 。
先决条件
为了充分利用本教程,建议您熟悉 Python 编程和安装了 requests
的本地 Python 编程环境。
您可以查看这些教程以获取必要的背景信息:
- 如何在 Python 3 中编码
- 如何在 Ubuntu 18.04 上安装 Python 3 并设置本地编程环境
- 要将
requests
包安装到 本地 Python 编程环境 中,可以运行以下命令:
pip install --user requests==2.23.0
第 1 步 — 定义要在线程中执行的函数
让我们首先定义一个我们想在线程的帮助下执行的函数。
使用 nano
或您喜欢的文本编辑器/开发环境,您可以打开此文件:
nano wiki_page_function.py
在本教程中,我们将编写一个函数来确定维基百科页面是否存在:
wiki_page_function.py
import requests def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status
get_wiki_page_existence
函数 接受两个参数:维基百科页面的 URL (wiki_page_url
) 和 timeout
等待响应的秒数那个网址。
get_wiki_page_existence
使用 requests 包向该 URL 发出 Web 请求。 根据 HTTP response
的 状态码 ,返回一个描述页面是否存在的字符串。 不同的状态码代表一个 HTTP 请求的不同结果。 此过程假定 200
“成功”状态码表示维基百科页面存在,而 404
“未找到”状态码表示维基百科页面不存在。
如先决条件部分所述,您需要安装 requests
软件包才能运行此功能。
让我们尝试通过在 get_wiki_page_existence
函数之后添加 url
和函数调用来运行该函数:
wiki_page_function.py
. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url))
添加代码后,保存并关闭文件。
如果我们运行这段代码:
python wiki_page_function.py
我们将看到如下输出:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
使用有效的 Wikipedia 页面调用 get_wiki_page_existence
函数会返回一个字符串,以确认该页面确实存在。
警告: 一般来说,如果不特别注意避免并发错误,在线程之间共享 Python 对象或状态是不安全的。 在定义要在线程中执行的函数时,最好定义一个执行单个作业且不向其他线程共享或发布状态的函数。 get_wiki_page_existence
是这种函数的一个例子。
第 2 步 — 使用 ThreadPoolExecutor 在线程中执行函数
现在我们有了一个非常适合用线程调用的函数,我们可以使用 ThreadPoolExecutor
方便地执行该函数的多次调用。
让我们在 wiki_page_function.py
中将以下突出显示的代码添加到您的程序中:
wiki_page_function.py
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result())
让我们看看这段代码是如何工作的:
- 导入
concurrent.futures
以使我们能够访问ThreadPoolExecutor
。 with
语句用于创建ThreadPoolExecutor
实例executor
,该实例将在完成后立即清理线程。- 从
submitted
到executor
有四个作业:wiki_page_urls
列表中的每个 URL 一个。 - 对
submit
的每次调用都会返回一个 Future 实例,该实例存储在futures
列表中。 as_completed
函数等待每个Future
get_wiki_page_existence
调用完成,以便我们可以打印其结果。
如果我们再次运行这个程序,使用以下命令:
python wiki_page_function.py
我们将看到如下输出:
Outputhttps://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists
这个输出是有道理的:其中 3 个 URL 是有效的 Wikipedia 页面,其中一个 this_page_does_not_exist
不是。 请注意,您的输出的排序可能与此输出不同。 此示例中的 concurrent.futures.as_completed
函数会在结果可用时立即返回结果,无论作业提交的顺序如何。
第 3 步 — 处理线程中运行的函数的异常
在上一步中,get_wiki_page_existence
成功地为我们所有的调用返回了一个值。 在这一步中,我们将看到 ThreadPoolExecutor
也可以引发线程函数调用中生成的异常。
让我们考虑以下示例代码块:
wiki_page_function.py
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append( executor.submit( get_wiki_page_existence, wiki_page_url=url, timeout=0.00001 ) ) for future in concurrent.futures.as_completed(futures): try: print(future.result()) except requests.ConnectTimeout: print("ConnectTimeout.")
此代码块与我们在步骤 2 中使用的代码块几乎相同,但它有两个关键区别:
- 我们现在将
timeout=0.00001
传递给get_wiki_page_existence
。 由于requests
包将无法在0.00001
秒内完成其对 Wikipedia 的 Web 请求,因此它将引发ConnectTimeout
异常。 - 我们捕获由
future.result()
引发的ConnectTimeout
异常,并在每次这样做时打印出一个字符串。
如果我们再次运行程序,我们将看到以下输出:
OutputConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.
打印了四个 ConnectTimeout
消息——我们的四个 wiki_page_urls
中的每一个都有一个,因为它们都无法在 0.00001
秒内完成,并且四个 get_wiki_page_existence
中的每一个都无法完成] 调用引发了 ConnectTimeout
异常。
您现在已经看到,如果提交给 ThreadPoolExecutor
的函数调用引发异常,则可以通过调用 Future.result
正常引发异常。 在所有提交的调用上调用 Future.result
可确保您的程序不会错过线程函数引发的任何异常。
第 4 步 — 比较有线程和无线程的执行时间
现在让我们验证使用 ThreadPoolExecutor
是否确实使您的程序更快。
首先,如果我们在没有线程的情况下运行它,让我们计时 get_wiki_page_existence
:
wiki_page_function.py
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls: print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start)
在代码示例中,我们将 get_wiki_page_existence
函数与 50 个不同的 Wikipedia 页面 URL 逐一调用。 我们使用 time.time() 函数 打印出运行程序所需的秒数。
如果我们像以前一样再次运行此代码,我们将看到如下输出:
OutputRunning without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182
为简洁起见,此输出中的条目 2-47 已被省略。
当您在机器上运行时,在 Without threads time
之后打印的秒数会有所不同——这没关系,您只是得到一个基线数字来与使用 ThreadPoolExecutor
的解决方案进行比较。 在这种情况下,它是 ~5.803
秒。
让我们通过 get_wiki_page_existence
运行相同的 50 个 Wikipedia URL,但这次使用 ThreadPoolExecutor
:
wiki_page_function.py
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) print("Threaded time:", time.time() - threaded_start)
该代码与我们在第 2 步中创建的代码相同,只是添加了一些打印语句,向我们显示执行代码所需的秒数。
如果我们再次运行该程序,我们将看到以下内容:
OutputRunning threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543
同样,在 Threaded time
之后打印的秒数在您的计算机上会有所不同(输出顺序也会有所不同)。
您现在可以比较使用和不使用线程获取 50 个 Wikipedia 页面 URL 的执行时间。
在本教程中使用的机器上,没有线程需要 ~5.803
秒,有线程需要 ~1.220
秒。 我们的程序使用线程运行得更快。
结论
在本教程中,您学习了如何使用 Python 3 中的 ThreadPoolExecutor
实用程序来有效地运行受 I/O 限制的代码。 您创建了一个非常适合在线程中调用的函数,学习了如何从该函数的线程执行中检索输出和异常,并观察到使用线程获得的性能提升。
从这里您可以了解更多关于 concurrent.futures 模块 提供的其他并发功能。