summaryrefslogtreecommitdiffstats
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py318
1 files changed, 285 insertions, 33 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 81db2c9..a860d9d 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -19,6 +19,7 @@ import random
import logging
import struct
import operator
+import pickle
import weakref
import warnings
import test.support
@@ -54,6 +55,12 @@ except ImportError:
HAS_SHAREDCTYPES = False
try:
+ from multiprocessing import shared_memory
+ HAS_SHMEM = True
+except ImportError:
+ HAS_SHMEM = False
+
+try:
import msvcrt
except ImportError:
msvcrt = None
@@ -3610,6 +3617,263 @@ class _TestSharedCTypes(BaseTestCase):
self.assertAlmostEqual(bar.y, 5.0)
self.assertEqual(bar.z, 2 ** 33)
+
+@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
+class _TestSharedMemory(BaseTestCase):
+
+ ALLOWED_TYPES = ('processes',)
+
+ @staticmethod
+ def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
+ if isinstance(shmem_name_or_obj, str):
+ local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
+ else:
+ local_sms = shmem_name_or_obj
+ local_sms.buf[:len(binary_data)] = binary_data
+ local_sms.close()
+
+ def test_shared_memory_basics(self):
+ sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
+ self.addCleanup(sms.unlink)
+
+ # Verify attributes are readable.
+ self.assertEqual(sms.name, 'test01_tsmb')
+ self.assertGreaterEqual(sms.size, 512)
+ self.assertGreaterEqual(len(sms.buf), sms.size)
+
+ # Modify contents of shared memory segment through memoryview.
+ sms.buf[0] = 42
+ self.assertEqual(sms.buf[0], 42)
+
+ # Attach to existing shared memory segment.
+ also_sms = shared_memory.SharedMemory('test01_tsmb')
+ self.assertEqual(also_sms.buf[0], 42)
+ also_sms.close()
+
+ # Attach to existing shared memory segment but specify a new size.
+ same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
+ self.assertLess(same_sms.size, 20*sms.size) # Size was ignored.
+ same_sms.close()
+
+ if shared_memory._USE_POSIX:
+ # Posix Shared Memory can only be unlinked once. Here we
+ # test an implementation detail that is not observed across
+ # all supported platforms (since WindowsNamedSharedMemory
+ # manages unlinking on its own and unlink() does nothing).
+ # True release of shared memory segment does not necessarily
+ # happen until process exits, depending on the OS platform.
+ with self.assertRaises(FileNotFoundError):
+ sms_uno = shared_memory.SharedMemory(
+ 'test01_dblunlink',
+ create=True,
+ size=5000
+ )
+
+ try:
+ self.assertGreaterEqual(sms_uno.size, 5000)
+
+ sms_duo = shared_memory.SharedMemory('test01_dblunlink')
+ sms_duo.unlink() # First shm_unlink() call.
+ sms_duo.close()
+ sms_uno.close()
+
+ finally:
+ sms_uno.unlink() # A second shm_unlink() call is bad.
+
+ with self.assertRaises(FileExistsError):
+ # Attempting to create a new shared memory segment with a
+ # name that is already in use triggers an exception.
+ there_can_only_be_one_sms = shared_memory.SharedMemory(
+ 'test01_tsmb',
+ create=True,
+ size=512
+ )
+
+ if shared_memory._USE_POSIX:
+ # Requesting creation of a shared memory segment with the option
+ # to attach to an existing segment, if that name is currently in
+ # use, should not trigger an exception.
+ # Note: Using a smaller size could possibly cause truncation of
+ # the existing segment but is OS platform dependent. In the
+ # case of MacOS/darwin, requesting a smaller size is disallowed.
+ class OptionalAttachSharedMemory(shared_memory.SharedMemory):
+ _flags = os.O_CREAT | os.O_RDWR
+ ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
+ self.assertEqual(ok_if_exists_sms.size, sms.size)
+ ok_if_exists_sms.close()
+
+ # Attempting to attach to an existing shared memory segment when
+ # no segment exists with the supplied name triggers an exception.
+ with self.assertRaises(FileNotFoundError):
+ nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
+ nonexisting_sms.unlink() # Error should occur on prior line.
+
+ sms.close()
+
+ def test_shared_memory_across_processes(self):
+ sms = shared_memory.SharedMemory('test02_tsmap', True, size=512)
+ self.addCleanup(sms.unlink)
+
+ # Verify remote attachment to existing block by name is working.
+ p = self.Process(
+ target=self._attach_existing_shmem_then_write,
+ args=(sms.name, b'howdy')
+ )
+ p.daemon = True
+ p.start()
+ p.join()
+ self.assertEqual(bytes(sms.buf[:5]), b'howdy')
+
+ # Verify pickling of SharedMemory instance also works.
+ p = self.Process(
+ target=self._attach_existing_shmem_then_write,
+ args=(sms, b'HELLO')
+ )
+ p.daemon = True
+ p.start()
+ p.join()
+ self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
+
+ sms.close()
+
+ def test_shared_memory_SharedMemoryManager_basics(self):
+ smm1 = multiprocessing.managers.SharedMemoryManager()
+ with self.assertRaises(ValueError):
+ smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started
+ smm1.start()
+ lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
+ lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
+ doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
+ self.assertEqual(len(doppleganger_list0), 5)
+ doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
+ self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
+ held_name = lom[0].name
+ smm1.shutdown()
+ if sys.platform != "win32":
+ # Calls to unlink() have no effect on Windows platform; shared
+ # memory will only be released once final process exits.
+ with self.assertRaises(FileNotFoundError):
+ # No longer there to be attached to again.
+ absent_shm = shared_memory.SharedMemory(name=held_name)
+
+ with multiprocessing.managers.SharedMemoryManager() as smm2:
+ sl = smm2.ShareableList("howdy")
+ shm = smm2.SharedMemory(size=128)
+ held_name = sl.shm.name
+ if sys.platform != "win32":
+ with self.assertRaises(FileNotFoundError):
+ # No longer there to be attached to again.
+ absent_sl = shared_memory.ShareableList(name=held_name)
+
+
+ def test_shared_memory_ShareableList_basics(self):
+ sl = shared_memory.ShareableList(
+ ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
+ )
+ self.addCleanup(sl.shm.unlink)
+
+ # Verify attributes are readable.
+ self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
+
+ # Exercise len().
+ self.assertEqual(len(sl), 7)
+
+ # Exercise index().
+ with warnings.catch_warnings():
+ # Suppress BytesWarning when comparing against b'HoWdY'.
+ warnings.simplefilter('ignore')
+ with self.assertRaises(ValueError):
+ sl.index('100')
+ self.assertEqual(sl.index(100), 3)
+
+ # Exercise retrieving individual values.
+ self.assertEqual(sl[0], 'howdy')
+ self.assertEqual(sl[-2], True)
+
+ # Exercise iterability.
+ self.assertEqual(
+ tuple(sl),
+ ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
+ )
+
+ # Exercise modifying individual values.
+ sl[3] = 42
+ self.assertEqual(sl[3], 42)
+ sl[4] = 'some' # Change type at a given position.
+ self.assertEqual(sl[4], 'some')
+ self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
+ with self.assertRaises(ValueError):
+ sl[4] = 'far too many' # Exceeds available storage.
+ self.assertEqual(sl[4], 'some')
+
+ # Exercise count().
+ with warnings.catch_warnings():
+ # Suppress BytesWarning when comparing against b'HoWdY'.
+ warnings.simplefilter('ignore')
+ self.assertEqual(sl.count(42), 2)
+ self.assertEqual(sl.count(b'HoWdY'), 1)
+ self.assertEqual(sl.count(b'adios'), 0)
+
+ # Exercise creating a duplicate.
+ sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
+ try:
+ self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
+ self.assertEqual('test03_duplicate', sl_copy.shm.name)
+ self.assertEqual(list(sl), list(sl_copy))
+ self.assertEqual(sl.format, sl_copy.format)
+ sl_copy[-1] = 77
+ self.assertEqual(sl_copy[-1], 77)
+ self.assertNotEqual(sl[-1], 77)
+ sl_copy.shm.close()
+ finally:
+ sl_copy.shm.unlink()
+
+ # Obtain a second handle on the same ShareableList.
+ sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
+ self.assertEqual(sl.shm.name, sl_tethered.shm.name)
+ sl_tethered[-1] = 880
+ self.assertEqual(sl[-1], 880)
+ sl_tethered.shm.close()
+
+ sl.shm.close()
+
+ # Exercise creating an empty ShareableList.
+ empty_sl = shared_memory.ShareableList()
+ try:
+ self.assertEqual(len(empty_sl), 0)
+ self.assertEqual(empty_sl.format, '')
+ self.assertEqual(empty_sl.count('any'), 0)
+ with self.assertRaises(ValueError):
+ empty_sl.index(None)
+ empty_sl.shm.close()
+ finally:
+ empty_sl.shm.unlink()
+
+ def test_shared_memory_ShareableList_pickling(self):
+ sl = shared_memory.ShareableList(range(10))
+ self.addCleanup(sl.shm.unlink)
+
+ serialized_sl = pickle.dumps(sl)
+ deserialized_sl = pickle.loads(serialized_sl)
+ self.assertTrue(
+ isinstance(deserialized_sl, shared_memory.ShareableList)
+ )
+ self.assertTrue(deserialized_sl[-1], 9)
+ self.assertFalse(sl is deserialized_sl)
+ deserialized_sl[4] = "changed"
+ self.assertEqual(sl[4], "changed")
+
+ # Verify data is not being put into the pickled representation.
+ name = 'a' * len(sl.shm.name)
+ larger_sl = shared_memory.ShareableList(range(400))
+ self.addCleanup(larger_sl.shm.unlink)
+ serialized_larger_sl = pickle.dumps(larger_sl)
+ self.assertTrue(len(serialized_sl) == len(serialized_larger_sl))
+ larger_sl.shm.close()
+
+ deserialized_sl.shm.close()
+ sl.shm.close()
+
#
#
#
@@ -4780,27 +5044,6 @@ class TestSyncManagerTypes(unittest.TestCase):
self.assertEqual(self.proc.exitcode, 0)
@classmethod
- def _test_queue(cls, obj):
- assert obj.qsize() == 2
- assert obj.full()
- assert not obj.empty()
- assert obj.get() == 5
- assert not obj.empty()
- assert obj.get() == 6
- assert obj.empty()
-
- def test_queue(self, qname="Queue"):
- o = getattr(self.manager, qname)(2)
- o.put(5)
- o.put(6)
- self.run_worker(self._test_queue, o)
- assert o.empty()
- assert not o.full()
-
- def test_joinable_queue(self):
- self.test_queue("JoinableQueue")
-
- @classmethod
def _test_event(cls, obj):
assert obj.is_set()
obj.wait()
@@ -4874,6 +5117,27 @@ class TestSyncManagerTypes(unittest.TestCase):
self.run_worker(self._test_pool, o)
@classmethod
+ def _test_queue(cls, obj):
+ assert obj.qsize() == 2
+ assert obj.full()
+ assert not obj.empty()
+ assert obj.get() == 5
+ assert not obj.empty()
+ assert obj.get() == 6
+ assert obj.empty()
+
+ def test_queue(self, qname="Queue"):
+ o = getattr(self.manager, qname)(2)
+ o.put(5)
+ o.put(6)
+ self.run_worker(self._test_queue, o)
+ assert o.empty()
+ assert not o.full()
+
+ def test_joinable_queue(self):
+ self.test_queue("JoinableQueue")
+
+ @classmethod
def _test_list(cls, obj):
assert obj[0] == 5
assert obj.count(5) == 1
@@ -4945,18 +5209,6 @@ class TestSyncManagerTypes(unittest.TestCase):
self.run_worker(self._test_namespace, o)
-try:
- import multiprocessing.shared_memory
-except ImportError:
- @unittest.skip("SharedMemoryManager not available on this platform")
- class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
- pass
-else:
- class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
- """Same as above but by using SharedMemoryManager."""
- manager_class = multiprocessing.shared_memory.SharedMemoryManager
-
-
class MiscTestCase(unittest.TestCase):
def test__all__(self):
# Just make sure names in blacklist are excluded