16.6. multiprocessing — 基于进程的“线程”接口 — Python 文档

来自菜鸟教程
Python/docs/2.7/library/multiprocessing
跳转至:导航、​搜索

16.6. 多处理 — 基于进程的“线程”接口

2.6 版中的新功能。


16.6.1. 介绍

multiprocessing 是一个使用类似于 threading 模块的 API 支持生成进程的包。 multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程有效地绕过 全局解释器锁 。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它可以在 Unix 和 Windows 上运行。

multiprocessing 模块还引入了在 threading 模块中没有类似物的 API。 一个主要的例子是 Pool 对象,它提供了一种方便的方法来并行化跨多个输入值的函数执行,跨进程分布输入数据(数据并行)。 以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。 这个使用 Pool 的数据并行的基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

16.6.1.1. 这过程班级

multiprocessing 中,通过创建 Process 对象然后调用其 start() 方法来生成进程。 Process 遵循 threading.Thread 的 API。 多进程程序的一个简单例子是

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了显示所涉及的各个进程 ID,这里是一个扩展示例:

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关为什么(在 Windows 上)需要 if __name__ == '__main__' 部分的解释,请参阅 编程指南


16.6.1.2. 在进程之间交换对象

multiprocessing支持两种进程间通信通道:

队列

Queue 类是 Queue.Queue 的近似克隆。 例如:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。


管道

Pipe() 函数返回一对由管道连接的连接对象,管道默认为双工(双向)。 例如:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的两个连接对象代表管道的两端。 每个连接对象都有 send()recv() 方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的 相同 端,则管道中的数据可能会损坏。 当然,同时使用管道不同端的进程不存在损坏的风险。


16.6.1.3. 进程间同步

multiprocessing 包含来自 threading 的所有同步原语的等价物。 例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用来自不同进程的锁输出,很容易混淆。


16.6.1.4. 进程间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。 使用多个进程时尤其如此。

但是,如果您确实需要使用某些共享数据,那么 multiprocessing 提供了几种这样做的方法。

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。 例如,下面的代码

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型代码:[X142X ] 表示双精度浮点数,'i' 表示有符号整数。 这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。


服务器进程

Manager() 返回的管理器对象控制服务器进程,该进程保存 Python 对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器将支持类型 listdictNamespaceLockRLock ]、信号量有界信号量条件事件队列、、[X2]X2] 阵列。 例如,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。 此外,单个管理器可以由网络上不同计算机上的进程共享。 然而,它们比使用共享内存慢。


16.6.1.5. 使用工人池

Pool 类代表一个工作进程池。 它具有允许以几种不同方式将任务卸载到工作进程的方法。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"

请注意,池的方法只能由创建它的进程使用。

笔记

此包中的功能要求 __main__ 模块可由子项导入。 这在 编程指南 中有介绍,但值得在这里指出。 这意味着某些示例,例如 Pool 示例在交互式解释器中不起作用。 例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果您尝试这样做,它实际上会输出以半随机方式交错的三个完整回溯,然后您可能不得不以某种方式停止主进程。)


16.6.2. 参考

multiprocessing 包主要复制了 threading 模块的 API。

16.6.2.1. 过程和例外

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

流程对象表示在单独流程中运行的活动。 Process 类具有 threading.Thread 的所有方法的等价物。

应始终使用关键字参数调用构造函数。 group 应该总是 None; 它的存在仅仅是为了与 threading.Thread 兼容。 target 是由 run() 方法调用的可调用对象。 它默认为 None,意味着什么都不调用。 name 是进程名。 默认情况下,唯一名称的构造形式为 'Process-N1:N2:...:Nk' 其中 N1[ X114X],N2,…,Nk 是一个整数序列,其长度由进程的 generation 决定。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。 默认情况下,没有参数传递给 target

如果子类覆盖构造函数,它必须确保在对进程执行任何其他操作之前调用基类构造函数 (Process.__init__())。

run()

表示流程活动的方法。

您可以在子类中覆盖此方法。 标准的 run() 方法调用传递给对象构造函数的可调用对象作为目标参数,如果有的话,顺序和关键字参数取自 args 和 kwargs[ X211X] 参数,分别。

start()

启动流程的活动。

每个进程对象最多必须调用一次。 它安排在单独的进程中调用对象的 run() 方法。

join([timeout])

阻塞调用线程,直到调用其 join() 方法的进程终止或直到发生可选超时。

如果 timeoutNone 则没有超时。

一个进程可以多次加入。

进程不能加入自身,因为这会导致死锁。 在进程启动之前尝试加入进程是错误的。

name

进程的名称。

该名称是一个仅用于识别目的的字符串。 它没有语义。 多个进程可能会被赋予相同的名称。 初始名称由构造函数设置。

is_alive()

返回进程是否存活。

粗略地说,进程对象从 start() 方法返回的那一刻起一直处于活动状态,直到子进程终止。

daemon

进程的守护进程标志,一个布尔值。 这必须在调用 start() 之前设置。

初始值继承自创建过程。

当一个进程退出时,它会尝试终止其所有守护进程。

请注意,不允许守护进程创建子进程。 否则,如果守护进程在其父进程退出时终止,它的子进程将成为孤儿。 此外,这些是 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程退出,它们将被终止(而不是加入)。

除了 threading.Thread API,Process 对象还支持以下属性和方法:

pid

返回进程 ID。 在进程产生之前,这将是 None

exitcode

孩子的退出代码。 如果进程尚未终止,这将是 None。 负值 -N 表示子进程被信号 N 终止。

authkey

进程的身份验证密钥(字节字符串)。

multiprocessing 初始化时,主进程会使用 os.urandom() 分配一个随机字符串。

当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管这可以通过将 authkey 设置为另一个字节字符串来更改。

请参阅 身份验证密钥

terminate()

终止进程。 在 Unix 上,这是使用 SIGTERM 信号完成的; 在 Windows 上使用 TerminateProcess()。 请注意,不会执行退出处理程序和 finally 子句等。

请注意,进程的后代进程将 不会 被终止——它们只会成为孤立的。

警告

如果在关联进程正在使用管道或队列时使用此方法,则管道或队列可能会损坏,并且可能无法被其他进程使用。 类似地,如果进程获得了锁或信号量等。 然后终止它很容易导致其他进程死锁。

注意 start()join()is_alive()terminate()exitcode方法只能由创建进程对象的进程调用。

Process的一些方法的使用示例:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.BufferTooShort

Connection.recv_bytes_into() 当提供的缓冲区对象太小而无法读取消息时引发异常。

如果 eBufferTooShort 的实例,则 e.args[0] 将把消息作为字节串给出。


16.6.2.2. 管道和队列

当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步原语,如锁。

对于传递消息,可以使用 Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

Queuemultiprocessing.queues.SimpleQueueJoinableQueue 类型是基于 Queue.Queue 建模的多生产者、多消费者 FIFO 队列] 标准库中的类。 它们的区别在于 Queue 缺少 Python 2.5 的 Queue.Queue 类中引入的 task_done()join() 方法。

如果您使用 JoinableQueue 那么您 必须 为每个从队列中移除的任务调用 JoinableQueue.task_done() 或者用于计算未完成任务数的信号量最终可能会溢出,引发异常。

请注意,还可以使用管理器对象创建共享队列——参见 Managers

笔记

multiprocessing 使用通常的 Queue.EmptyQueue.Full 异常来表示超时。 它们在 multiprocessing 命名空间中不可用,因此您需要从 Queue 导入它们。


笔记

当一个对象被放入队列时,该对象被腌制,后台线程稍后将腌制的数据刷新到底层管道。 这会产生一些令人惊讶的后果,但不应该造成任何实际困难——如果它们真的打扰你,那么你可以改用用 管理器 创建的队列。

  1. 将对象放入空队列后,在队列的 empty() 方法返回 Falseget_nowait() 可以在不引发 [ X197X]队列.空。
  2. 如果多个进程正在排队对象,则对象可能会在另一端无序接收。 但是,由同一进程排队的对象将始终按预期顺序排列。


警告

如果进程在尝试使用 Queue 时使用 Process.terminate()os.kill() 被杀死,则队列中的数据很可能会损坏。 这可能会导致任何其他进程在稍后尝试使用队列时出现异常。


警告

