summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_importlib
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-09-07 16:56:24 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2017-09-07 16:56:24 (GMT)
commita6a4dc816d68df04a7d592e0b6af8c7ecc4d4344 (patch)
tree1c31738009bee903417cea928e705a112aea2392 /Lib/test/test_importlib
parent1f06a680de465be0c24a78ea3b610053955daa99 (diff)
downloadcpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.zip
cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.gz
cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.bz2
bpo-31370: Remove support for threads-less builds (#3385)
* Remove Setup.config * Always define WITH_THREAD for compatibility.
Diffstat (limited to 'Lib/test/test_importlib')
-rw-r--r--Lib/test/test_importlib/test_locks.py222
1 files changed, 100 insertions, 122 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py
index dbce9c2..d86172a 100644
--- a/Lib/test/test_importlib/test_locks.py
+++ b/Lib/test/test_importlib/test_locks.py
@@ -3,134 +3,112 @@ from . import util as test_util
init = test_util.import_importlib('importlib')
import sys
+import threading
import unittest
import weakref
from test import support
-
-try:
- import threading
-except ImportError:
- threading = None
-else:
- from test import lock_tests
-
-if threading is not None:
- class ModuleLockAsRLockTests:
- locktype = classmethod(lambda cls: cls.LockType("some_lock"))
-
- # _is_owned() unsupported
- test__is_owned = None
- # acquire(blocking=False) unsupported
- test_try_acquire = None
- test_try_acquire_contended = None
- # `with` unsupported
- test_with = None
- # acquire(timeout=...) unsupported
- test_timeout = None
- # _release_save() unsupported
- test_release_save_unacquired = None
- # lock status in repr unsupported
- test_repr = None
- test_locked_repr = None
-
- LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
- for kind, splitinit in init.items()}
-
- (Frozen_ModuleLockAsRLockTests,
- Source_ModuleLockAsRLockTests
- ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
- LockType=LOCK_TYPES)
-else:
- LOCK_TYPES = {}
-
- class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
- pass
-
- class Source_ModuleLockAsRLockTests(unittest.TestCase):
- pass
-
-
-if threading is not None:
- class DeadlockAvoidanceTests:
-
- def setUp(self):
+from test import lock_tests
+
+
+class ModuleLockAsRLockTests:
+ locktype = classmethod(lambda cls: cls.LockType("some_lock"))
+
+ # _is_owned() unsupported
+ test__is_owned = None
+ # acquire(blocking=False) unsupported
+ test_try_acquire = None
+ test_try_acquire_contended = None
+ # `with` unsupported
+ test_with = None
+ # acquire(timeout=...) unsupported
+ test_timeout = None
+ # _release_save() unsupported
+ test_release_save_unacquired = None
+ # lock status in repr unsupported
+ test_repr = None
+ test_locked_repr = None
+
+LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
+ for kind, splitinit in init.items()}
+
+(Frozen_ModuleLockAsRLockTests,
+ Source_ModuleLockAsRLockTests
+ ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
+ LockType=LOCK_TYPES)
+
+
+class DeadlockAvoidanceTests:
+
+ def setUp(self):
+ try:
+ self.old_switchinterval = sys.getswitchinterval()
+ support.setswitchinterval(0.000001)
+ except AttributeError:
+ self.old_switchinterval = None
+
+ def tearDown(self):
+ if self.old_switchinterval is not None:
+ sys.setswitchinterval(self.old_switchinterval)
+
+ def run_deadlock_avoidance_test(self, create_deadlock):
+ NLOCKS = 10
+ locks = [self.LockType(str(i)) for i in range(NLOCKS)]
+ pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
+ if create_deadlock:
+ NTHREADS = NLOCKS
+ else:
+ NTHREADS = NLOCKS - 1
+ barrier = threading.Barrier(NTHREADS)
+ results = []
+
+ def _acquire(lock):
+ """Try to acquire the lock. Return True on success,
+ False on deadlock."""
try:
- self.old_switchinterval = sys.getswitchinterval()
- support.setswitchinterval(0.000001)
- except AttributeError:
- self.old_switchinterval = None
-
- def tearDown(self):
- if self.old_switchinterval is not None:
- sys.setswitchinterval(self.old_switchinterval)
-
- def run_deadlock_avoidance_test(self, create_deadlock):
- NLOCKS = 10
- locks = [self.LockType(str(i)) for i in range(NLOCKS)]
- pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
- if create_deadlock:
- NTHREADS = NLOCKS
+ lock.acquire()
+ except self.DeadlockError:
+ return False
else:
- NTHREADS = NLOCKS - 1
- barrier = threading.Barrier(NTHREADS)
- results = []
-
- def _acquire(lock):
- """Try to acquire the lock. Return True on success,
- False on deadlock."""
- try:
- lock.acquire()
- except self.DeadlockError:
- return False
- else:
- return True
-
- def f():
- a, b = pairs.pop()
- ra = _acquire(a)
- barrier.wait()
- rb = _acquire(b)
- results.append((ra, rb))
- if rb:
- b.release()
- if ra:
- a.release()
- lock_tests.Bunch(f, NTHREADS).wait_for_finished()
- self.assertEqual(len(results), NTHREADS)
- return results
-
- def test_deadlock(self):
- results = self.run_deadlock_avoidance_test(True)
- # At least one of the threads detected a potential deadlock on its
- # second acquire() call. It may be several of them, because the
- # deadlock avoidance mechanism is conservative.
- nb_deadlocks = results.count((True, False))
- self.assertGreaterEqual(nb_deadlocks, 1)
- self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
-
- def test_no_deadlock(self):
- results = self.run_deadlock_avoidance_test(False)
- self.assertEqual(results.count((True, False)), 0)
- self.assertEqual(results.count((True, True)), len(results))
-
-
- DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
- for kind, splitinit in init.items()}
-
- (Frozen_DeadlockAvoidanceTests,
- Source_DeadlockAvoidanceTests
- ) = test_util.test_both(DeadlockAvoidanceTests,
- LockType=LOCK_TYPES,
- DeadlockError=DEADLOCK_ERRORS)
-else:
- DEADLOCK_ERRORS = {}
-
- class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
- pass
-
- class Source_DeadlockAvoidanceTests(unittest.TestCase):
- pass
+ return True
+
+ def f():
+ a, b = pairs.pop()
+ ra = _acquire(a)
+ barrier.wait()
+ rb = _acquire(b)
+ results.append((ra, rb))
+ if rb:
+ b.release()
+ if ra:
+ a.release()
+ lock_tests.Bunch(f, NTHREADS).wait_for_finished()
+ self.assertEqual(len(results), NTHREADS)
+ return results
+
+ def test_deadlock(self):
+ results = self.run_deadlock_avoidance_test(True)
+ # At least one of the threads detected a potential deadlock on its
+ # second acquire() call. It may be several of them, because the
+ # deadlock avoidance mechanism is conservative.
+ nb_deadlocks = results.count((True, False))
+ self.assertGreaterEqual(nb_deadlocks, 1)
+ self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
+
+ def test_no_deadlock(self):
+ results = self.run_deadlock_avoidance_test(False)
+ self.assertEqual(results.count((True, False)), 0)
+ self.assertEqual(results.count((True, True)), len(results))
+
+
+DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
+ for kind, splitinit in init.items()}
+
+(Frozen_DeadlockAvoidanceTests,
+ Source_DeadlockAvoidanceTests
+ ) = test_util.test_both(DeadlockAvoidanceTests,
+ LockType=LOCK_TYPES,
+ DeadlockError=DEADLOCK_ERRORS)
class LifetimeTests: