multiprocessing.shared_memory — 为跨进程直接访问提供共享内存 — Python 文档

来自菜鸟教程
Python/docs/3.10/library/multiprocessing.shared memory
跳转至:导航、​搜索

multiprocessing.shared_memory — 为跨进程直接访问提供共享内存

源代码: :source:`Lib/multiprocessing/shared_memory.py`

3.8 版中的新功能。



该模块提供了一个类 SharedMemory,用于分配和管理共享内存,供多核或对称多处理器 (SMP) 机器上的一个或多个进程访问。 为了协助共享内存的生命周期管理,尤其是跨不同进程的共享内存,multiprocessing.managers 模块中还提供了一个 BaseManager 子类 SharedMemoryManager

在这个模块中,共享内存指的是“System V 风格”的共享内存块(尽管不一定如此显式地实现),而不是指“分布式共享内存”。 这种类型的共享内存允许不同的进程潜在地读取和写入易失性内存的公共(或共享)区域。 进程通常被限制为只能访问自己的进程内存空间,但共享内存允许在进程之间共享数据,从而避免需要在包含该数据的进程之间发送消息。 与通过磁盘或套接字或其他需要序列化/反序列化和复制数据的通信共享数据相比,直接通过内存共享数据可以提供显着的性能优势。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

创建新的共享内存块或附加到现有的共享内存块。 每个共享内存块都分配了一个唯一的名称。 通过这种方式,一个进程可以创建一个具有特定名称的共享内存块,另一个进程可以使用相同的名称连接到同一个共享内存块。

作为跨进程共享数据的资源,共享内存块可能比创建它们的原始进程寿命更长。 当一个进程不再需要访问其他进程可能仍然需要的共享内存块时,应该调用 close() 方法。 当任何进程不再需要共享内存块时,应调用 unlink() 方法以确保正确清理。

name 是请求的共享内存的唯一名称,指定为字符串。 创建新的共享内存块时,如果为名称提供 None(默认值),则会生成一个新名称。

create 控制是创建新的共享内存块(True)还是附加现有的共享内存块(False)。

size 指定创建新共享内存块时请求的字节数。 由于某些平台选择根据该平台的内存页大小分配内存块,因此共享内存块的确切大小可能大于或等于请求的大小。 当附加到现有共享内存块时,size 参数将被忽略。

close()

关闭从此实例对共享内存的访问。 为了确保正确清理资源,一旦不再需要实例,所有实例都应调用 close()。 请注意,调用 close() 不会导致共享内存块本身被销毁。

unlink()

请求销毁底层共享内存块。 为了确保正确清理资源,应该在需要共享内存块的所有进程中调用一次(且仅一次)unlink()。 请求销毁后,共享内存块可能会或可能不会立即销毁,并且这种行为可能因平台而异。 在调用 unlink() 后尝试访问共享内存块内的数据可能会导致内存访问错误。 注意:最后一个放弃共享内存块的进程可能会以任一顺序调用 unlink()close()

buf

共享内存块内容的内存视图。

name

只读访问共享内存块的唯一名称。

size

只读访问共享内存块的字节大小。

以下示例演示了 SharedMemory 实例的低级使用:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例演示了 SharedMemory 类与 NumPy 数组 的实际使用,从两个不同的 Python shell 访问相同的 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

BaseManager 的子类,可用于跨进程管理共享内存块。

SharedMemoryManager 实例上调用 start() 会导致启动一个新进程。 这个新进程的唯一目的是管理通过它创建的所有共享内存块的生命周期。 要触发该进程管理的所有共享内存块的释放,请在实例上调用 shutdown()。 这会触发对该进程管理的所有 SharedMemory 对象的 SharedMemory.unlink() 调用,然后停止进程本身。 通过一个SharedMemoryManager创建SharedMemory实例,我们避免了手动跟踪和触发共享内存资源释放的需要。

此类提供用于创建和返回 SharedMemory 实例以及创建由共享内存支持的类列表对象 (ShareableList) 的方法。

有关继承的 addressauthkey 可选输入参数的描述以及如何使用它们连接到现有的 SharedMemoryManager 来自其他进程的服务。

SharedMemory(size)

创建并返回一个新的 SharedMemory 对象,以字节为单位指定 size

ShareableList(sequence)

创建并返回一个新的 ShareableList 对象,由输入 sequence 中的值初始化。

以下示例演示了 SharedMemoryManager 的基本机制:

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下示例描述了通过 with 语句使用 SharedMemoryManager 对象的一种可能更方便的模式,以确保所有共享内存块在不再需要后被释放:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

当在 with 语句中使用 SharedMemoryManager 时,当 with 语句的代码块完成执行时,使用该管理器创建的共享内存块都将被释放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一个可变的类似列表的对象,其中存储的所有值都存储在共享内存块中。 这将可存储值限制为仅 intfloatboolstr(每个小于 10M 字节)、bytes(小于每个不超过 10M 字节)和 None 内置数据类型。 它还与内置的 list 类型显着不同,因为这些列表不能改变它们的总长度(即 没有附加、插入等)并且不支持通过切片动态创建新的 ShareableList 实例。

sequence 用于填充新的 ShareableList 值。 设置为 None 以通过其唯一的共享内存名称附加到已经存在的 ShareableList

name 是所请求共享内存的唯一名称,如 SharedMemory 的定义中所述。 当附加到现有的 ShareableList 时,指定其共享内存块的唯一名称,同时将 sequence 设置为 None

count(value)

返回 value 的出现次数。

index(value)

返回 value 的第一个索引位置。 如果 value 不存在,则引发 ValueError

format

包含所有当前存储值使用的 struct 打包格式的只读属性。

shm

存储值的 SharedMemory 实例。

以下示例演示了 ShareableList 实例的基本用法:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了一个、两个或多个进程如何通过提供其后面的共享内存块的名称来访问相同的 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下示例演示了 ShareableList(以及底层 SharedMemory)对象可以根据需要进行腌制和取消腌制。 请注意,它仍然是同一个共享对象。 发生这种情况,因为反序列化的对象具有相同的唯一名称,并且只是附加到具有相同名称的现有对象(如果该对象还活着):

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()