如上所述,如果子进程已将项目放入队列(并且尚未使用 JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中。

这意味着,如果您尝试加入该进程,您可能会遇到死锁,除非您确定放入队列的所有项目都已被消耗。 类似地,如果子进程是非守护进程,那么当它尝试加入其所有非守护进程时,父进程可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。 请参阅 编程指南


有关使用队列进行进程间通信的示例,请参阅 示例

multiprocessing.Pipe([duplex])

返回表示管道末端的 Connection 对象的一对 (conn1, conn2)

如果 duplexTrue(默认值),则管道是双向的。 如果 duplexFalse 则管道是单向的:conn1 只能用于接收消息,而 conn2 只能用于发送消息。

class multiprocessing.Queue([maxsize])

返回使用管道和一些锁/信号量实现的进程共享队列。 当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。

标准库的 Queue 模块中常见的 Queue.EmptyQueue.Full 异常被引发以发出超时信号。

Queue 实现了 Queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回队列的大致大小。 由于多线程/多处理语义,这个数字是不可靠的。

请注意,这可能会在未实现 sem_getvalue() 的 Mac OS X 等 Unix 平台上引发 NotImplementedError

empty()

如果队列为空,则返回 True,否则返回 False。 由于多线程/多处理语义,这是不可靠的。

full()

如果队列已满,则返回 True,否则返回 False。 由于多线程/多处理语义,这是不可靠的。

put(obj[, block[, timeout]])

将 obj 放入队列中。 如果可选参数 blockTrue(默认值)并且 timeoutNone(默认值),则在必要时阻塞直到空闲插槽可用的。 如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在这段时间内没有可用的空闲插槽,则会引发 Queue.Full 异常。 否则 (blockFalse),如果空闲插槽立即可用,则将项目放在队列中,否则引发 Queue.Full 异常 (timeout 在这种情况下被忽略)。

put_nowait(obj)

相当于 put(obj, False)

get([block[, timeout]])

从队列中移除并返回一个项目。 如果可选参数 blockTrue(默认值)并且 timeoutNone(默认值),则在必要时阻止,直到项目可用。 如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在这段时间内没有可用的项目,则会引发 Queue.Empty 异常。 否则(块是 False),如果一个项目立即可用,则返回一个项目,否则引发 Queue.Empty 异常(timeout 在这种情况下被忽略)。

get_nowait()

相当于 get(False)

Queue 有一些在 Queue.Queue 中没有的额外方法。 大多数代码通常不需要这些方法:

close()

指示当前进程不会将更多数据放入此队列。 一旦将所有缓冲数据刷新到管道,后台线程将退出。 当队列被垃圾收集时会自动调用。

join_thread()

加入后台线程。 这只能在 close() 被调用后使用。 它会阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。 进程可以调用 cancel_join_thread() 使 join_thread() 什么都不做。

cancel_join_thread()

防止 join_thread() 阻塞。 特别是,这会阻止后台线程在进程退出时自动加入——参见 join_thread()

此方法的更好名称可能是 allow_exit_without_flush()。 它很可能会导致排队的数据丢失,您几乎肯定不需要使用它。 只有当您需要当前进程立即退出而不等待将排队的数据刷新到底层管道时,它才真正存在,并且您不关心丢失的数据。

笔记

此类的功能需要在主机操作系统上实现有效的共享信号量。 如果没有,该类中的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。 有关其他信息,请参阅 :issue:`3770`。 这同样适用于下面列出的任何专用队列类型。

class multiprocessing.queues.SimpleQueue

它是一个简化的 Queue 类型,非常接近一个锁定的 Pipe

empty()

如果队列为空,则返回 True,否则返回 False

get()

从队列中移除并返回一个项目。

put(item)

item 放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue 是一个 Queue 子类,是一个额外具有 task_done()join() 方法的队列。

task_done()

指示以前排队的任务已完成。 由队列消费者线程使用。 对于用于获取任务的每个 get(),随后对 task_done() 的调用告诉队列该任务的处理已完成。

如果 join() 当前正在阻塞,它会在所有项目都处理完毕后恢复(意味着每个已经被 放置的项目都收到了一个 task_done() 调用() 进入队列)。

如果调用次数多于队列中放置的项目,则引发 ValueError

join()

阻塞直到队列中的所有项目都被获取和处理。

每当将项目添加到队列时,未完成任务的计数就会增加。 每当消费者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时,计数就会下降。 当未完成任务的数量降为零时,join() 解除阻塞。


16.6.2.3. 各种各样的

multiprocessing.active_children()

返回当前进程的所有活动子进程的列表。

调用它具有“加入”任何已经完成的进程的副作用。

multiprocessing.cpu_count()
返回系统中的 CPU 数量。 可以提高 NotImplementedError
multiprocessing.current_process()

返回当前进程对应的Process对象。

threading.current_thread() 的类似物。

multiprocessing.freeze_support()

添加对使用 multiprocessing 的程序被冻结以生成 Windows 可执行文件时的支持。 (已使用 py2exePyInstallercx_Freeze 进行测试。)

需要在主模块的 if __name__ == '__main__' 行之后直接调用此函数。 例如:

from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略 freeze_support() 行,则尝试运行冻结的可执行文件将引发 RuntimeError

在 Windows 以外的任何操作系统上调用时,调用 freeze_support() 不起作用。 另外,如果模块在Windows上被Python解释器正常运行(程序没有被冻结),那么freeze_support()是没有作用的。

multiprocessing.set_executable()

设置启动子进程时要使用的 Python 解释器的路径。 (默认情况下使用 sys.executable)。 嵌入者可能需要做一些类似的事情

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

在他们可以创建子进程之前。 (仅限 Windows)

16.6.2.4. 连接对象

连接对象允许发送和接收可腌制的对象或字符串。 它们可以被认为是面向消息的连接套接字。

连接对象通常使用 Pipe 创建——另见 Listeners 和 Clients

class Connection
send(obj)

将对象发送到连接的另一端,应使用 recv() 读取该对象。

该对象必须是可腌制的。 非常大的泡菜(大约 32 MB+,但取决于操作系统)可能会引发 ValueError 异常。

recv()

使用 send() 返回从连接另一端发送的对象。 阻塞直到有东西要接收。 如果没有任何东西可接收且另一端已关闭,则升高 EOFError

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾收集时会自动调用。

poll([timeout])

返回是否有任何数据可供读取。

如果未指定 timeout,则它将立即返回。 如果 timeout 是一个数字,那么它指定阻塞的最长时间(以秒为单位)。 如果 timeoutNone,则使用无限超时。

send_bytes(buffer[, offset[, size]])

从支持缓冲区接口的对象发送字节数据作为完整消息。

如果给出 offset,则从 buffer 中的该位置读取数据。 如果给出 size ,那么将从缓冲区读取许多字节。 非常大的缓冲区(大约 32 MB+,但取决于操作系统)可能会引发 ValueError 异常

recv_bytes([maxlength])

以字符串形式返回从连接另一端发送的字节数据的完整消息。 阻塞直到有东西要接收。 如果没有任何东西可接收且另一端已关闭,则升高 EOFError

如果指定了 maxlength 并且消息长于 maxlength,则引发 IOError 并且连接将不再可读。

recv_bytes_into(buffer[, offset])

将连接另一端发送的字节数据的完整消息读入 buffer,并返回消息中的字节数。 阻塞直到有东西要接收。 如果没有任何东西可接收且另一端已关闭,则升高 EOFError

buffer 必须是满足可写缓冲区接口的对象。 如果给出 offset,则消息将从该位置写入缓冲区。 偏移量必须是一个小于 buffer 的长度(以字节为单位)的非负整数。

如果缓冲区太短,则会引发 BufferTooShort 异常,并且完整的消息可用作 e.args[0],其中 e 是异常实例。

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法会自动解开它接收到的数据,除非您可以信任发送消息的进程,否则这可能存在安全风险。

因此,除非连接对象是使用 Pipe() 生成的,否则您应该只在执行某种身份验证后使用 recv()send() 方法。 请参阅 身份验证密钥


警告

如果进程在尝试读取或写入管道时被终止,则管道中的数据可能会损坏,因为可能无法确定消息边界在哪里。


16.6.2.5. 同步原语

通常同步原语在多进程程序中不像在多线程程序中那样必要。 请参阅 threading 模块的文档。

请注意,还可以使用管理器对象创建同步原语——参见 Managers

class multiprocessing.BoundedSemaphore([value])

有界信号量对象:与 threading.BoundedSemaphore 类似。

与其接近的模拟存在一个单独的区别:它的 acquire 方法的第一个参数被命名为 block 并且它支持可选的第二个参数 timeout,与 一致Lock.acquire()

笔记

在 Mac OS X 上,这与 Semaphore 没有区别,因为 sem_getvalue() 未在该平台上实现。

class multiprocessing.Condition([lock])

条件变量:threading.Condition 的克隆。

如果指定了 lock,那么它应该是来自 multiprocessingLockRLock 对象。

class multiprocessing.Event

threading.Event 的克隆。 此方法在退出时返回内部信号量的状态,因此它将始终返回 True,除非给出超时和操作超时。

2.7 版本变化: 以前,该方法总是返回 None

class multiprocessing.Lock

非递归锁对象:与 threading.Lock 的相似。 一旦进程或线程获得了锁,后续从任何进程或线程获取它的尝试都会阻塞,直到它被释放; 任何进程或线程都可以释放它。 threading.Lock 适用于线程的概念和行为在 multiprocessing.Lock 中复制,因为它适用于进程或线程,除非另有说明。

请注意, Lock 实际上是一个工厂函数,它返回一个用默认上下文初始化的 multiprocessing.synchronize.Lock 实例。

Lock 支持 上下文管理器 协议,因此可以在 语句中使用。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

block 参数设置为 True(默认值),方法调用将阻塞,直到锁处于解锁状态,然后将其设置为锁定并返回 True . 请注意,第一个参数的名称与 threading.Lock.acquire() 中的名称不同。

block 参数设置为 False,方法调用不会阻塞。 如果锁当前处于锁定状态,则返回False; 否则将锁设置为锁定状态并返回 True

当使用 timeout 的正浮点值调用时,只要无法获取锁,最多阻塞 timeout 指定的秒数。 timeout 为负值的调用等效于 timeout 为零。 timeout 值为 None(默认值)的调用将超时期限设置为无限。 如果 block 参数设置为 False 并因此被忽略,则 timeout 参数没有实际意义。 如果已获取锁,则返回 True;如果超时时间已过,则返回 False。 请注意,timeout 参数在此方法的模拟中不存在,threading.Lock.acquire()

release()

释放锁。 这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与 threading.Lock.release() 中的相同,除了在解锁的锁上调用时,会引发 ValueError

class multiprocessing.RLock

递归锁对象:与 threading.RLock 的相似。 递归锁必须由获取它的进程或线程释放。 一旦一个进程或线程获得了递归锁,同一个进程或线程就可以再次获得它而不会阻塞; 该进程或线程必须在每次获得它时释放它一次。

请注意, RLock 实际上是一个工厂函数,它返回一个用默认上下文初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 上下文管理器 协议,因此可用于 with 语句。

acquire(block=True, timeout=None)

获取锁,阻塞或非阻塞。

当调用 block 参数设置为 True 时,阻塞直到锁处于解锁状态(不属于任何进程或线程),除非当前进程已经拥有该锁或线。 然后当前进程或线程获得锁的所有权(如果它尚未拥有所有权),并且锁内的递归级别递增 1,导致返回值 True。 请注意,与 threading.RLock.acquire() 的实现相比,第一个参数的行为有几个不同之处,从参数本身的名称开始。

当调用 block 参数设置为 False 时,不要阻塞。 如果锁已经被另一个进程或线程获取(并因此拥有),则当前进程或线程不取得所有权并且锁内的递归级别不会改变,导致返回值 [X233X ]。 如果锁处于未锁定状态,则当前进程或线程取得所有权并递增递归级别,导致返回值 True

timeout 参数的使用和行为与 Lock.acquire() 中的相同。 请注意,timeout 参数在此方法的模拟中不存在,threading.RLock.acquire()

release()

释放锁,递减递归级别。 如果递减后递归级别为零,则将锁重置为解锁(不属于任何进程或线程),并且如果任何其他进程或线程被阻塞,等待锁解锁,则允许其中一个继续进行。 如果递减后递归级别仍然不为零,则锁保持锁定并由调用进程或线程拥有。

仅当调用进程或线程拥有锁时才调用此方法。 如果此方法由除所有者之外的进程或线程调用,或者如果锁处于解锁(未拥有)状态,则会引发 AssertionError。 请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中实现的行为不同。

class multiprocessing.Semaphore([value])

信号量对象:与 threading.Semaphore 的相似。

与其接近的模拟存在一个单独的区别:它的 acquire 方法的第一个参数被命名为 block 并且它支持可选的第二个参数 timeout,与 一致Lock.acquire()

笔记

BoundedSemaphoreLockRLockSemaphoreacquire()方法有一个超时参数,不支持穿线。 签名是acquire(block=True, timeout=None),关键字参数是可以接受的。 如果 blockTrue 并且 timeout 不是 None 那么它指定一个以秒为单位的超时。 如果 blockFalse,则忽略 timeout

在 Mac OS X 上,sem_timedwait 不受支持,因此使用超时调用 acquire() 将使用睡眠循环模拟该函数的行为。


笔记

如果 Ctrl-C 生成的 SIGINT 信号到达而主线程被调用 BoundedSemaphore.acquire(), Lock.acquire(), RLock. Acquire()Semaphore.acquire()Condition.acquire()Condition.wait() 则调用将立即中断并引发 KeyboardInterrupt

这与 threading 的行为不同,其中 SIGINT 在等效阻塞调用正在进行时将被忽略。


笔记

此包的某些功能需要在主机操作系统上实现有效的共享信号量。 如果没有,multiprocessing.synchronize 模块将被禁用,尝试导入它会导致一个 ImportError。 有关其他信息,请参阅 :issue:`3770`


16.6.2.6. 共享类型对象

可以使用可由子进程继承的共享内存来创建共享对象。

multiprocessing.Value(typecode_or_type, *args[, lock])

返回从共享内存分配的 ctypes 对象。 默认情况下,返回值实际上是对象的同步包装器。

typecode_or_type 确定返回对象的类型:它要么是 ctypes 类型,要么是 array 模块使用的那种类型的单字符类型代码。 *args 传递给类型的构造函数。

如果 lockTrue(默认值),则创建一个新的递归锁对象来同步对该值的访问。 如果 lockLockRLock 对象,那么它将用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动受到锁的保护,因此它不一定是“进程安全的”。

+= 这样涉及读写的操作不是原子操作。 因此,例如,如果您想以原子方式递增共享值,仅执行此操作是不够的

counter.value += 1

假设关联的锁是递归的(默认情况下是这样),您可以改为

with counter.get_lock():
    counter.value += 1

请注意, lock 是仅关键字参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的 ctypes 数组。 默认情况下,返回值实际上是数组的同步包装器。

typecode_or_type 决定了返回数组元素的类型:它要么是一个 ctypes 类型,要么是 array 模块使用的那种类型的单字符类型代码。 如果 size_or_initializer 是一个整数,那么它决定了数组的长度,并且数组最初会被清零。 否则,size_or_initializer 是一个用于初始化数组的序列,其长度决定了数组的长度。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对该值的访问。 如果 lockLockRLock 对象,那么它将用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意, lock 是仅关键字参数。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许使用它来存储和检索字符串。

16.6.2.6.1。 这 multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了从共享内存中分配 ctypes 对象的函数,这些对象可以被子进程继承。

笔记

尽管可以在共享内存中存储指针,但请记住,这将引用特定进程地址空间中的位置。 但是,该指针很可能在第二个进程的上下文中无效,并且尝试从第二个进程中取消引用该指针可能会导致崩溃。


multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的 ctypes 数组。

typecode_or_type 决定了返回数组元素的类型:它要么是一个 ctypes 类型,要么是 array 模块使用的那种类型的单字符类型代码。 如果 size_or_initializer 是一个整数,那么它确定数组的长度,并且数组最初将被清零。 否则 size_or_initializer 是一个用于初始化数组的序列,其长度决定了数组的长度。

请注意,设置和获取元素可能是非原子的——使用 Array() 来确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的 ctypes 对象。

typecode_or_type 确定返回对象的类型:它要么是 ctypes 类型,要么是 array 模块使用的那种类型的单字符类型代码。 *args 传递给类型的构造函数。

请注意,设置和获取值可能是非原子的——使用 Value() 来确保使用锁自动同步访问。

请注意,ctypes.c_char 数组具有 valueraw 属性,允许使用它来存储和检索字符串 - 请参阅 ctypes 的文档]。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

