diff options
Diffstat (limited to 'Lib/test')
| -rw-r--r-- | Lib/test/support/__init__.py | 31 | ||||
| -rw-r--r-- | Lib/test/test_faulthandler.py | 30 | ||||
| -rw-r--r-- | Lib/test/test_os.py | 35 | ||||
| -rw-r--r-- | Lib/test/test_regrtest.py | 3 | ||||
| -rw-r--r-- | Lib/test/test_selectors.py | 3 | ||||
| -rw-r--r-- | Lib/test/test_site.py | 26 | ||||
| -rw-r--r-- | Lib/test/test_socket.py | 31 | ||||
| -rw-r--r-- | Lib/test/test_tcl.py | 22 | ||||
| -rw-r--r-- | Lib/test/test_threading.py | 206 |
9 files changed, 218 insertions, 169 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 5832ef6..5687ef9 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -860,24 +860,31 @@ if hasattr(os, "umask"): finally: os.umask(oldmask) -# TEST_HOME refers to the top level directory of the "test" package +# TEST_HOME_DIR refers to the top level directory of the "test" package # that contains Python's regression test suite -TEST_HOME = os.path.dirname(os.path.abspath(__file__)) +TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) -def findfile(file, here=TEST_HOME, subdir=None): +# TEST_DATA_DIR is used as a target download location for remote resources +TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") + +def findfile(filename, subdir=None): """Try to find a file on sys.path or in the test directory. If it is not found the argument passed to the function is returned (this does not - necessarily signal failure; could still be the legitimate path).""" - if os.path.isabs(file): - return file + necessarily signal failure; could still be the legitimate path). + + Setting *subdir* indicates a relative path to use to find the file + rather than looking directly in the path directories. + """ + if os.path.isabs(filename): + return filename if subdir is not None: - file = os.path.join(subdir, file) - path = sys.path - path = [os.path.dirname(here)] + path + filename = os.path.join(subdir, filename) + path = [TEST_HOME_DIR] + sys.path for dn in path: - fn = os.path.join(dn, file) + fn = os.path.join(dn, filename) if os.path.exists(fn): return fn - return file + return filename def create_empty_file(filename): """Create an empty file. If the file already exists, truncate it.""" @@ -914,7 +921,7 @@ def open_urlresource(url, *args, **kw): filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! - fn = os.path.join(os.path.dirname(__file__), "data", filename) + fn = os.path.join(TEST_DATA_DIR, filename) def check_valid_file(fn): f = open(fn, *args, **kw) diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index 4a8becf..d78bcb0 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -265,17 +265,33 @@ faulthandler._sigsegv() # By default, the module should be disabled code = "import faulthandler; print(faulthandler.is_enabled())" args = (sys.executable, '-E', '-c', code) - # use subprocess module directly because test.script_helper adds - # "-X faulthandler" to the command line - stdout = subprocess.check_output(args) - self.assertEqual(stdout.rstrip(), b"False") + # don't use assert_python_ok() because it always enable faulthandler + output = subprocess.check_output(args) + self.assertEqual(output.rstrip(), b"False") def test_sys_xoptions(self): # Test python -X faulthandler code = "import faulthandler; print(faulthandler.is_enabled())" - rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code) - stdout = (stdout + stderr).strip() - self.assertEqual(stdout, b"True") + args = (sys.executable, "-E", "-X", "faulthandler", "-c", code) + # don't use assert_python_ok() because it always enable faulthandler + output = subprocess.check_output(args) + self.assertEqual(output.rstrip(), b"True") + + def test_env_var(self): + # empty env var + code = "import faulthandler; print(faulthandler.is_enabled())" + args = (sys.executable, "-c", code) + env = os.environ.copy() + env['PYTHONFAULTHANDLER'] = '' + # don't use assert_python_ok() because it always enable faulthandler + output = subprocess.check_output(args, env=env) + self.assertEqual(output.rstrip(), b"False") + + # non-empty env var + env = os.environ.copy() + env['PYTHONFAULTHANDLER'] = '1' + output = subprocess.check_output(args, env=env) + self.assertEqual(output.rstrip(), b"True") def check_dump_traceback(self, filename): """ diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index d0dd364..39b0e80 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -34,6 +34,10 @@ try: import resource except ImportError: resource = None +try: + import fcntl +except ImportError: + fcntl = None from test.script_helper import assert_python_ok @@ -2300,19 +2304,38 @@ class CPUCountTests(unittest.TestCase): class FDInheritanceTests(unittest.TestCase): - def test_get_inheritable(self): + def test_get_set_inheritable(self): fd = os.open(__file__, os.O_RDONLY) self.addCleanup(os.close, fd) - for inheritable in (False, True): - os.set_inheritable(fd, inheritable) - self.assertEqual(os.get_inheritable(fd), inheritable) + self.assertEqual(os.get_inheritable(fd), False) + + os.set_inheritable(fd, True) + self.assertEqual(os.get_inheritable(fd), True) - def test_set_inheritable(self): + @unittest.skipIf(fcntl is None, "need fcntl") + def test_get_inheritable_cloexec(self): fd = os.open(__file__, os.O_RDONLY) self.addCleanup(os.close, fd) - os.set_inheritable(fd, True) + self.assertEqual(os.get_inheritable(fd), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + self.assertEqual(os.get_inheritable(fd), True) + @unittest.skipIf(fcntl is None, "need fcntl") + def test_set_inheritable_cloexec(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + os.set_inheritable(fd, True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) + def test_open(self): fd = os.open(__file__, os.O_RDONLY) self.addCleanup(os.close, fd) diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index 289fb22..353874b 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -3,6 +3,7 @@ Tests of regrtest.py. """ import argparse +import faulthandler import getopt import os.path import unittest @@ -25,6 +26,8 @@ class ParseArgsTestCase(unittest.TestCase): regrtest._parse_args([opt]) self.assertIn('Run Python regression tests.', out.getvalue()) + @unittest.skipUnless(hasattr(faulthandler, 'dump_traceback_later'), + "faulthandler.dump_traceback_later() required") def test_timeout(self): ns = regrtest._parse_args(['--timeout', '4.2']) self.assertEqual(ns.timeout, 4.2) diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py index 2657a50..6ce4d8a 100644 --- a/Lib/test/test_selectors.py +++ b/Lib/test/test_selectors.py @@ -301,6 +301,7 @@ class BaseSelectorTestCase(unittest.TestCase): class ScalableSelectorMixIn: + # see issue #18963 for why it's skipped on older OS X versions @support.requires_mac_ver(10, 5) @unittest.skipUnless(resource, "Test needs resource module") def test_above_fd_setsize(self): @@ -313,7 +314,7 @@ class ScalableSelectorMixIn: self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE, (soft, hard)) NUM_FDS = hard - except OSError: + except (OSError, ValueError): NUM_FDS = soft # guard for already allocated FDs (stdin, stdout...) diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py index 4aff932..34d83f2 100644 --- a/Lib/test/test_site.py +++ b/Lib/test/test_site.py @@ -5,6 +5,7 @@ executing have not been removed. """ import unittest +import test.support from test.support import run_unittest, TESTFN, EnvironmentVarGuard from test.support import captured_stderr import builtins @@ -373,9 +374,10 @@ class ImportSideEffectTests(unittest.TestCase): self.assertTrue(hasattr(builtins, "exit")) def test_setting_copyright(self): - # 'copyright' and 'credits' should be in builtins + # 'copyright', 'credits', and 'license' should be in builtins self.assertTrue(hasattr(builtins, "copyright")) self.assertTrue(hasattr(builtins, "credits")) + self.assertTrue(hasattr(builtins, "license")) def test_setting_help(self): # 'help' should be set in builtins @@ -402,5 +404,27 @@ class ImportSideEffectTests(unittest.TestCase): self.fail("sitecustomize not imported automatically") +class LicenseURL(unittest.TestCase): + """Test accessibility of the license.""" + + @unittest.skipUnless(str(license).startswith('See http://'), + 'license is available as a file') + def test_license_page(self): + """urlopen should return the license page""" + pat = r'^See (http://www\.python\.org/download/releases/[^/]+/license/)$' + mo = re.search(pat, str(license)) + self.assertIsNotNone(mo, msg='can\'t find appropriate url in license') + if mo is not None: + url = mo.group(1) + with test.support.transient_internet(url): + import urllib.request, urllib.error + try: + with urllib.request.urlopen(url) as data: + code = data.getcode() + except urllib.error.HTTPError as e: + code = e.code + self.assertEqual(code, 200, msg=url) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 6205768..490f776 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -26,6 +26,10 @@ try: import multiprocessing except ImportError: multiprocessing = False +try: + import fcntl +except ImportError: + fcntl = None HOST = support.HOST MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return @@ -4804,6 +4808,33 @@ class InheritanceTest(unittest.TestCase): sock.set_inheritable(False) self.assertEqual(sock.get_inheritable(), False) + @unittest.skipIf(fcntl is None, "need fcntl") + def test_get_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(sock.get_inheritable(), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + + self.assertEqual(sock.get_inheritable(), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_set_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + sock.set_inheritable(True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) + + @unittest.skipUnless(hasattr(socket, "socketpair"), "need socket.socketpair()") def test_socketpair(self): diff --git a/Lib/test/test_tcl.py b/Lib/test/test_tcl.py index 4e52fd4..cf717d8 100644 --- a/Lib/test/test_tcl.py +++ b/Lib/test/test_tcl.py @@ -15,6 +15,14 @@ support.import_fresh_module('tkinter') from tkinter import Tcl from _tkinter import TclError +tcl_version = _tkinter.TCL_VERSION.split('.') +try: + for i in range(len(tcl_version)): + tcl_version[i] = int(tcl_version[i]) +except ValueError: + pass +tcl_version = tuple(tcl_version) + class TkinterTest(unittest.TestCase): @@ -200,9 +208,12 @@ class TclTest(unittest.TestCase): (('a', 3.4), ('a', 3.4)), ((), ()), (call('list', 1, '2', (3.4,)), (1, '2', (3.4,))), - (call('dict', 'create', 1, '\u20ac', b'\xe2\x82\xac', (3.4,)), - (1, '\u20ac', '\u20ac', (3.4,))), ] + if tcl_version >= (8, 5): + testcases += [ + (call('dict', 'create', 1, '\u20ac', b'\xe2\x82\xac', (3.4,)), + (1, '\u20ac', '\u20ac', (3.4,))), + ] for arg, res in testcases: self.assertEqual(splitlist(arg), res, msg=arg) self.assertRaises(TclError, splitlist, '{') @@ -234,9 +245,12 @@ class TclTest(unittest.TestCase): (('a', (2, 3.4)), ('a', (2, 3.4))), ((), ()), (call('list', 1, '2', (3.4,)), (1, '2', (3.4,))), - (call('dict', 'create', 12, '\u20ac', b'\xe2\x82\xac', (3.4,)), - (12, '\u20ac', '\u20ac', (3.4,))), ] + if tcl_version >= (8, 5): + testcases += [ + (call('dict', 'create', 12, '\u20ac', b'\xe2\x82\xac', (3.4,)), + (12, '\u20ac', '\u20ac', (3.4,))), + ] for arg, res in testcases: self.assertEqual(split(arg), res, msg=arg) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 971a635..75ae247 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -109,7 +109,7 @@ class ThreadTests(BaseTestCase): if verbose: print('waiting for all tasks to complete') for t in threads: - t.join(NUMTASKS) + t.join() self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) @@ -539,6 +539,40 @@ class ThreadTests(BaseTestCase): self.assertEqual(err, b"") self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + def test_tstate_lock(self): + # Test an implementation detail of Thread objects. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + time.sleep(0.01) + # The tstate lock is None until the thread is started + t = threading.Thread(target=f) + self.assertIs(t._tstate_lock, None) + t.start() + started.acquire() + self.assertTrue(t.is_alive()) + # The tstate lock can't be acquired when the thread is running + # (or suspended). + tstate_lock = t._tstate_lock + self.assertFalse(tstate_lock.acquire(timeout=0), False) + finish.release() + # When the thread ends, the state_lock can be successfully + # acquired. + self.assertTrue(tstate_lock.acquire(timeout=5), False) + # But is_alive() is still True: we hold _tstate_lock now, which + # prevents is_alive() from knowing the thread's end-of-life C code + # is done. + self.assertTrue(t.is_alive()) + # Let is_alive() find out the C code is done. + tstate_lock.release() + self.assertFalse(t.is_alive()) + # And verify the thread disposed of _tstate_lock. + self.assertTrue(t._tstate_lock is None) + class ThreadJoinOnShutdown(BaseTestCase): @@ -613,144 +647,8 @@ class ThreadJoinOnShutdown(BaseTestCase): """ self._run_and_join(script) - def assertScriptHasOutput(self, script, expected_output): - rc, out, err = assert_python_ok("-c", script) - data = out.decode().replace('\r', '') - self.assertEqual(data, expected_output) - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_4_joining_across_fork_in_worker_thread(self): - # There used to be a possible deadlock when forking from a child - # thread. See http://bugs.python.org/issue6643. - - # The script takes the following steps: - # - The main thread in the parent process starts a new thread and then - # tries to join it. - # - The join operation acquires the Lock inside the thread's _block - # Condition. (See threading.py:Thread.join().) - # - We stub out the acquire method on the condition to force it to wait - # until the child thread forks. (See LOCK ACQUIRED HERE) - # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS - # HERE) - # - The main thread of the parent process enters Condition.wait(), - # which releases the lock on the child thread. - # - The child process returns. Without the necessary fix, when the - # main thread of the child process (which used to be the child thread - # in the parent process) attempts to exit, it will try to acquire the - # lock in the Thread._block Condition object and hang, because the - # lock was held across the fork. - - script = """if 1: - import os, time, threading - - finish_join = False - start_fork = False - - def worker(): - # Wait until this thread's lock is acquired before forking to - # create the deadlock. - global finish_join - while not start_fork: - time.sleep(0.01) - # LOCK HELD: Main thread holds lock across this call. - childpid = os.fork() - finish_join = True - if childpid != 0: - # Parent process just waits for child. - os.waitpid(childpid, 0) - # Child process should just return. - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's lock acquire method. - # This acquires the lock and then waits until the child has forked - # before returning, which will release the lock soon after. If - # someone else tries to fix this test case by acquiring this lock - # before forking instead of resetting it, the test case will - # deadlock when it shouldn't. - condition = w._block - orig_acquire = condition.acquire - call_count_lock = threading.Lock() - call_count = 0 - def my_acquire(): - global call_count - global start_fork - orig_acquire() # LOCK ACQUIRED HERE - start_fork = True - if call_count == 0: - while not finish_join: - time.sleep(0.01) # WORKER THREAD FORKS HERE - with call_count_lock: - call_count += 1 - condition.acquire = my_acquire - - w.start() - w.join() - print('end of main') - """ - self.assertScriptHasOutput(script, "end of main\n") - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_5_clear_waiter_locks_to_avoid_crash(self): - # Check that a spawned thread that forks doesn't segfault on certain - # platforms, namely OS X. This used to happen if there was a waiter - # lock in the thread's condition variable's waiters list. Even though - # we know the lock will be held across the fork, it is not safe to - # release locks held across forks on all platforms, so releasing the - # waiter lock caused a segfault on OS X. Furthermore, since locks on - # OS X are (as of this writing) implemented with a mutex + condition - # variable instead of a semaphore, while we know that the Python-level - # lock will be acquired, we can't know if the internal mutex will be - # acquired at the time of the fork. - - script = """if True: - import os, time, threading - - start_fork = False - - def worker(): - # Wait until the main thread has attempted to join this thread - # before continuing. - while not start_fork: - time.sleep(0.01) - childpid = os.fork() - if childpid != 0: - # Parent process just waits for child. - (cpid, rc) = os.waitpid(childpid, 0) - assert cpid == childpid - assert rc == 0 - print('end of worker thread') - else: - # Child process should just return. - pass - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's _release_save method. - # This releases the condition's lock and flips the global that - # causes the worker to fork. At this point, the problematic waiter - # lock has been acquired once by the waiter and has been put onto - # the waiters list. - condition = w._block - orig_release_save = condition._release_save - def my_release_save(): - global start_fork - orig_release_save() - # Waiter lock held here, condition lock released. - start_fork = True - condition._release_save = my_release_save - - w.start() - w.join() - print('end of main thread') - """ - output = "end of worker thread\nend of main thread\n" - self.assertScriptHasOutput(script, output) - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_6_daemon_threads(self): + def test_4_daemon_threads(self): # Check that a daemon thread cannot crash the interpreter on shutdown # by manipulating internal structures that are being disposed of in # the main thread. @@ -867,6 +765,38 @@ class SubinterpThreadingTests(BaseTestCase): # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") + def test_threads_join_2(self): + # Same as above, but a delay gets introduced after the thread's + # Python code returned but before the thread state is deleted. + # To achieve this, we register a thread-local object which sleeps + # a bit when deallocated. + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + class Sleeper: + def __del__(self): + time.sleep(0.05) + + tls = threading.local() + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + tls.x = Sleeper() + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = _testcapi.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + def test_daemon_threads_fatal_error(self): subinterp_code = r"""if 1: import os |
