diff options
author | Duprat <yduprat@gmail.com> | 2024-11-07 08:10:57 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-07 08:10:57 (GMT) |
commit | 75f7cf91ec5afc6091a0fd442a1f0435c19300b2 (patch) | |
tree | df031b829c314101ae9076f5f8f832f029d26fa2 /Lib/test/_test_multiprocessing.py | |
parent | d46d3f2ec783004f0927c9f5e6211a570360cf3b (diff) | |
download | cpython-75f7cf91ec5afc6091a0fd442a1f0435c19300b2.zip cpython-75f7cf91ec5afc6091a0fd442a1f0435c19300b2.tar.gz cpython-75f7cf91ec5afc6091a0fd442a1f0435c19300b2.tar.bz2 |
gh-125679: multiprocessing Lock and RLock - fix invalid representation string on MacOSX. (#125680)
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 77b618c..38ddb62 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1363,6 +1363,66 @@ class _TestQueue(BaseTestCase): class _TestLock(BaseTestCase): + @staticmethod + def _acquire(lock, l=None): + lock.acquire() + if l is not None: + l.append(repr(lock)) + + @staticmethod + def _acquire_event(lock, event): + lock.acquire() + event.set() + time.sleep(1.0) + + def test_repr_lock(self): + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + lock = self.Lock() + self.assertEqual(f'<Lock(owner=None)>', repr(lock)) + + lock.acquire() + self.assertEqual(f'<Lock(owner=MainProcess)>', repr(lock)) + lock.release() + + tname = 'T1' + l = [] + t = threading.Thread(target=self._acquire, + args=(lock, l), + name=tname) + t.start() + time.sleep(0.1) + self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0]) + lock.release() + + t = threading.Thread(target=self._acquire, + args=(lock,), + name=tname) + t.start() + time.sleep(0.1) + self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock)) + lock.release() + + pname = 'P1' + l = multiprocessing.Manager().list() + p = self.Process(target=self._acquire, + args=(lock, l), + name=pname) + p.start() + p.join() + self.assertEqual(f'<Lock(owner={pname})>', l[0]) + + lock = self.Lock() + event = self.Event() + p = self.Process(target=self._acquire_event, + args=(lock, event), + name='P2') + p.start() + event.wait() + self.assertEqual(f'<Lock(owner=SomeOtherProcess)>', repr(lock)) + p.terminate() + def test_lock(self): lock = self.Lock() self.assertEqual(lock.acquire(), True) @@ -1370,6 +1430,68 @@ class _TestLock(BaseTestCase): self.assertEqual(lock.release(), None) self.assertRaises((ValueError, threading.ThreadError), lock.release) + @staticmethod + def _acquire_release(lock, timeout, l=None, n=1): + for _ in range(n): + lock.acquire() + if l is not None: + l.append(repr(lock)) + time.sleep(timeout) + for _ in range(n): + lock.release() + + def test_repr_rlock(self): + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + lock = self.RLock() + self.assertEqual('<RLock(None, 0)>', repr(lock)) + + n = 3 + for _ in range(n): + lock.acquire() + self.assertEqual(f'<RLock(MainProcess, {n})>', repr(lock)) + for _ in range(n): + lock.release() + + t, l = [], [] + for i in range(n): + t.append(threading.Thread(target=self._acquire_release, + args=(lock, 0.1, l, i+1), + name=f'T{i+1}')) + t[-1].start() + for t_ in t: + t_.join() + for i in range(n): + self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l) + + + t = threading.Thread(target=self._acquire_release, + args=(lock, 0.2), + name=f'T1') + t.start() + time.sleep(0.1) + self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock)) + time.sleep(0.2) + + pname = 'P1' + l = multiprocessing.Manager().list() + p = self.Process(target=self._acquire_release, + args=(lock, 0.1, l), + name=pname) + p.start() + p.join() + self.assertEqual(f'<RLock({pname}, 1)>', l[0]) + + event = self.Event() + lock = self.RLock() + p = self.Process(target=self._acquire_event, + args=(lock, event)) + p.start() + event.wait() + self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock)) + p.join() + def test_rlock(self): lock = self.RLock() self.assertEqual(lock.acquire(), True) |