RawArray() 相同,除了取决于 lock 的值,可能会返回进程安全同步包装器而不是原始 ctypes 数组。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对该值的访问。 如果 lockLockRLock 对象,那么它将用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意, lock 是仅关键字参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args[, lock])

RawValue() 相同,除了取决于 lock 的值,可能会返回进程安全同步包装器而不是原始 ctypes 对象。

如果 lockTrue(默认值),则会创建一个新的锁对象来同步对该值的访问。 如果 lockLockRLock 对象,那么它将用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动受到锁的保护,因此它不一定是“进程安全的”。

请注意, lock 是仅关键字参数。

multiprocessing.sharedctypes.copy(obj)
返回从共享内存分配的 ctypes 对象,它是 ctypes 对象 obj 的副本。
multiprocessing.sharedctypes.synchronized(obj[, lock])

为使用 lock 同步访问的 ctypes 对象返回进程安全包装器对象。 如果 lockNone(默认值),则会自动创建一个 multiprocessing.RLock 对象。

除了它所包装的对象之外,同步包装器还有两个方法:get_obj() 返回包装的对象,而 get_lock() 返回用于同步的锁对象。

请注意,通过包装器访问 ctypes 对象可能比访问原始 ctypes 对象慢很多。

下表比较了从共享内存创建共享 ctypes 对象的语法与普通 ctypes 语法。 (在表中 MyStructctypes.Structure 的一些子类。)

