multiprocessing — 基于进程的并行性 — Python 文档
multiprocessing — 基于进程的并行性
源代码: :source:`库/多处理/`
介绍
multiprocessing 是一个使用类似于 threading 模块的 API 支持生成进程的包。 multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程有效地绕过 全局解释器锁 。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它可以在 Unix 和 Windows 上运行。
multiprocessing 模块还引入了在 threading 模块中没有类似物的 API。 一个主要的例子是 Pool 对象,它提供了一种方便的方法来并行化跨多个输入值的函数执行,跨进程分布输入数据(数据并行)。 以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。 这个使用 Pool 的数据并行的基本示例,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
Process 类
在 multiprocessing 中,通过创建 Process 对象然后调用其 start() 方法来生成进程。 Process 遵循 threading.Thread 的 API。 多进程程序的一个简单例子是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了显示所涉及的各个进程 ID,这里是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
有关为什么需要 if __name__ == '__main__'
部分的解释,请参阅 编程指南 。
上下文和启动方法
根据平台的不同,multiprocessing 支持三种启动进程的方式。 这些 启动方法 是
- 产卵
父进程启动一个新的 python 解释器进程。 子进程将只继承运行进程对象 run() 方法所需的资源。 特别是,父进程中不必要的文件描述符和句柄将不会被继承。 与使用 fork 或 forkserver 相比,使用此方法启动进程相当慢。
在 Unix 和 Windows 上可用。 Windows 上的默认设置。
- 叉子
父进程使用 os.fork() 来派生 Python 解释器。 子进程在开始时实际上与父进程相同。 父进程的所有资源都由子进程继承。 请注意,安全地分叉多线程进程是有问题的。
仅在 Unix 上可用。 Unix 上的默认设置。
- 分叉服务器
当程序启动并选择forkserver启动方式时,启动一个服务器进程。 从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它派生一个新进程。 fork 服务器进程是单线程的,因此使用 os.fork() 是安全的。 没有不必要的资源被继承。
在支持通过 Unix 管道传递文件描述符的 Unix 平台上可用。
3.4 版本变更:spawn 添加到所有 unix 平台,forkserver 添加到一些 unix 平台。 子进程不再继承 Windows 上的所有父进程可继承句柄。
在 Unix 上,使用 spawn 或 forkserver 启动方法还将启动一个 信号量跟踪器 进程,该进程跟踪由程序进程创建的未链接的命名信号量。 当所有进程都退出时,信号量跟踪器会取消所有剩余信号量的链接。 通常应该没有,但如果一个进程被一个信号杀死,可能会有一些“泄漏”的信号量。 (取消指定信号量的链接是一件很严重的事情,因为系统只允许有限数量的信号量,并且在下次重新启动之前它们不会自动取消链接。)
要选择启动方法,请使用主模块的 if __name__ == '__main__'
子句中的 set_start_method()。 例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method() 不应在程序中多次使用。
或者,您可以使用 get_context() 来获取上下文对象。 上下文对象与多处理模块具有相同的 API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。 特别是,使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。
想要使用特定启动方法的库可能应该使用 get_context() 以避免干扰库用户的选择。
警告
'spawn'
和 'forkserver'
启动方法目前不能用于“冻结”的可执行文件(即,由 PyInstaller 和 cx_Freeze 等包生成的二进制文件) Unix。 'fork'
启动方法确实有效。
在进程之间交换对象
multiprocessing支持两种进程间通信通道:
队列
Queue 类是 queue.Queue 的近似克隆。 例如:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
队列是线程和进程安全的。
管道
Pipe() 函数返回一对由管道连接的连接对象,管道默认为双工(双向)。 例如:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Pipe() 返回的两个连接对象代表管道的两端。 每个连接对象都有
send()
和recv()
方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的 相同 端,则管道中的数据可能会损坏。 当然,同时使用管道不同端的进程不存在损坏的风险。
进程间同步
multiprocessing 包含来自 threading 的所有同步原语的等价物。 例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用来自不同进程的锁输出,很容易混淆。
使用工人池
Pool 类代表一个工作进程池。 它具有允许以几种不同方式将任务卸载到工作进程的方法。
例如:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,池的方法只能由创建它的进程使用。
笔记
此包中的功能要求 __main__
模块可由子项导入。 这在 编程指南 中有介绍,但值得在这里指出。 这意味着某些示例,例如 multiprocessing.pool.Pool 示例将无法在交互式解释器中工作。 例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果您尝试这样做,它实际上会输出以半随机方式交错的三个完整回溯,然后您可能不得不以某种方式停止主进程。)
参考
multiprocessing 包主要复制了 threading 模块的 API。
进程和异常
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
流程对象表示在单独流程中运行的活动。 Process 类具有 threading.Thread 的所有方法的等价物。
应始终使用关键字参数调用构造函数。 group 应该总是
None
; 它的存在仅仅是为了与 threading.Thread 兼容。 target 是由 run() 方法调用的可调用对象。 它默认为None
,意味着什么都不调用。 name 是进程名称(有关详细信息,请参阅 name)。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。 如果提供,仅关键字 daemon 参数将进程 daemon 标志设置为True
或False
。 如果是None
(默认),这个标志将从创建过程中继承。默认情况下,没有参数传递给 target。
如果子类覆盖构造函数,它必须确保在对进程执行任何其他操作之前调用基类构造函数 (
Process.__init__()
)。在 3.3 版更改: 添加了 守护进程 参数。
- run()
表示流程活动的方法。
您可以在子类中覆盖此方法。 标准的 run() 方法调用传递给对象构造函数的可调用对象作为目标参数,如果有的话,顺序和关键字参数取自 args 和 kwargs[ X211X] 参数,分别。
- start()
启动流程的活动。
每个进程对象最多必须调用一次。 它安排在单独的进程中调用对象的 run() 方法。
- join([timeout])
如果可选参数 timeout 是
None
(默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。 如果 timeout 为正数,则最多阻塞 timeout 秒。 请注意,如果该方法的进程终止或该方法超时,则该方法返回None
。 检查进程的 exitcode 以确定它是否终止。一个进程可以多次加入。
进程不能加入自身,因为这会导致死锁。 在进程启动之前尝试加入进程是错误的。
- name
进程的名称。 该名称是一个仅用于识别目的的字符串。 它没有语义。 多个进程可能会被赋予相同的名称。
初始名称由构造函数设置。 如果没有为构造函数提供显式名称,则会构造一个格式为“Process-N1:N2:...:Nk”的名称,其中每个 Nk 是其父节点的第 N 个子节点。
- is_alive()
返回进程是否存活。
粗略地说,进程对象从 start() 方法返回的那一刻起一直处于活动状态,直到子进程终止。
- daemon
进程的守护进程标志,一个布尔值。 这必须在调用 start() 之前设置。
初始值继承自创建过程。
当一个进程退出时,它会尝试终止其所有守护进程。
请注意,不允许守护进程创建子进程。 否则,如果守护进程在其父进程退出时终止,它的子进程将成为孤儿。 此外,这些是 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程退出,它们将被终止(而不是加入)。
除了 threading.Thread API,Process 对象还支持以下属性和方法:
- pid
返回进程 ID。 在进程产生之前,这将是
None
。
- exitcode
孩子的退出代码。 如果进程尚未终止,这将是
None
。 负值 -N 表示子进程被信号 N 终止。
- authkey
进程的身份验证密钥(字节字符串)。
当 multiprocessing 初始化时,主进程会使用 os.urandom() 分配一个随机字符串。
当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管这可以通过将 authkey 设置为另一个字节字符串来更改。
请参阅 身份验证密钥 。
- sentinel
系统对象的数字句柄,在进程结束时将变为“就绪”。
如果您想使用 multiprocessing.connection.wait() 一次等待多个事件,则可以使用此值。 否则调用 join() 更简单。
在 Windows 上,这是一个可用于
WaitForSingleObject
和WaitForMultipleObjects
API 调用系列的操作系统句柄。 在 Unix 上,这是一个文件描述符,可用于 select 模块中的原语。3.3 版中的新功能。
- terminate()
终止进程。 在 Unix 上,这是使用
SIGTERM
信号完成的; 在 Windows 上使用TerminateProcess()
。 请注意,不会执行退出处理程序和 finally 子句等。请注意,进程的后代进程将 不会 被终止——它们只会成为孤立的。
警告
如果在关联进程正在使用管道或队列时使用此方法,则管道或队列可能会损坏,并且可能无法被其他进程使用。 类似地,如果进程获得了锁或信号量等。 然后终止它很容易导致其他进程死锁。
- kill()
与 terminate() 相同,但在 Unix 上使用
SIGKILL
信号。3.7 版中的新功能。
- close()
关闭 Process 对象,释放与其关联的所有资源。 如果底层进程仍在运行,则会引发 ValueError。 一旦 close() 成功返回,Process 对象的大多数其他方法和属性将引发 ValueError。
3.7 版中的新功能。
注意 start()、join()、is_alive()、terminate() 和 exitcode方法只能由创建进程对象的进程调用。
Process的一些方法的使用示例:
>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError
- 所有 multiprocessing 异常的基类。
- exception multiprocessing.BufferTooShort
当提供的缓冲区对象太小而无法读取消息时,
Connection.recv_bytes_into()
引发异常。如果
e
是 BufferTooShort 的实例,则e.args[0]
将把消息作为字节串给出。
- exception multiprocessing.AuthenticationError
- 出现身份验证错误时引发。
- exception multiprocessing.TimeoutError
- 超时到期时由具有超时的方法引发。
管道和队列
当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步原语,如锁。
对于传递消息,可以使用 Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。
Queue、SimpleQueue 和 JoinableQueue 类型是基于 queue.Queue 建模的多生产者、多消费者 FIFO 队列 标准库中的类。 它们的不同之处在于 Queue 缺少 Python 2.5 的 queue.Queue 类中引入的 task_done() 和 join() 方法。
如果您使用 JoinableQueue 那么您 必须 为每个从队列中移除的任务调用 JoinableQueue.task_done() 或者用于计算未完成任务数的信号量最终可能会溢出,引发异常。
请注意,还可以使用管理器对象创建共享队列——参见 Managers。
笔记
multiprocessing 使用通常的 queue.Empty 和 queue.Full 异常来表示超时。 它们在 multiprocessing 命名空间中不可用,因此您需要从 queue 导入它们。
笔记
当一个对象被放入队列时,该对象被腌制,后台线程稍后将腌制的数据刷新到底层管道。 这会产生一些令人惊讶的后果,但不应该造成任何实际困难——如果它们真的打扰你,那么你可以改用用 管理器 创建的队列。
- 将对象放入空队列后,在队列的 empty() 方法返回 False 和 get_nowait() 可以在不引发 [ X197X]queue.Empty。
- 如果多个进程正在排队对象,则对象可能会在另一端无序接收。 但是,由同一进程排队的对象将始终按预期顺序排列。
警告
如果进程在尝试使用 Queue 时使用 Process.terminate() 或 os.kill() 被杀死,则队列中的数据很可能会损坏。 这可能会导致任何其他进程在稍后尝试使用队列时出现异常。
警告
如上所述,如果子进程已将项目放入队列(并且尚未使用 JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中。
这意味着,如果您尝试加入该进程,您可能会遇到死锁,除非您确定放入队列的所有项目都已被消耗。 类似地,如果子进程是非守护进程,那么当它尝试加入其所有非守护进程时,父进程可能会在退出时挂起。
请注意,使用管理器创建的队列不存在此问题。 请参阅 编程指南 。
有关使用队列进行进程间通信的示例,请参阅 示例 。
- multiprocessing.Pipe([duplex])
返回表示管道末端的 Connection 对象的一对
(conn1, conn2)
。如果 duplex 是
True
(默认值),则管道是双向的。 如果 duplex 是False
则管道是单向的:conn1
只能用于接收消息,而conn2
只能用于发送消息。
- class multiprocessing.Queue([maxsize])
返回使用管道和一些锁/信号量实现的进程共享队列。 当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。
标准库的 queue 模块中常见的 queue.Empty 和 queue.Full 异常被引发以发出超时信号。
Queue 实现了 queue.Queue 的所有方法,除了 task_done() 和 join()。
- qsize()
返回队列的大致大小。 由于多线程/多处理语义,这个数字是不可靠的。
请注意,这可能会在未实现
sem_getvalue()
的 Mac OS X 等 Unix 平台上引发 NotImplementedError。
- empty()
如果队列为空,则返回
True
,否则返回False
。 由于多线程/多处理语义,这是不可靠的。
- full()
如果队列已满,则返回
True
,否则返回False
。 由于多线程/多处理语义,这是不可靠的。
- put(obj[, block[, timeout]])
将 obj 放入队列中。 如果可选参数 block 是
True
(默认值)并且 timeout 是None
(默认值),则在必要时阻塞直到空闲插槽可用的。 如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在这段时间内没有可用的空闲插槽,则会引发 queue.Full 异常。 否则 (block 是False
),如果空闲插槽立即可用,则将项目放入队列,否则引发 queue.Full 异常 (timeout 在这种情况下被忽略)。
- put_nowait(obj)
相当于
put(obj, False)
。
- get([block[, timeout]])
从队列中移除并返回一个项目。 如果可选参数 block 是
True
(默认值)并且 timeout 是None
(默认值),则在必要时阻止,直到项目可用。 如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在这段时间内没有可用的项目,则会引发 queue.Empty 异常。 否则(块是False
),如果一个项目立即可用则返回一个项目,否则引发 queue.Empty 异常(timeout 在这种情况下被忽略)。
- get_nowait()
相当于
get(False)
。
multiprocessing.Queue 有一些在 queue.Queue 中没有的额外方法。 大多数代码通常不需要这些方法:
- close()
指示当前进程不会将更多数据放入此队列。 后台线程将在将所有缓冲数据刷新到管道后退出。 当队列被垃圾收集时会自动调用。
- join_thread()
加入后台线程。 这只能在 close() 被调用后使用。 它会阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。
默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。 进程可以调用 cancel_join_thread() 使 join_thread() 什么都不做。
- cancel_join_thread()
防止 join_thread() 阻塞。 特别是,这会阻止后台线程在进程退出时自动加入——参见 join_thread()。
此方法的更好名称可能是
allow_exit_without_flush()
。 它很可能会导致排队的数据丢失,您几乎肯定不需要使用它。 只有当您需要当前进程立即退出而不等待将排队的数据刷新到底层管道时,它才真正存在,并且您不关心丢失的数据。
笔记
此类的功能需要在主机操作系统上实现有效的共享信号量。 如果没有,该类中的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。 有关其他信息,请参阅 :issue:`3770`。 这同样适用于下面列出的任何专用队列类型。
- class multiprocessing.SimpleQueue
它是一个简化的 Queue 类型,非常接近一个锁定的 Pipe。
- empty()
如果队列为空,则返回
True
,否则返回False
。
- get()
从队列中移除并返回一个项目。
- put(item)
将 item 放入队列。
- class multiprocessing.JoinableQueue([maxsize])
JoinableQueue 是一个 Queue 子类,是一个额外具有 task_done() 和 join() 方法的队列。
- task_done()
指示以前排队的任务已完成。 由队列消费者使用。 对于用于获取任务的每个 get(),随后对 task_done() 的调用告诉队列该任务的处理已完成。
如果 join() 当前正在阻塞,它会在所有项目都处理完毕后恢复(这意味着每个已经被 放置的项目都收到了一个 task_done() 调用() 进入队列)。
如果调用次数多于放置在队列中的项目,则引发 ValueError。
- join()
阻塞直到队列中的所有项目都被获取和处理。
每当将项目添加到队列时,未完成任务的计数就会增加。 每当消费者调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时,计数就会下降。 当未完成任务的数量降为零时,join() 解除阻塞。
各种各样的
- multiprocessing.active_children()
返回当前进程的所有活动子进程的列表。
调用它具有“加入”任何已经完成的进程的副作用。
- multiprocessing.cpu_count()
返回系统中的 CPU 数量。
这个数字不等于当前进程可以使用的 CPU 数量。 可用 CPU 的数量可以通过
len(os.sched_getaffinity(0))
获得可能会引发 NotImplementedError。
也可以看看
- multiprocessing.current_process()
返回当前进程对应的Process对象。
- multiprocessing.freeze_support()
添加对使用 multiprocessing 的程序被冻结以生成 Windows 可执行文件时的支持。 (已使用 py2exe、PyInstaller 和 cx_Freeze 进行测试。)
需要在主模块的
if __name__ == '__main__'
行之后直接调用此函数。 例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果省略
freeze_support()
行,则尝试运行冻结的可执行文件将引发 RuntimeError。在 Windows 以外的任何操作系统上调用时,调用
freeze_support()
不起作用。 另外,如果模块在Windows上被Python解释器正常运行(程序没有被冻结),那么freeze_support()
是没有作用的。
- multiprocessing.get_all_start_methods()
返回支持的启动方法列表,第一个是默认方法。 可能的启动方法是
'fork'
、'spawn'
和'forkserver'
。 在 Windows 上,只有'spawn'
可用。 在 Unix 上始终支持'fork'
和'spawn'
,'fork'
是默认值。3.4 版中的新功能。
- multiprocessing.get_context(method=None)
返回一个与 multiprocessing 模块具有相同属性的上下文对象。
如果 method 是
None
,则返回默认上下文。 否则 方法 应为'fork'
、'spawn'
、'forkserver'
。 如果指定的启动方法不可用,则会引发 ValueError。3.4 版中的新功能。
- multiprocessing.get_start_method(allow_none=False)
返回用于启动进程的启动方法的名称。
如果启动方法没有被修复并且allow_none为false,那么启动方法被固定为默认值并返回名称。 如果启动方法尚未修复且 allow_none 为真,则返回
None
。返回值可以是
'fork'
、'spawn'
、'forkserver'
或None
。'fork'
是 Unix 上的默认值,而'spawn'
是 Windows 上的默认值。3.4 版中的新功能。
- multiprocessing.set_executable()
设置启动子进程时要使用的 Python 解释器的路径。 (默认情况下使用 sys.executable)。 嵌入者可能需要做一些类似的事情
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
在他们可以创建子进程之前。
在 3.4 版更改: 现在在使用
'spawn'
启动方法时在 Unix 上支持。
- multiprocessing.set_start_method(method)
设置应该用于启动子进程的方法。 方法可以是
'fork'
、'spawn'
或'forkserver'
。请注意,这最多应调用一次,并且应在主模块的
if __name__ == '__main__'
子句中对其进行保护。3.4 版中的新功能。
连接对象
连接对象允许发送和接收可腌制的对象或字符串。 它们可以被认为是面向消息的连接套接字。
连接对象通常使用 Pipe 创建——另见 Listeners 和 Clients。
- class multiprocessing.connection.Connection
- send(obj)
将对象发送到连接的另一端,应使用 recv() 读取该对象。
该对象必须是可腌制的。 非常大的泡菜(大约 32 MiB+,但取决于操作系统)可能会引发 ValueError 异常。
- fileno()
返回连接使用的文件描述符或句柄。
- close()
关闭连接。
当连接被垃圾收集时会自动调用。
- poll([timeout])
返回是否有任何数据可供读取。
如果未指定 timeout,则它将立即返回。 如果 timeout 是一个数字,那么它指定阻塞的最长时间(以秒为单位)。 如果 timeout 是
None
,则使用无限超时。请注意,可以使用 multiprocessing.connection.wait() 一次轮询多个连接对象。
- send_bytes(buffer[, offset[, size]])
从 bytes-like object 发送字节数据作为完整的消息。
如果给出 offset,则从 buffer 中的那个位置读取数据。 如果给出 size ,那么将从缓冲区读取许多字节。 非常大的缓冲区(大约 32 MiB+,但取决于操作系统)可能会引发 ValueError 异常
- recv_bytes([maxlength])
以字符串形式返回从连接另一端发送的字节数据的完整消息。 阻塞直到有东西要接收。 如果没有任何东西可以接收并且另一端已关闭,则引发 EOFError。
如果指定了 maxlength 并且消息比 maxlength 长,则会引发 OSError 并且连接将不再可读。
- recv_bytes_into(buffer[, offset])
将连接另一端发送的字节数据的完整消息读入 buffer,并返回消息中的字节数。 阻塞直到有东西要接收。 如果没有任何东西可以接收并且另一端已关闭,则引发 EOFError。
buffer 必须是一个可写的 bytes-like object。 如果给出 offset,则消息将从该位置写入缓冲区。 偏移量必须是一个小于 buffer 的长度(以字节为单位)的非负整数。
如果缓冲区太短,则会引发
BufferTooShort
异常,并且完整的消息可用作e.args[0]
,其中e
是异常实例。
3.3 版更改: 现在可以使用 Connection.send() 和 Connection.recv() 在进程之间传输连接对象本身。
3.3 版新功能: 连接对象现在支持上下文管理协议 - 请参阅 上下文管理器类型 。 __enter__()返回连接对象,__exit__()调用close()。
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv() 方法会自动解开它接收到的数据,除非您可以信任发送消息的进程,否则这可能存在安全风险。
因此,除非连接对象是使用 Pipe()
生成的,否则您应该只在执行某种身份验证后使用 recv() 和 send() 方法。 请参阅 身份验证密钥 。
警告
如果进程在尝试读取或写入管道时被终止,则管道中的数据可能会损坏,因为可能无法确定消息边界在哪里。
同步原语
通常同步原语在多进程程序中不像在多线程程序中那样必要。 请参阅 threading 模块的文档。
请注意,还可以使用管理器对象创建同步原语——参见 Managers。
- class multiprocessing.Barrier(parties[, action[, timeout]])
屏障对象:threading.Barrier 的克隆。
3.3 版中的新功能。
- class multiprocessing.BoundedSemaphore([value])
有界信号量对象:与 threading.BoundedSemaphore 类似。
与其接近的模拟存在一个单独的区别:它的
acquire
方法的第一个参数被命名为 block,与 Lock.acquire() 一致。笔记
在 Mac OS X 上,这与 Semaphore 没有区别,因为
sem_getvalue()
未在该平台上实现。
- class multiprocessing.Condition([lock])
条件变量:threading.Condition 的别名。
如果指定了 lock,那么它应该是来自 multiprocessing 的 Lock 或 RLock 对象。
3.3 版更改: 添加了 wait_for() 方法。
- class multiprocessing.Event
- threading.Event 的克隆。
- class multiprocessing.Lock
非递归锁对象:与 threading.Lock 的相似。 一旦进程或线程获得了锁,后续从任何进程或线程获取它的尝试都会阻塞,直到它被释放; 任何进程或线程都可以释放它。 threading.Lock 适用于线程的概念和行为在 multiprocessing.Lock 中复制,因为它适用于进程或线程,除非另有说明。
请注意, Lock 实际上是一个工厂函数,它返回一个用默认上下文初始化的
multiprocessing.synchronize.Lock
实例。Lock 支持 上下文管理器 协议,因此可以在 和 语句中使用。
- acquire(block=True, timeout=None)
获取锁,阻塞或非阻塞。
将 block 参数设置为
True
(默认值),方法调用将阻塞,直到锁处于解锁状态,然后将其设置为锁定并返回True
. 请注意,第一个参数的名称与 threading.Lock.acquire() 中的名称不同。将 block 参数设置为
False
,方法调用不会阻塞。 如果锁当前处于锁定状态,则返回False
; 否则将锁设置为锁定状态并返回True
。当使用 timeout 的正浮点值调用时,只要无法获取锁,最多阻塞 timeout 指定的秒数。 timeout 为负值的调用等效于 timeout 为零。 timeout 值为
None
(默认值)的调用将超时期限设置为无限。 请注意,对 timeout 的负值或None
值的处理与 threading.Lock.acquire() 中实现的行为不同。 如果 block 参数设置为False
并因此被忽略,则 timeout 参数没有实际意义。 如果已获取锁,则返回True
;如果超时时间已过,则返回False
。
- release()
释放锁。 这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。
行为与 threading.Lock.release() 中的相同,除了在解锁的锁上调用时,会引发 ValueError。
- class multiprocessing.RLock
递归锁对象:与 threading.RLock 的相似。 递归锁必须由获取它的进程或线程释放。 一旦一个进程或线程获得了递归锁,同一个进程或线程就可以再次获得它而不会阻塞; 该进程或线程必须在每次获得它时释放它一次。
请注意, RLock 实际上是一个工厂函数,它返回一个用默认上下文初始化的
multiprocessing.synchronize.RLock
实例。RLock 支持 上下文管理器 协议,因此可用于 with 语句。
- acquire(block=True, timeout=None)
获取锁,阻塞或非阻塞。
当调用 block 参数设置为
True
时,阻塞直到锁处于解锁状态(不属于任何进程或线程),除非当前进程已经拥有该锁或线。 然后当前进程或线程获得锁的所有权(如果它尚未拥有所有权),并且锁内的递归级别递增 1,导致返回值True
。 请注意,与 threading.RLock.acquire() 的实现相比,第一个参数的行为有几个不同之处,从参数本身的名称开始。当调用 block 参数设置为
False
时,不要阻塞。 如果锁已经被另一个进程或线程获取(并因此拥有),则当前进程或线程不取得所有权并且锁内的递归级别不会改变,导致返回值 [X233X ]。 如果锁处于未锁定状态,则当前进程或线程取得所有权并递增递归级别,导致返回值True
。timeout 参数的使用和行为与 Lock.acquire() 中的相同。 请注意,timeout 的某些行为与 threading.RLock.acquire() 中实现的行为不同。
- release()
释放锁,递减递归级别。 如果递减后递归级别为零,则将锁重置为解锁(不属于任何进程或线程),并且如果任何其他进程或线程被阻塞等待锁解锁,则只允许其中一个继续进行。 如果递减后递归级别仍然不为零,则锁保持锁定并由调用进程或线程拥有。
仅当调用进程或线程拥有锁时才调用此方法。 如果此方法由除所有者之外的进程或线程调用,或者如果锁处于解锁(无主)状态,则会引发 AssertionError。 请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中实现的行为不同。
- class multiprocessing.Semaphore([value])
信号量对象:与 threading.Semaphore 的相似。
与其接近的模拟存在一个单独的区别:它的
acquire
方法的第一个参数被命名为 block,与 Lock.acquire() 一致。
笔记
在 Mac OS X 上,sem_timedwait
不受支持,因此使用超时调用 acquire()
将使用睡眠循环模拟该函数的行为。
笔记
如果 Ctrl-C 生成的 SIGINT 信号到达而主线程被调用 BoundedSemaphore.acquire()
, Lock.acquire(), RLock. Acquire()、Semaphore.acquire()
、Condition.acquire()
或 Condition.wait()
则呼叫将立即中断并引发 KeyboardInterrupt。
这与 threading 的行为不同,其中 SIGINT 在等效阻塞调用正在进行时将被忽略。
笔记
此包的某些功能需要在主机操作系统上实现有效的共享信号量。 如果没有,multiprocessing.synchronize
模块将被禁用,尝试导入它会导致 ImportError。 有关其他信息,请参阅 :issue:`3770`。
经理
管理器提供了一种创建可以在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。 管理器对象控制管理 共享对象 的服务器进程。 其他进程可以使用代理访问共享对象。
- multiprocessing.Manager()
- 返回一个已启动的 SyncManager 对象,该对象可用于在进程之间共享对象。 返回的管理器对象对应于生成的子进程,并具有创建共享对象并返回相应代理的方法。
一旦管理器进程被垃圾收集或它们的父进程退出,它们就会被关闭。 管理器类在 multiprocessing.managers 模块中定义:
- class multiprocessing.managers.BaseManager([address[, authkey]])
创建一个 BaseManager 对象。
一旦创建,应该调用 start() 或
get_server().serve_forever()
以确保管理器对象引用一个启动的管理器进程。address 是管理器进程监听新连接的地址。 如果 address 是
None
,则选择任意一个。authkey 是身份验证密钥,用于检查到服务器进程的传入连接的有效性。 如果 authkey 是
None
,则使用current_process().authkey
。 否则使用 authkey 并且它必须是一个字节串。- start([initializer[, initargs]])
启动一个子进程来启动管理器。 如果 initializer 不是
None
,则子进程将在启动时调用initializer(*initargs)
。
- get_server()
返回一个
Server
对象,它代表管理器控制下的实际服务器。Server
对象支持 [X34X] 方法:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
还有一个 address 属性。
- connect()
将本地管理器对象连接到远程管理器进程:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()
停止管理器使用的进程。 这仅在 start() 已用于启动服务器进程时可用。
这可以多次调用。
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
可用于注册类型或可调用管理器类的类方法。
typeid 是一个“类型标识符”,用于标识特定类型的共享对象。 这必须是一个字符串。
callable 是用于为此类型标识符创建对象的可调用对象。 如果管理器实例将使用 connect() 方法连接到服务器,或者如果 create_method 参数是
False
那么这可以保留为None
。proxytype 是 BaseProxy 的子类,用于为具有 typeid 的共享对象创建代理。 如果
None
则自动创建代理类。exposed 用于指定方法名称的序列,应允许使用 BaseProxy._callmethod() 访问此 typeid 的代理。 (如果 exposed 是
None
则使用proxytype._exposed_
如果它存在。)在没有指定公开列表的情况下,共享对象的所有“公共方法”将可以访问。 (这里的“公共方法”是指具有 __call__() 方法且名称不以'_'
开头的任何属性。)method_to_typeid 是一个映射,用于指定那些应该返回代理的公开方法的返回类型。 它将方法名称映射到 typeid 字符串。 (如果 method_to_typeid 是
None
则使用proxytype._method_to_typeid_
如果它存在。)如果方法的名称不是此映射的键或映射是None
那么方法返回的对象会被值复制。create_method 决定是否应该创建一个名称为 typeid 的方法,它可以用来告诉服务器进程创建一个新的共享对象并为其返回一个代理。 默认为
True
。
BaseManager 实例也有一个只读属性:
- address
经理使用的地址。
3.3 版更改: 管理器对象支持上下文管理协议 - 请参阅 上下文管理器类型 。 __enter__() 启动服务器进程(如果它还没有启动),然后返回管理器对象。 __exit__() 调用 shutdown()。
在以前的版本中 __enter__() 没有启动管理器的服务器进程,如果它尚未启动。
- class multiprocessing.managers.SyncManager
BaseManager 的子类,可用于进程同步。 这种类型的对象由
multiprocessing.Manager()
返回。它的方法创建并返回 代理对象 ,用于跨进程同步的许多常用数据类型。 这尤其包括共享列表和字典。
- Barrier(parties[, action[, timeout]])
创建一个共享的 threading.Barrier 对象并为其返回一个代理。
3.3 版中的新功能。
- BoundedSemaphore([value])
创建一个共享的 threading.BoundedSemaphore 对象并为其返回一个代理。
- Condition([lock])
创建一个共享的 threading.Condition 对象并为其返回一个代理。
如果提供了 lock,那么它应该是 threading.Lock 或 threading.RLock 对象的代理。
3.3 版更改: 添加了 wait_for() 方法。
- Event()
创建一个共享的 threading.Event 对象并为其返回一个代理。
- Lock()
创建一个共享的 threading.Lock 对象并为其返回一个代理。
- Namespace()
创建一个共享的 Namespace 对象并为其返回一个代理。
- Queue([maxsize])
创建一个共享的 queue.Queue 对象并为其返回一个代理。
- RLock()
创建一个共享的 threading.RLock 对象并为其返回一个代理。
- Semaphore([value])
创建一个共享的 threading.Semaphore 对象并为其返回一个代理。
- Array(typecode, sequence)
创建一个数组并为其返回一个代理。
- Value(typecode, value)
创建一个具有可写
value
属性的对象并为其返回一个代理。
- dict()
dict(mapping)
dict(sequence) 创建一个共享的 dict 对象并为其返回一个代理。
- list()
list(sequence) 创建一个共享的 list 对象并为其返回一个代理。
3.6 版变更:共享对象可以嵌套。 例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager 管理和同步。
- class multiprocessing.managers.Namespace
可以注册到 SyncManager 的类型。
命名空间对象没有公共方法,但具有可写属性。 它的表示显示了其属性的值。
但是,当为命名空间对象使用代理时,以
'_'
开头的属性将是代理的属性而不是所指对象的属性:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
定制经理
要创建自己的管理器,可以创建 BaseManager 的子类,并使用 register() 类方法向管理器类注册新类型或可调用对象。 例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用远程管理器
可以在一台机器上运行管理服务器并让客户端从其他机器上使用它(假设所涉及的防火墙允许)。
运行以下命令为远程客户端可以访问的单个共享队列创建服务器:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个客户端可以按如下方式访问服务器:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个客户端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问该队列,在客户端使用上面的代码远程访问它:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理对象
代理是一个对象,它 将 指向一个共享对象,该对象(大概)存在于不同的进程中。 共享对象被称为代理的 referent。 多个代理对象可能具有相同的引用对象。
代理对象具有调用其所指对象的相应方法的方法(尽管并非所指对象的每个方法都必须通过代理可用)。 通过这种方式,可以像使用代理一样使用代理:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
请注意,将 str() 应用于代理将返回所指对象的表示,而应用 repr() 将返回代理的表示。
代理对象的一个重要特性是它们是可pickle的,因此它们可以在进程之间传递。 因此,引用对象可以包含 代理对象 。 这允许嵌套这些托管列表、字典和其他 代理对象 :
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
类似地,dict 和 list 代理可以相互嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果标准(非代理)list 或 dict 对象包含在所指对象中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道何时其中包含的值被修改。 但是,在容器代理中存储一个值(在代理对象上触发 __setitem__
)确实会通过管理器传播,因此要有效地修改这样的项目,可以将修改后的值重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
对于大多数用例,这种方法可能不如使用嵌套的 代理对象 方便,但也展示了对同步的一定程度的控制。
笔记
multiprocessing 中的代理类型不支持按值进行比较。 因此,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
在进行比较时,应该只使用所指对象的副本。
- class multiprocessing.managers.BaseProxy
代理对象是 BaseProxy 子类的实例。
- _callmethod(methodname[, args[, kwds]])
调用并返回代理所指对象的方法的结果。
如果
proxy
是一个代理,其所指对象是obj
那么表达式proxy._callmethod(methodname, args, kwds)
将评估表达式
getattr(obj, methodname)(*args, **kwds)
在经理的过程中。
返回的值将是调用结果的副本或新共享对象的代理 - 请参阅 BaseManager.register() 的 method_to_typeid 参数的文档。
如果调用引发异常,则由 _callmethod() 重新引发。 如果在管理器的进程中引发了其他一些异常,那么这将转换为
RemoteError
异常并由 _callmethod() 引发。请特别注意,如果 methodname 尚未被 公开 ,则会引发异常。
_callmethod() 的使用示例:
>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()
返回引用对象的副本。
如果所指对象是不可选择的,那么这将引发异常。
- __repr__()
返回代理对象的表示。
- __str__()
返回所指对象的表示。
清理
代理对象使用弱引用回调,因此当它被垃圾收集时,它会从拥有其所指对象的管理器中注销自己。
当不再有任何代理引用它时,共享对象将从管理器进程中删除。
进程池
可以创建一个进程池,这些进程将使用 Pool 类执行提交给它的任务。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一个进程池对象,它控制可以提交作业的工作进程池。 它支持带有超时和回调的异步结果,并具有并行映射实现。
processes 是要使用的工作进程数。 如果 processes 是
None
,则使用 os.cpu_count() 返回的数字。如果 initializer 不是
None
,那么每个工作进程在启动时都会调用initializer(*initargs)
。maxtasksperchild 是工作进程在退出并被新工作进程替换之前可以完成的任务数,以释放未使用的资源。 默认的 maxtasksperchild 是
None
,这意味着工作进程将与池一样长。context 可用于指定用于启动工作进程的上下文。 通常使用上下文对象的函数
multiprocessing.Pool()
或 Pool() 方法创建池。 在这两种情况下,context 都被适当设置。请注意,池对象的方法只能由创建池的进程调用。
警告
multiprocessing.pool 对象具有需要通过使用池作为上下文管理器或通过调用 close() 和 terminate 正确管理的内部资源(像任何其他资源一样) () 手动。 如果不这样做,可能会导致流程在最终确定时挂起。
请注意, 不正确 依赖垃圾收集器来销毁池,因为 CPython 不保证会调用池的终结器(有关更多信息,请参阅 object.__del__()信息)。
3.2 版中的新功能: 每个孩子的最大任务数
3.4 版的新功能: 语境
笔记
池 中的工作进程通常在池的工作队列的整个持续时间内都存在。 在其他系统(例如 Apache、mod_wsgi 等)中发现的一种常见模式,用于释放工作人员持有的资源,是允许池中的工作人员在退出、清理和产生新进程之前仅完成一定数量的工作替换旧的。 Pool 的 maxtasksperchild 参数向最终用户公开了这种能力。
- apply(func[, args[, kwds]])
使用参数 args 和关键字参数 kwds 调用 func。 它阻塞直到结果准备好。 鉴于这些块,apply_async() 更适合并行执行工作。 此外, func 仅在池的其中一个工作线程中执行。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply() 方法的一个变体,它返回一个结果对象。
如果指定了 callback,那么它应该是一个接受单个参数的可调用对象。 当结果准备好时 callback 应用于它,也就是说,除非调用失败,在这种情况下 error_callback 被应用。
如果指定了 error_callback,那么它应该是一个接受单个参数的可调用对象。 如果目标函数失败,则使用异常实例调用 error_callback。
回调应立即完成,否则处理结果的线程将被阻塞。
- map(func, iterable[, chunksize])
map() 内置函数的并行等效项(尽管它仅支持一个 iterable 参数,对于多个可迭代对象,请参阅 starmap())。 它阻塞直到结果准备好。
此方法将可迭代对象分成多个块,并将其作为单独的任务提交给进程池。 这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。
请注意,它可能会导致很长的迭代的高内存使用率。 考虑使用 imap() 或 imap_unordered() 和显式 chunksize 选项以获得更好的效率。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map() 方法的一个变体,它返回一个结果对象。
如果指定了 callback,那么它应该是一个接受单个参数的可调用对象。 当结果准备好时 callback 应用于它,也就是说,除非调用失败,在这种情况下 error_callback 被应用。
如果指定了 error_callback,那么它应该是一个接受单个参数的可调用对象。 如果目标函数失败,则使用异常实例调用 error_callback。
回调应立即完成,否则处理结果的线程将被阻塞。
- imap(func, iterable[, chunksize])
map() 的懒惰版本。
chunksize 参数与 map() 方法使用的参数相同。 对于非常长的迭代,使用 chunksize 的大值可以使作业完成 比使用默认值
1
快得多 。此外,如果 chunksize 是
1
,那么由 imap() 方法返回的迭代器的next()
方法有一个可选的 timeout ] 参数:next(timeout)
如果在 timeout 秒内无法返回结果,将引发 multiprocessing.TimeoutError。
- imap_unordered(func, iterable[, chunksize])
与 imap() 相同,不同之处在于返回的迭代器的结果排序应被视为任意。 (只有当只有一个工作进程时,才能保证顺序是“正确的”。)
- starmap(func, iterable[, chunksize])
与 map() 类似,除了 iterable 的元素应该是作为参数解包的可迭代对象。
因此,
[(1,2), (3, 4)]
的 可迭代 导致[func(1,2), func(3,4)]
。3.3 版中的新功能。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
starmap() 和 map_async() 的组合,它在迭代器的 iterable 上进行迭代,并调用 func 并将迭代器解包。 返回一个结果对象。
3.3 版中的新功能。
- close()
防止任何更多的任务被提交到池中。 完成所有任务后,工作进程将退出。
- terminate()
立即停止工作进程而不完成未完成的工作。 当池对象被垃圾回收时 terminate() 将被立即调用。
- join()
等待工作进程退出。 在使用 join() 之前,必须调用 close() 或 terminate()。
3.3 版新功能: 池对象现在支持上下文管理协议 - 请参阅 上下文管理器类型 。 __enter__() 返回池对象,__exit__() 调用 terminate()。
- class multiprocessing.pool.AsyncResult
Pool.apply_async() 和 Pool.map_async() 返回结果的类。
- get([timeout])
结果到达时返回。 如果 timeout 不是
None
并且结果没有在 timeout 秒内到达,那么会引发 multiprocessing.TimeoutError。 如果远程调用引发异常,那么该异常将被 get() 重新引发。
- wait([timeout])
等到结果可用或直到 timeout 秒过去。
- ready()
返回调用是否完成。
- successful()
返回调用是否在不引发异常的情况下完成。 如果结果未准备好,将引发 ValueError。
以下示例演示了池的使用:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
听众和客户
通常进程之间的消息传递是使用队列或使用 Pipe() 返回的 Connection 对象完成的。
然而,multiprocessing.connection 模块允许一些额外的灵活性。 它基本上提供了一个高级的面向消息的 API,用于处理套接字或 Windows 命名管道。 它还支持使用 hmac 模块的 digest authentication,以及同时轮询多个连接。
- multiprocessing.connection.deliver_challenge(connection, authkey)
向连接的另一端发送随机生成的消息并等待回复。
如果回复与使用 authkey 作为密钥的消息摘要匹配,则欢迎消息将发送到连接的另一端。 否则会引发 AuthenticationError。
- multiprocessing.connection.answer_challenge(connection, authkey)
接收消息,以authkey为key计算消息的摘要,然后将摘要发回。
如果未收到欢迎消息,则会引发 AuthenticationError。
- multiprocessing.connection.Client(address[, family[, authkey]])
尝试与使用地址 address 的侦听器建立连接,返回 Connection。
连接的类型由 family 参数决定,但通常可以省略,因为它通常可以从 address 的格式中推断出来。 (见地址格式)
如果给出 authkey 而不是 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的密钥。 如果 authkey 为 None,则不进行身份验证。 AuthenticationError 如果身份验证失败,则会引发。 请参阅 身份验证密钥 。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
绑定套接字或 Windows 命名管道的包装器,它正在“侦听”连接。
address 是侦听器对象的绑定套接字或命名管道要使用的地址。
笔记
如果使用地址“0.0.0.0”,则该地址将不是 Windows 上的可连接端点。 如果您需要可连接的端点,则应使用“127.0.0.1”。
family 是要使用的套接字(或命名管道)的类型。 这可以是字符串
'AF_INET'
(对于 TCP 套接字)、'AF_UNIX'
(对于 Unix 域套接字)或'AF_PIPE'
(对于 Windows 命名管道)之一。 其中只有第一个保证可用。 如果 family 是None
,则从 address 的格式推断出该家族。 如果 address 也是None
,则选择默认值。 此默认值是假定为最快可用的系列。 请参阅 地址格式 。 请注意,如果 family 为'AF_UNIX'
且地址为None
,则套接字将在使用 tempfile.mkstemp() 创建的私有临时目录中创建.如果侦听器对象使用套接字,则 backlog(默认为 1)在绑定后传递给套接字的 listen() 方法。
如果给出 authkey 而不是 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的密钥。 如果 authkey 为 None,则不进行身份验证。 AuthenticationError 如果身份验证失败,则会引发。 请参阅 身份验证密钥 。
- accept()
接受侦听器对象的绑定套接字或命名管道上的连接并返回 Connection 对象。 如果尝试进行身份验证但失败,则会引发 AuthenticationError。
- close()
关闭侦听器对象的绑定套接字或命名管道。 当侦听器被垃圾收集时会自动调用。 但是,建议明确调用它。
侦听器对象具有以下只读属性:
- address
侦听器对象正在使用的地址。
- last_accepted
最后接受的连接来自的地址。 如果这不可用,则它是
None
。
3.3 版新功能: 监听器对象现在支持上下文管理协议 - 请参阅 上下文管理器类型 。 __enter__() 返回监听对象,__exit__() 调用 close()。
- multiprocessing.connection.wait(object_list, timeout=None)
等待 object_list 中的对象准备就绪。 返回 object_list 中准备好的对象列表。 如果 timeout 是一个浮点数,那么调用最多会阻塞多少秒。 如果 timeout 是
None
那么它将无限期地阻塞。 负超时相当于零超时。对于 Unix 和 Windows,一个对象可以出现在 object_list 中,如果它是
一个可读的 Connection 对象;
一个连接且可读的 socket.socket 对象; 或者
当有数据可供读取或另一端已关闭时,连接或套接字对象已准备就绪。
Unix:
wait(object_list, timeout)
几乎等同于select.select(object_list, [], [], timeout)
。 不同之处在于,如果 select.select() 被信号中断,它会引发 OSError 错误编号为EINTR
,而 wait () 不会。Windows: object_list 中的一项必须是一个可等待的整数句柄(根据 Win32 函数
WaitForMultipleObjects()
的文档中使用的定义)或者它可以是具有fileno()
方法的对象,该方法返回套接字句柄或管道句柄。 (请注意,管道句柄和套接字句柄是 不是 可等待句柄。)3.3 版中的新功能。
例子
以下服务器代码创建一个使用 'secret password'
作为身份验证密钥的侦听器。 然后它等待连接并向客户端发送一些数据:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下代码使用 wait() 一次等待来自多个进程的消息:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
地址格式
'AF_INET'
地址是(hostname, port)
形式的元组,其中 hostname 是字符串,port 是整数。'AF_UNIX'
地址是表示文件系统上的文件名的字符串。- *;
'AF_PIPE'
地址是以下形式的字符串r'\.\pipe{PipeName}'
。 要使用 Client() 连接到名为 ServerName 的远程计算机上的命名管道,应改用r'\ServerName\pipe{PipeName}'
形式的地址。
请注意,任何以两个反斜杠开头的字符串都被默认假定为 'AF_PIPE'
地址而不是 'AF_UNIX'
地址。
认证密钥
当使用 Connection.recv 时,接收到的数据会自动解压。 不幸的是,从不受信任的来源提取数据是一种安全风险。 因此 Listener 和 Client() 使用 hmac 模块提供摘要认证。
认证密钥是一个字节串,可以被认为是一个密码:一旦建立连接,两端将要求证明对方知道认证密钥。 (证明两端都使用相同的密钥 不是 涉及通过连接发送密钥。)
如果请求身份验证但未指定身份验证密钥,则使用 current_process().authkey
的返回值(请参阅 Process)。 该值将由当前进程创建的任何 Process 对象自动继承。 这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该密钥可在它们之间建立连接时使用。
还可以使用 os.urandom() 生成合适的身份验证密钥。
日志记录
一些日志支持是可用的。 但是请注意,logging 包不使用进程共享锁,因此来自不同进程的消息可能(取决于处理程序类型)混淆。
- multiprocessing.get_logger()
返回 multiprocessing 使用的记录器。 如有必要,将创建一个新的。
首次创建时,记录器具有级别
logging.NOTSET
并且没有默认处理程序。 默认情况下,发送到此记录器的消息不会传播到根记录器。请注意,在 Windows 上,子进程将仅继承父进程的记录器级别——记录器的任何其他自定义都不会被继承。
- multiprocessing.log_to_stderr()
- 此函数执行对 get_logger() 的调用,但除了返回由 get_logger 创建的记录器外,它还添加了一个处理程序,使用格式
'[%(levelname)s/%(processName)s] %(message)s'
将输出发送到 sys.stderr ]。
以下是打开日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有关日志级别的完整表,请参阅 logging 模块。
multiprocessing.dummy 模块
multiprocessing.dummy 复制了 multiprocessing 的 API,但只不过是 threading 模块的包装器。
特别是multiprocessing.dummy提供的Pool
函数返回一个ThreadPool的实例,它是Pool的一个子类,支持所有相同的方法调用但使用工作线程池而不是工作进程。
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])
一个线程池对象,它控制可以提交作业的工作线程池。 ThreadPool 实例与 Pool 实例的接口完全兼容,并且它们的资源也必须得到适当的管理,通过使用池作为上下文管理器或通过调用 close()[ X213X] 和 terminate() 手动。
processes 是要使用的工作线程数。 如果 processes 是
None
,则使用 os.cpu_count() 返回的数字。如果 initializer 不是
None
,那么每个工作进程在启动时都会调用initializer(*initargs)
。与 Pool 不同,无法提供 maxtasksperchild 和 context。
笔记
ThreadPool 与 Pool 共享相同的接口,它是围绕进程池设计的,并且早于 concurrent.futures 模块的引入。 因此,它继承了一些对于由线程支持的池没有意义的操作,并且它有自己的类型来表示异步作业的状态,AsyncResult,这是任何其他库都无法理解的.
用户通常应该更喜欢使用 concurrent.futures.ThreadPoolExecutor,它有一个更简单的接口,从一开始就围绕线程设计,并返回兼容的 concurrent.futures.Future 实例与许多其他库,包括 asyncio。
编程指南
使用 multiprocessing 时应遵守某些准则和习惯用法。
所有启动方法
以下内容适用于所有启动方法。
避免共享状态
应尽可能避免在进程之间转移大量数据。
最好坚持使用队列或管道进行进程之间的通信,而不是使用较低级别的同步原语。
酸洗性
确保代理方法的参数是可挑选的。
代理的线程安全
不要使用来自多个线程的代理对象,除非你用锁保护它。
(使用 same 代理的不同进程永远不会出现问题。)
加入僵尸进程
在 Unix 上,当一个进程完成但尚未加入时,它会变成僵尸。 永远不应该有很多,因为每次一个新进程启动(或 active_children() 被调用)所有尚未加入的已完成进程都将被加入。 同时调用已完成进程的 Process.is_alive 将加入该进程。 即便如此,明确加入您启动的所有流程可能是一种很好的做法。
继承比pickle/unpickle更好
当使用 spawn 或 forkserver 启动方法时,multiprocessing 中的许多类型需要是可pickle的,以便子进程可以使用它们。 但是,通常应该避免使用管道或队列将共享对象发送到其他进程。 相反,您应该安排程序,以便需要访问在别处创建的共享资源的进程可以从祖先进程继承它。
避免终止进程
使用 Process.terminate 方法停止进程很可能导致进程当前正在使用的任何共享资源(例如锁、信号量、管道和队列)损坏或对其他进程不可用。
因此,最好只考虑在从不使用任何共享资源的进程上使用 Process.terminate。
加入使用队列的进程
请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈送器”线程馈送到底层管道。 (子进程可以调用队列的 Queue.cancel_join_thread 方法来避免这种行为。)
这意味着无论何时使用队列,您都需要确保已放入队列的所有项目在加入进程之前最终将被删除。 否则,您无法确定将项目放入队列的进程会终止。 还要记住,非守护进程将自动加入。
一个会死锁的例子如下:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()
这里的解决方法是交换最后两行(或简单地删除
p.join()
行)。
显式地将资源传递给子进程
在使用 fork 启动方法的 Unix 上,子进程可以使用在使用全局资源的父进程中创建的共享资源。 但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程仍然存在,对象就不会在父进程中被垃圾收集。 如果在父进程中垃圾收集对象时释放了某些资源,这可能很重要。
所以例如
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()
应该改写为
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
当心将 sys.stdin 替换为“类似文件的对象”
multiprocessing 原先无条件调用:
os.close(sys.stdin.fileno())
在
multiprocessing.Process._bootstrap()
方法中——这导致了进程中的问题。 这已更改为:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)
这解决了进程相互冲突导致错误文件描述符错误的基本问题,但给将 sys.stdin() 替换为具有输出缓冲的“类文件对象”的应用程序引入了潜在危险. 这种危险在于,如果多个进程在这个类文件对象上调用 close(),可能会导致多次将相同的数据刷新到对象中,从而导致损坏。
如果您编写一个类似文件的对象并实现自己的缓存,则可以通过在附加到缓存时存储 pid 并在 pid 更改时丢弃缓存来使其安全。 例如:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
有关更多信息,请参阅 :issue:`5155`、:issue:`5313` 和 :issue:`5331`
spawn 和 forkserver 启动方法
有一些额外的限制不适用于 fork 启动方法。
更好的可腌制性
确保
Process.__init__()
的所有参数都是可腌制的。 此外,如果您将 Process 子类化,请确保在调用 Process.start 方法时实例将是可酸洗的。
全局变量
请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与 Process.start[ 时父进程中的值不同。 X205X] 被调用。
然而,只是模块级常量的全局变量不会引起任何问题。
安全导入主模块
确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)。
例如,使用 spawn 或 forkserver 启动方法运行以下模块将失败并显示 RuntimeError:
from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()
相反,应该通过使用
if __name__ == '__main__':
来保护程序的“入口点”,如下所示:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()
(如果程序将正常运行而不是冻结,则可以省略
freeze_support()
行。)这允许新生成的 Python 解释器安全地导入模块,然后运行模块的
foo()
函数。如果在主模块中创建了池或管理器,则类似的限制适用。
例子
演示如何创建和使用自定义管理器和代理:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 池 :
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
显示如何使用队列将任务提供给工作进程集合并收集结果的示例:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()