summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/support/__init__.py31
-rw-r--r--Lib/test/test_faulthandler.py30
-rw-r--r--Lib/test/test_os.py35
-rw-r--r--Lib/test/test_regrtest.py3
-rw-r--r--Lib/test/test_selectors.py3
-rw-r--r--Lib/test/test_site.py26
-rw-r--r--Lib/test/test_socket.py31
-rw-r--r--Lib/test/test_tcl.py22
-rw-r--r--Lib/test/test_threading.py206
9 files changed, 218 insertions, 169 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 5832ef6..5687ef9 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -860,24 +860,31 @@ if hasattr(os, "umask"):
finally:
os.umask(oldmask)
-# TEST_HOME refers to the top level directory of the "test" package
+# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
-TEST_HOME = os.path.dirname(os.path.abspath(__file__))
+TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
-def findfile(file, here=TEST_HOME, subdir=None):
+# TEST_DATA_DIR is used as a target download location for remote resources
+TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
+
+def findfile(filename, subdir=None):
"""Try to find a file on sys.path or in the test directory. If it is not
found the argument passed to the function is returned (this does not
- necessarily signal failure; could still be the legitimate path)."""
- if os.path.isabs(file):
- return file
+ necessarily signal failure; could still be the legitimate path).
+
+ Setting *subdir* indicates a relative path to use to find the file
+ rather than looking directly in the path directories.
+ """
+ if os.path.isabs(filename):
+ return filename
if subdir is not None:
- file = os.path.join(subdir, file)
- path = sys.path
- path = [os.path.dirname(here)] + path
+ filename = os.path.join(subdir, filename)
+ path = [TEST_HOME_DIR] + sys.path
for dn in path:
- fn = os.path.join(dn, file)
+ fn = os.path.join(dn, filename)
if os.path.exists(fn): return fn
- return file
+ return filename
def create_empty_file(filename):
"""Create an empty file. If the file already exists, truncate it."""
@@ -914,7 +921,7 @@ def open_urlresource(url, *args, **kw):
filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
- fn = os.path.join(os.path.dirname(__file__), "data", filename)
+ fn = os.path.join(TEST_DATA_DIR, filename)
def check_valid_file(fn):
f = open(fn, *args, **kw)
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py
index 4a8becf..d78bcb0 100644
--- a/Lib/test/test_faulthandler.py
+++ b/Lib/test/test_faulthandler.py
@@ -265,17 +265,33 @@ faulthandler._sigsegv()
# By default, the module should be disabled
code = "import faulthandler; print(faulthandler.is_enabled())"
args = (sys.executable, '-E', '-c', code)
- # use subprocess module directly because test.script_helper adds
- # "-X faulthandler" to the command line
- stdout = subprocess.check_output(args)
- self.assertEqual(stdout.rstrip(), b"False")
+ # don't use assert_python_ok() because it always enable faulthandler
+ output = subprocess.check_output(args)
+ self.assertEqual(output.rstrip(), b"False")
def test_sys_xoptions(self):
# Test python -X faulthandler
code = "import faulthandler; print(faulthandler.is_enabled())"
- rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code)
- stdout = (stdout + stderr).strip()
- self.assertEqual(stdout, b"True")
+ args = (sys.executable, "-E", "-X", "faulthandler", "-c", code)
+ # don't use assert_python_ok() because it always enable faulthandler
+ output = subprocess.check_output(args)
+ self.assertEqual(output.rstrip(), b"True")
+
+ def test_env_var(self):
+ # empty env var
+ code = "import faulthandler; print(faulthandler.is_enabled())"
+ args = (sys.executable, "-c", code)
+ env = os.environ.copy()
+ env['PYTHONFAULTHANDLER'] = ''
+ # don't use assert_python_ok() because it always enable faulthandler
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"False")
+
+ # non-empty env var
+ env = os.environ.copy()
+ env['PYTHONFAULTHANDLER'] = '1'
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"True")
def check_dump_traceback(self, filename):
"""
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index d0dd364..39b0e80 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -34,6 +34,10 @@ try:
import resource
except ImportError:
resource = None
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
from test.script_helper import assert_python_ok
@@ -2300,19 +2304,38 @@ class CPUCountTests(unittest.TestCase):
class FDInheritanceTests(unittest.TestCase):
- def test_get_inheritable(self):
+ def test_get_set_inheritable(self):
fd = os.open(__file__, os.O_RDONLY)
self.addCleanup(os.close, fd)
- for inheritable in (False, True):
- os.set_inheritable(fd, inheritable)
- self.assertEqual(os.get_inheritable(fd), inheritable)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(os.get_inheritable(fd), True)
- def test_set_inheritable(self):
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
fd = os.open(__file__, os.O_RDONLY)
self.addCleanup(os.close, fd)
- os.set_inheritable(fd, True)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
self.assertEqual(os.get_inheritable(fd), True)
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
def test_open(self):
fd = os.open(__file__, os.O_RDONLY)
self.addCleanup(os.close, fd)
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 289fb22..353874b 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -3,6 +3,7 @@ Tests of regrtest.py.
"""
import argparse
+import faulthandler
import getopt
import os.path
import unittest
@@ -25,6 +26,8 @@ class ParseArgsTestCase(unittest.TestCase):
regrtest._parse_args([opt])
self.assertIn('Run Python regression tests.', out.getvalue())
+ @unittest.skipUnless(hasattr(faulthandler, 'dump_traceback_later'),
+ "faulthandler.dump_traceback_later() required")
def test_timeout(self):
ns = regrtest._parse_args(['--timeout', '4.2'])
self.assertEqual(ns.timeout, 4.2)
diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py
index 2657a50..6ce4d8a 100644
--- a/Lib/test/test_selectors.py
+++ b/Lib/test/test_selectors.py
@@ -301,6 +301,7 @@ class BaseSelectorTestCase(unittest.TestCase):
class ScalableSelectorMixIn:
+ # see issue #18963 for why it's skipped on older OS X versions
@support.requires_mac_ver(10, 5)
@unittest.skipUnless(resource, "Test needs resource module")
def test_above_fd_setsize(self):
@@ -313,7 +314,7 @@ class ScalableSelectorMixIn:
self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
(soft, hard))
NUM_FDS = hard
- except OSError:
+ except (OSError, ValueError):
NUM_FDS = soft
# guard for already allocated FDs (stdin, stdout...)
diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py
index 4aff932..34d83f2 100644
--- a/Lib/test/test_site.py
+++ b/Lib/test/test_site.py
@@ -5,6 +5,7 @@ executing have not been removed.
"""
import unittest
+import test.support
from test.support import run_unittest, TESTFN, EnvironmentVarGuard
from test.support import captured_stderr
import builtins
@@ -373,9 +374,10 @@ class ImportSideEffectTests(unittest.TestCase):
self.assertTrue(hasattr(builtins, "exit"))
def test_setting_copyright(self):
- # 'copyright' and 'credits' should be in builtins
+ # 'copyright', 'credits', and 'license' should be in builtins
self.assertTrue(hasattr(builtins, "copyright"))
self.assertTrue(hasattr(builtins, "credits"))
+ self.assertTrue(hasattr(builtins, "license"))
def test_setting_help(self):
# 'help' should be set in builtins
@@ -402,5 +404,27 @@ class ImportSideEffectTests(unittest.TestCase):
self.fail("sitecustomize not imported automatically")
+class LicenseURL(unittest.TestCase):
+ """Test accessibility of the license."""
+
+ @unittest.skipUnless(str(license).startswith('See http://'),
+ 'license is available as a file')
+ def test_license_page(self):
+ """urlopen should return the license page"""
+ pat = r'^See (http://www\.python\.org/download/releases/[^/]+/license/)$'
+ mo = re.search(pat, str(license))
+ self.assertIsNotNone(mo, msg='can\'t find appropriate url in license')
+ if mo is not None:
+ url = mo.group(1)
+ with test.support.transient_internet(url):
+ import urllib.request, urllib.error
+ try:
+ with urllib.request.urlopen(url) as data:
+ code = data.getcode()
+ except urllib.error.HTTPError as e:
+ code = e.code
+ self.assertEqual(code, 200, msg=url)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 6205768..490f776 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -26,6 +26,10 @@ try:
import multiprocessing
except ImportError:
multiprocessing = False
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
HOST = support.HOST
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
@@ -4804,6 +4808,33 @@ class InheritanceTest(unittest.TestCase):
sock.set_inheritable(False)
self.assertEqual(sock.get_inheritable(), False)
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
+ sock = socket.socket()
+ with sock:
+ fd = sock.fileno()
+ self.assertEqual(sock.get_inheritable(), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+ self.assertEqual(sock.get_inheritable(), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ sock = socket.socket()
+ with sock:
+ fd = sock.fileno()
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ sock.set_inheritable(True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
+
@unittest.skipUnless(hasattr(socket, "socketpair"),
"need socket.socketpair()")
def test_socketpair(self):
diff --git a/Lib/test/test_tcl.py b/Lib/test/test_tcl.py
index 4e52fd4..cf717d8 100644
--- a/Lib/test/test_tcl.py
+++ b/Lib/test/test_tcl.py
@@ -15,6 +15,14 @@ support.import_fresh_module('tkinter')
from tkinter import Tcl
from _tkinter import TclError
+tcl_version = _tkinter.TCL_VERSION.split('.')
+try:
+ for i in range(len(tcl_version)):
+ tcl_version[i] = int(tcl_version[i])
+except ValueError:
+ pass
+tcl_version = tuple(tcl_version)
+
class TkinterTest(unittest.TestCase):
@@ -200,9 +208,12 @@ class TclTest(unittest.TestCase):
(('a', 3.4), ('a', 3.4)),
((), ()),
(call('list', 1, '2', (3.4,)), (1, '2', (3.4,))),
- (call('dict', 'create', 1, '\u20ac', b'\xe2\x82\xac', (3.4,)),
- (1, '\u20ac', '\u20ac', (3.4,))),
]
+ if tcl_version >= (8, 5):
+ testcases += [
+ (call('dict', 'create', 1, '\u20ac', b'\xe2\x82\xac', (3.4,)),
+ (1, '\u20ac', '\u20ac', (3.4,))),
+ ]
for arg, res in testcases:
self.assertEqual(splitlist(arg), res, msg=arg)
self.assertRaises(TclError, splitlist, '{')
@@ -234,9 +245,12 @@ class TclTest(unittest.TestCase):
(('a', (2, 3.4)), ('a', (2, 3.4))),
((), ()),
(call('list', 1, '2', (3.4,)), (1, '2', (3.4,))),
- (call('dict', 'create', 12, '\u20ac', b'\xe2\x82\xac', (3.4,)),
- (12, '\u20ac', '\u20ac', (3.4,))),
]
+ if tcl_version >= (8, 5):
+ testcases += [
+ (call('dict', 'create', 12, '\u20ac', b'\xe2\x82\xac', (3.4,)),
+ (12, '\u20ac', '\u20ac', (3.4,))),
+ ]
for arg, res in testcases:
self.assertEqual(split(arg), res, msg=arg)
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 971a635..75ae247 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -109,7 +109,7 @@ class ThreadTests(BaseTestCase):
if verbose:
print('waiting for all tasks to complete')
for t in threads:
- t.join(NUMTASKS)
+ t.join()
self.assertTrue(not t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
@@ -539,6 +539,40 @@ class ThreadTests(BaseTestCase):
self.assertEqual(err, b"")
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
+ def test_tstate_lock(self):
+ # Test an implementation detail of Thread objects.
+ started = _thread.allocate_lock()
+ finish = _thread.allocate_lock()
+ started.acquire()
+ finish.acquire()
+ def f():
+ started.release()
+ finish.acquire()
+ time.sleep(0.01)
+ # The tstate lock is None until the thread is started
+ t = threading.Thread(target=f)
+ self.assertIs(t._tstate_lock, None)
+ t.start()
+ started.acquire()
+ self.assertTrue(t.is_alive())
+ # The tstate lock can't be acquired when the thread is running
+ # (or suspended).
+ tstate_lock = t._tstate_lock
+ self.assertFalse(tstate_lock.acquire(timeout=0), False)
+ finish.release()
+ # When the thread ends, the state_lock can be successfully
+ # acquired.
+ self.assertTrue(tstate_lock.acquire(timeout=5), False)
+ # But is_alive() is still True: we hold _tstate_lock now, which
+ # prevents is_alive() from knowing the thread's end-of-life C code
+ # is done.
+ self.assertTrue(t.is_alive())
+ # Let is_alive() find out the C code is done.
+ tstate_lock.release()
+ self.assertFalse(t.is_alive())
+ # And verify the thread disposed of _tstate_lock.
+ self.assertTrue(t._tstate_lock is None)
+
class ThreadJoinOnShutdown(BaseTestCase):
@@ -613,144 +647,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
"""
self._run_and_join(script)
- def assertScriptHasOutput(self, script, expected_output):
- rc, out, err = assert_python_ok("-c", script)
- data = out.decode().replace('\r', '')
- self.assertEqual(data, expected_output)
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_4_joining_across_fork_in_worker_thread(self):
- # There used to be a possible deadlock when forking from a child
- # thread. See http://bugs.python.org/issue6643.
-
- # The script takes the following steps:
- # - The main thread in the parent process starts a new thread and then
- # tries to join it.
- # - The join operation acquires the Lock inside the thread's _block
- # Condition. (See threading.py:Thread.join().)
- # - We stub out the acquire method on the condition to force it to wait
- # until the child thread forks. (See LOCK ACQUIRED HERE)
- # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
- # HERE)
- # - The main thread of the parent process enters Condition.wait(),
- # which releases the lock on the child thread.
- # - The child process returns. Without the necessary fix, when the
- # main thread of the child process (which used to be the child thread
- # in the parent process) attempts to exit, it will try to acquire the
- # lock in the Thread._block Condition object and hang, because the
- # lock was held across the fork.
-
- script = """if 1:
- import os, time, threading
-
- finish_join = False
- start_fork = False
-
- def worker():
- # Wait until this thread's lock is acquired before forking to
- # create the deadlock.
- global finish_join
- while not start_fork:
- time.sleep(0.01)
- # LOCK HELD: Main thread holds lock across this call.
- childpid = os.fork()
- finish_join = True
- if childpid != 0:
- # Parent process just waits for child.
- os.waitpid(childpid, 0)
- # Child process should just return.
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's lock acquire method.
- # This acquires the lock and then waits until the child has forked
- # before returning, which will release the lock soon after. If
- # someone else tries to fix this test case by acquiring this lock
- # before forking instead of resetting it, the test case will
- # deadlock when it shouldn't.
- condition = w._block
- orig_acquire = condition.acquire
- call_count_lock = threading.Lock()
- call_count = 0
- def my_acquire():
- global call_count
- global start_fork
- orig_acquire() # LOCK ACQUIRED HERE
- start_fork = True
- if call_count == 0:
- while not finish_join:
- time.sleep(0.01) # WORKER THREAD FORKS HERE
- with call_count_lock:
- call_count += 1
- condition.acquire = my_acquire
-
- w.start()
- w.join()
- print('end of main')
- """
- self.assertScriptHasOutput(script, "end of main\n")
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_5_clear_waiter_locks_to_avoid_crash(self):
- # Check that a spawned thread that forks doesn't segfault on certain
- # platforms, namely OS X. This used to happen if there was a waiter
- # lock in the thread's condition variable's waiters list. Even though
- # we know the lock will be held across the fork, it is not safe to
- # release locks held across forks on all platforms, so releasing the
- # waiter lock caused a segfault on OS X. Furthermore, since locks on
- # OS X are (as of this writing) implemented with a mutex + condition
- # variable instead of a semaphore, while we know that the Python-level
- # lock will be acquired, we can't know if the internal mutex will be
- # acquired at the time of the fork.
-
- script = """if True:
- import os, time, threading
-
- start_fork = False
-
- def worker():
- # Wait until the main thread has attempted to join this thread
- # before continuing.
- while not start_fork:
- time.sleep(0.01)
- childpid = os.fork()
- if childpid != 0:
- # Parent process just waits for child.
- (cpid, rc) = os.waitpid(childpid, 0)
- assert cpid == childpid
- assert rc == 0
- print('end of worker thread')
- else:
- # Child process should just return.
- pass
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's _release_save method.
- # This releases the condition's lock and flips the global that
- # causes the worker to fork. At this point, the problematic waiter
- # lock has been acquired once by the waiter and has been put onto
- # the waiters list.
- condition = w._block
- orig_release_save = condition._release_save
- def my_release_save():
- global start_fork
- orig_release_save()
- # Waiter lock held here, condition lock released.
- start_fork = True
- condition._release_save = my_release_save
-
- w.start()
- w.join()
- print('end of main thread')
- """
- output = "end of worker thread\nend of main thread\n"
- self.assertScriptHasOutput(script, output)
-
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_6_daemon_threads(self):
+ def test_4_daemon_threads(self):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
@@ -867,6 +765,38 @@ class SubinterpThreadingTests(BaseTestCase):
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
+ def test_threads_join_2(self):
+ # Same as above, but a delay gets introduced after the thread's
+ # Python code returned but before the thread state is deleted.
+ # To achieve this, we register a thread-local object which sleeps
+ # a bit when deallocated.
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ code = r"""if 1:
+ import os
+ import threading
+ import time
+
+ class Sleeper:
+ def __del__(self):
+ time.sleep(0.05)
+
+ tls = threading.local()
+
+ def f():
+ # Sleep a bit so that the thread is still running when
+ # Py_EndInterpreter is called.
+ time.sleep(0.05)
+ tls.x = Sleeper()
+ os.write(%d, b"x")
+ threading.Thread(target=f).start()
+ """ % (w,)
+ ret = _testcapi.run_in_subinterp(code)
+ self.assertEqual(ret, 0)
+ # The thread was joined properly.
+ self.assertEqual(os.read(r, 1), b"x")
+
def test_daemon_threads_fatal_error(self):
subinterp_code = r"""if 1:
import os