类型 使用类型的共享类型 使用类型代码的 sharedctypes
c_double(2.4) 原始值(c_double,2.4) RawValue('d', 2.4)
我的结构(4, 6) RawValue(MyStruct, 4, 6)
(c_short * 7)() 原始数组(c_short,7) RawArray('h', 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))

下面是一个示例,其中子进程修改了多个 ctypes 对象:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]

打印出来的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

16.6.2.7. 经理

管理器提供了一种创建可在不同进程之间共享的数据的方法。 管理器对象控制管理 共享对象 的服务器进程。 其他进程可以使用代理访问共享对象。

multiprocessing.Manager()
返回一个已启动的 SyncManager 对象,该对象可用于在进程之间共享对象。 返回的管理器对象对应于生成的子进程,并具有创建共享对象并返回相应代理的方法。

一旦管理器进程被垃圾收集或它们的父进程退出,它们就会被关闭。 管理器类在 multiprocessing.managers 模块中定义:

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个 BaseManager 对象。

一旦创建,应该调用 start()get_server().serve_forever() 以确保管理器对象引用一个启动的管理器进程。

address 是管理器进程监听新连接的地址。 如果 addressNone,则选择任意一个。

authkey 是身份验证密钥,用于检查到服务器进程的传入连接的有效性。 如果 authkeyNone,则 current_process().authkey。 否则使用 authkey 并且它必须是一个字符串。

start([initializer[, initargs]])

启动一个子进程来启动管理器。 如果 initializer 不是 None,则子进程将在启动时调用 initializer(*initargs)

get_server()

返回一个 Server 对象,它代表管理器控制下的实际服务器。 Server 对象支持 [X34X] 方法:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 还有一个 address 属性。

connect()

将本地管理器对象连接到远程管理器进程:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()
shutdown()

停止管理器使用的进程。 这仅在 start() 已用于启动服务器进程时可用。

这可以多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

可用于注册类型或可调用管理器类的类方法。

typeid 是一个“类型标识符”,用于标识特定类型的共享对象。 这必须是一个字符串。

callable 是用于为此类型标识符创建对象的可调用对象。 如果将使用 from_address() 类方法创建管理器实例,或者 create_method 参数为 False,则可以将其保留为 None

proxytypeBaseProxy 的子类,用于为具有 typeid 的共享对象创建代理。 如果 None 则自动创建代理类。

exposed 用于指定方法名称序列,应允许使用 BaseProxy._callmethod() 访问此 typeid 的代理。 (如果 exposedNone 则使用 proxytype._exposed_ 如果它存在。)在没有指定公开列表的情况下,共享对象的所有“公共方法”将可以访问。 (这里的“公共方法”是指具有 __call__() 方法且名称不以 '_' 开头的任何属性。)

method_to_typeid 是一个映射,用于指定那些应该返回代理的公开方法的返回类型。 它将方法名称映射到 typeid 字符串。 (如果 method_to_typeidNone 则使用 proxytype._method_to_typeid_ 如果它存在。)如果方法的名称不是此映射的键或映射是 None 那么方法返回的对象会被值复制。

create_method 决定是否应该创建一个名称为 typeid 的方法,它可以用来告诉服务器进程创建一个新的共享对象并为其返回一个代理。 默认为 True

BaseManager 实例也有一个只读属性:

address

经理使用的地址。

class multiprocessing.managers.SyncManager

BaseManager 的子类,可用于进程同步。 这种类型的对象由 multiprocessing.Manager() 返回。

它还支持创建共享列表和字典。

BoundedSemaphore([value])

创建一个共享的 threading.BoundedSemaphore 对象并为其返回一个代理。

Condition([lock])

创建一个共享的 threading.Condition 对象并为其返回一个代理。

如果提供了 lock,那么它应该是 threading.Lockthreading.RLock 对象的代理。

Event()

创建一个共享的 threading.Event 对象并为其返回一个代理。

Lock()

创建一个共享的 threading.Lock 对象并为其返回一个代理。

Namespace()

创建一个共享的 Namespace 对象并为其返回一个代理。

Queue([maxsize])

创建一个共享的 Queue.Queue 对象并为其返回一个代理。

RLock()

创建一个共享的 threading.RLock 对象并为其返回一个代理。

Semaphore([value])

创建一个共享的 threading.Semaphore 对象并为其返回一个代理。

Array(typecode, sequence)

创建一个数组并为其返回一个代理。

Value(typecode, value)

创建一个具有可写 value 属性的对象并为其返回一个代理。

dict()
dict(mapping)
dict(sequence)

创建一个共享的 dict 对象并为其返回一个代理。

list()
list(sequence)

创建一个共享的 list 对象并为其返回一个代理。

笔记

对 dict 和 list 代理中可变值或项目的修改不会通过管理器传播,因为代理无法知道其值或项目何时被修改。 要修改这样的项目,您可以将修改后的对象重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
class multiprocessing.managers.Namespace

可以注册到 SyncManager 的类型。

命名空间对象没有公共方法,但具有可写属性。 它的表示显示了其属性的值。

但是,当为命名空间对象使用代理时,以 '_' 开头的属性将是代理的属性而不是所指对象的属性:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')

16.6.2.7.1。 定制经理

要创建自己的管理器,可以创建 BaseManager 的子类,并使用 register() 类方法向管理器类注册新类型或可调用对象。 例如:

from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

16.6.2.7.2. 使用远程管理器

可以在一台机器上运行管理服务器并让客户端从其他机器上使用它(假设所涉及的防火墙允许)。

运行以下命令为远程客户端可以访问的单个共享队列创建服务器:

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以按如下方式访问服务器:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,在客户端使用上面的代码远程访问它:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

16.6.2.8. 代理对象

代理是一个对象,它 指向一个共享对象,该对象(大概)存在于不同的进程中。 共享对象被称为代理的 referent。 多个代理对象可能具有相同的引用对象。

代理对象具有调用其所指对象的相应方法的方法(尽管并非所指对象的每个方法都必须通过代理可用)。 代理通常可以以其所指对象的大多数相同方式使用:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

请注意,将 str() 应用于代理将返回所指对象的表示,而应用 repr() 将返回代理的表示。

代理对象的一个重要特性是它们是可pickle的,因此它们可以在进程之间传递。 但是请注意,如果代理被发送到相应的管理器进程,那么解压它将产生所指对象本身。 这意味着,例如,一个共享对象可以包含第二个:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print a, b
[[../]] []
>>> b.append('hello')
>>> print a, b
[[../'hello']] ['hello']

笔记

multiprocessing 中的代理类型不支持按值进行比较。 因此,例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用所指对象的副本。


class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy 子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理所指对象的方法的结果。

如果 proxy 是一个代理,其所指对象是 obj 那么表达式

proxy._callmethod(methodname, args, kwds)

将评估表达式

getattr(obj, methodname)(*args, **kwds)

在经理的过程中。

返回的值将是调用结果的副本或新共享对象的代理 - 请参阅 BaseManager.register()method_to_typeid 参数的文档。

如果调用引发异常,则由 _callmethod() 重新引发。 如果在管理器的进程中引发了其他一些异常,那么这将转换为 RemoteError 异常并由 _callmethod() 引发。

请特别注意,如果 methodname 尚未被 公开 ,则会引发异常。

_callmethod() 的使用示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7))   # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))     # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用对象的副本。

如果所指对象是不可选择的,那么这将引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回所指对象的表示。

16.6.2.8.1。 清理

代理对象使用弱引用回调,因此当它被垃圾收集时,它会从拥有其所指对象的管理器中注销自己。

当不再有任何代理引用它时,共享对象将从管理器进程中删除。


16.6.2.9. 进程池

可以创建一个进程池,这些进程将执行使用 Pool 类提交给它的任务。

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

一个进程池对象,它控制可以提交作业的工作进程池。 它支持带有超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工作进程数。 如果 processesNone,则使用 cpu_count() 返回的数字。 如果 initializer 不是 None,那么每个工作进程在启动时都会调用 initializer(*initargs)

请注意,池对象的方法只能由创建池的进程调用。

2.7 版新功能:maxtasksperchild 是工作进程在退出并被新工作进程替换之前可以完成的任务数,以释放未使用的资源。 默认的 maxtasksperchildNone,这意味着工作进程将与池一样长。

笔记

Pool 中的工作进程通常会在池的工作队列的整个持续时间内存活。 在其他系统(例如 Apache、mod_wsgi 等)中发现的一种常见模式,用于释放工作人员持有的资源,是允许池中的工作人员在退出、清理和产生新进程之前仅完成一定数量的工作替换旧的。 Poolmaxtasksperchild 参数向最终用户公开了这种能力。

apply(func[, args[, kwds]])

等效于 apply() 内置函数。 它会阻塞直到结果准备好,因此 apply_async() 更适合并行执行工作。 此外, func 仅在池的其中一个工作线程中执行。

apply_async(func[, args[, kwds[, callback]]])

apply() 方法的一个变体,它返回一个结果对象。

如果指定了 callback,那么它应该是一个接受单个参数的可调用对象。 当结果准备好时 callback 应用于它(除非调用失败)。 callback 应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

map() 内置函数的并行等效项(尽管它仅支持一个 iterable 参数)。 它阻塞直到结果准备好。

此方法将可迭代对象分成多个块,并将其作为单独的任务提交给进程池。 这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

map_async(func, iterable[, chunksize[, callback]])

map() 方法的一个变体,它返回一个结果对象。

如果指定了 callback,那么它应该是一个接受单个参数的可调用对象。 当结果准备好时 callback 应用于它(除非调用失败)。 callback 应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

相当于 itertools.imap()

chunksize 参数与 map() 方法使用的参数相同。 对于非常长的迭代,使用 chunksize 的大值可以使作业完成 比使用默认值 1 快得多

此外,如果 chunksize1,那么由 imap() 方法返回的迭代器的 next() 方法有一个可选的 timeout ] 参数:next(timeout) 如果在 timeout 秒内无法返回结果,将引发 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,不同之处在于返回的迭代器的结果排序应被视为任意。 (只有当只有一个工作进程时,才能保证顺序是“正确的”。)

close()

防止任何更多的任务被提交到池中。 完成所有任务后,工作进程将退出。

terminate()

立即停止工作进程而不完成未完成的工作。 当池对象被垃圾回收时 terminate() 将被立即调用。

join()

等待工作进程退出。 在使用 join() 之前,必须调用 close()terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() 返回结果的类。

get([timeout])

结果到达时返回。 如果 timeout 不是 None 并且结果没有在 timeout 秒内到达,则 multiprocessing.TimeoutError 被提升。 如果远程调用引发异常,那么该异常将被 get() 重新引发。

wait([timeout])

等到结果可用或直到 timeout 秒过去。

ready()

返回调用是否完成。

successful()

返回调用是否在不引发异常的情况下完成。 如果结果未准备好,将引发 AssertionError

以下示例演示了池的使用:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

16.6.2.10。 听众和客户

通常进程之间的消息传递是使用队列或使用 Pipe() 返回的 Connection 对象完成的。

然而,multiprocessing.connection 模块允许一些额外的灵活性。 它基本上提供了一个用于处理套接字或 Windows 命名管道的高级面向消息的 API,并且还支持使用 hmac 模块的 摘要认证

multiprocessing.connection.deliver_challenge(connection, authkey)

向连接的另一端发送随机生成的消息并等待回复。

如果回复与使用 authkey 作为密钥的消息摘要匹配,则欢迎消息将发送到连接的另一端。 否则会引发 AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,以authkey为key计算消息的摘要,然后将摘要发回。

如果未收到欢迎消息,则会引发 AuthenticationError

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

尝试与使用地址 address 的侦听器建立连接,返回 Connection

连接的类型由 family 参数决定,但通常可以省略,因为它通常可以从 address 的格式中推断出来。 (见地址格式

如果 authenticateTrueauthkey 是字符串,则使用摘要式身份验证。 如果 authkeyNone,用于身份验证的密钥将是 authkeycurrent_process().authkey)。 如果身份验证失败,则会引发 AuthenticationError。 请参阅 身份验证密钥

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

绑定套接字或 Windows 命名管道的包装器,它正在“侦听”连接。

address 是侦听器对象的绑定套接字或命名管道要使用的地址。

笔记

如果使用地址“0.0.0.0”,则该地址将不是 Windows 上的可连接端点。 如果您需要可连接的端点,则应使用“127.0.0.1”。

family 是要使用的套接字(或命名管道)的类型。 这可以是字符串 'AF_INET'(对于 TCP 套接字)、'AF_UNIX'(对于 Unix 域套接字)或 'AF_PIPE'(对于 Windows 命名管道)之一。 其中只有第一个保证可用。 如果 familyNone,则从 address 的格式推断出该家族。 如果 address 也是 None,则选择默认值。 此默认值是假定为最快可用的系列。 请参阅 地址格式 。 请注意,如果 family'AF_UNIX' 且地址为 None,则套接字将在使用 tempfile.mkstemp() 创建的私有临时目录中创建.

如果侦听器对象使用套接字,则 backlog(默认为 1)在绑定后传递给套接字的 listen() 方法。

如果 authenticateTrue(默认为 False)或 authkey 不是 None,则使用摘要式认证。

如果 authkey 是一个字符串,那么它将被用作认证密钥; 否则它必须是 None

如果 authkeyNoneauthenticateTrue,则使用 current_process().authkey 作为认证密钥。 如果 authkeyNone 并且 authenticateFalse,则不进行身份验证。 如果身份验证失败,则会引发 AuthenticationError。 请参阅 身份验证密钥

accept()

接受侦听器对象的绑定套接字或命名管道上的连接并返回 Connection 对象。 如果尝试进行身份验证但失败,则会引发 AuthenticationError

close()

关闭侦听器对象的绑定套接字或命名管道。 当侦听器被垃圾收集时会自动调用。 但是,建议明确调用它。

侦听器对象具有以下只读属性:

address

侦听器对象正在使用的地址。

last_accepted

最后接受的连接来自的地址。 如果这不可用,则它是 None

该模块定义了以下异常:

exception multiprocessing.connection.ProcessError
所有 multiprocessing 异常的基类。
exception multiprocessing.connection.BufferTooShort
Connection.recv_bytes_into() 当提供的缓冲区对象太小而无法读取消息时引发异常。
exception multiprocessing.connection.AuthenticationError
出现身份验证错误时引发。
exception multiprocessing.connection.TimeoutError
超时到期时由具有超时的方法引发。

例子

以下服务器代码创建一个使用 'secret password' 作为身份验证密钥的侦听器。 然后它等待连接并向客户端发送一些数据:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')

conn = listener.accept()
print 'connection accepted from', listener.last_accepted

conn.send([2.25, None, 'junk', float])

conn.send_bytes('hello')

conn.send_bytes(array('i', [42, 1729]))

conn.close()
listener.close()

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)
conn = Client(address, authkey='secret password')

print conn.recv()                 # => [2.25, None, 'junk', float]

print conn.recv_bytes()            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr)     # => 8
print arr                         # => array('i', [42, 1729, 0, 0, 0])

conn.close()

16.6.2.10.1。 地址格式

  • 'AF_INET' 地址是 (hostname, port) 形式的元组,其中 hostname 是字符串,port 是整数。
  • 'AF_UNIX' 地址是表示文件系统上的文件名的字符串。
  • *; 'AF_PIPE' 地址是以下形式的字符串
    r'\.\pipe{PipeName}'。 要使用 Client() 连接到名为 ServerName 的远程计算机上的命名管道,应改用 r'\ServerName\pipe{PipeName}' 形式的地址。

请注意,任何以两个反斜杠开头的字符串都被默认假定为 'AF_PIPE' 地址而不是 'AF_UNIX' 地址。


16.6.2.11。 认证密钥

当使用 Connection.recv() 时,接收到的数据会自动解压。 不幸的是,从不受信任的来源提取数据是一种安全风险。 因此 ListenerClient() 使用 hmac 模块提供摘要认证。

身份验证密钥是一个可以被认为是密码的字符串:一旦建立连接,两端将要求证明对方知道身份验证密钥。 (证明两端都使用相同的密钥 不是 涉及通过连接发送密钥。)

如果请求身份验证但未指定身份验证密钥,则使用 current_process().authkey 的返回值(请参阅 Process)。 该值将由当前进程创建的任何 Process 对象自动继承。 这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该密钥可在它们之间建立连接时使用。

还可以使用 os.urandom() 生成合适的身份验证密钥。


16.6.2.12。 日志记录

一些日志支持是可用的。 但是请注意,logging 包不使用进程共享锁,因此来自不同进程的消息可能(取决于处理程序类型)混淆。

multiprocessing.get_logger()

返回 multiprocessing 使用的记录器。 如有必要,将创建一个新的。

首次创建时,记录器具有级别 logging.NOTSET 并且没有默认处理程序。 默认情况下,发送到此记录器的消息不会传播到根记录器。

请注意,在 Windows 上,子进程将仅继承父进程的记录器级别——记录器的任何其他自定义都不会被继承。

multiprocessing.log_to_stderr()
此函数执行对 get_logger() 的调用,但除了返回由 get_logger 创建的记录器外,它还添加了一个处理程序,使用格式 '[%(levelname)s/%(processName)s] %(message)s' 将输出发送到 sys.stderr ]。

以下是打开日志记录的示例会话:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

除了具有这两个日志记录功能之外,多处理还公开了两个额外的日志记录级别属性。 它们是 SUBWARNINGSUBDEBUG。 下表说明了这些论文在正常级别层次结构中的位置。

等级 数值
SUBWARNING 25
SUBDEBUG 5

有关日志级别的完整表,请参阅 logging 模块。

这些额外的日志级别主要用于多处理模块中的某些调试消息。 下面是与上面相同的示例,除了启用了 SUBDEBUG

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0

16.6.2.13。 这 multiprocessing.dummy 模块

multiprocessing.dummy 复制了 multiprocessing 的 API,但只不过是 threading 模块的包装器。


16.6.3. 编程指南

使用 multiprocessing 时应遵守某些准则和习惯用法。

16.6.3.1. 所有平台

避免共享状态

应尽可能避免在进程之间转移大量数据。

最好坚持使用队列或管道进行进程之间的通信,而不是使用来自 threading 模块的较低级别的同步原语。


酸洗性

确保代理方法的参数是可挑选的。


代理的线程安全

不要使用来自多个线程的代理对象,除非你用锁保护它。

(使用 same 代理的不同进程永远不会出现问题。)


加入僵尸进程

在 Unix 上,当一个进程完成但尚未加入时,它会变成僵尸。 永远不应该有很多,因为每次一个新进程启动(或 active_children() 被调用)所有尚未加入的已完成进程都将被加入。 同时调用已完成进程的 Process.is_alive 将加入进程。 即便如此,明确加入您启动的所有流程可能是一种很好的做法。


继承比pickle/unpickle更好

在 Windows 上,来自 multiprocessing 的许多类型需要是可pickle的,以便子进程可以使用它们。 但是,通常应该避免使用管道或队列将共享对象发送到其他进程。 相反,您应该安排程序,以便需要访问在别处创建的共享资源的进程可以从祖先进程继承它。


避免终止进程

使用 Process.terminate 方法停止进程很可能导致进程当前正在使用的任何共享资源(例如锁、信号量、管道和队列)损坏或对其他进程不可用。

因此,最好只考虑在从不使用任何共享资源的进程上使用 Process.terminate


加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈送器”线程馈送到底层管道。 (子进程可以调用队列的 cancel_join_thread() 方法来避免这种行为。)

这意味着每当您使用队列时,您都需要确保已放入队列的所有项目在加入进程之前最终将被删除。 否则,您无法确定将项目放入队列的进程会终止。 还要记住,非守护进程将自动加入。

一个会死锁的例子如下:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的解决方法是交换最后两行(或简单地删除 p.join() 行)。


显式地将资源传递给子进程

在 Unix 上,子进程可以使用在使用全局资源的父进程中创建的共享资源。 但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 兼容之外,这还确保只要子进程仍然存在,该对象就不会在父进程中被垃圾收集。 如果在父进程中垃圾收集对象时释放了某些资源,这可能很重要。

所以例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应该改写为

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

当心将 sys.stdin 替换为“类似文件的对象”

multiprocessing 原先无条件调用:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中——这导致了进程中的问题。 这已更改为:

sys.stdin.close()
sys.stdin = open(os.devnull)

这解决了进程相互冲突导致错误文件描述符错误的基本问题,但对将 sys.stdin() 替换为具有输出缓冲的“类文件对象”的应用程序引入了潜在危险. 这种危险在于,如果多个进程在这个类文件对象上调用 close(),可能会导致多次将相同的数据刷新到该对象,从而导致损坏。

如果您编写一个类似文件的对象并实现自己的缓存,则可以通过在附加到缓存时存储 pid 并在 pid 更改时丢弃缓存来使其安全。 例如:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参阅 :issue:`5155`:issue:`5313`:issue:`5331`


16.6.3.2. 视窗

由于 Windows 缺少 os.fork() 它有一些额外的限制:

更好的可腌制性

确保 Process.__init__() 的所有参数都是可腌制的。 这尤其意味着绑定或未绑定方法不能直接用作 Windows 上的 target 参数——只需定义一个函数并使用它。

此外,如果您将 Process 子类化,请确保在调用 Process.start 方法时实例将是可酸洗的。


全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与 Process.start[ 时父进程中的值不同。 X205X] 被调用。

然而,只是模块级常量的全局变量不会引起任何问题。


安全导入主模块

确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)。

例如,在 Windows 下运行以下模块将失败并显示 RuntimeError

from multiprocessing import Process

def foo():
    print 'hello'

p = Process(target=foo)
p.start()

相反,应该通过使用 if __name__ == '__main__': 来保护程序的“入口点”,如下所示:

from multiprocessing import Process, freeze_support

def foo():
    print 'hello'

if __name__ == '__main__':
    freeze_support()
    p = Process(target=foo)
    p.start()

(如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行。)

这允许新生成的 Python 解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建了池或管理器,则类似的限制适用。


16.6.4. 例子

演示如何创建和使用自定义管理器和代理:

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()

同步类型,如锁、条件和队列:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)

显示如何使用队列将任务提供给工作进程集合并收集结果的示例:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

一个工作进程池如何在共享单个侦听套接字的同时运行 SimpleHTTPServer.HttpServer 实例的示例。

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets pickable/inheritable


def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass


def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)


def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


if __name__ == '__main__':
    freeze_support()
    test()

一些简单的基准比较 multiprocessingthreading

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in',elapsed,'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()