From 69e3bda310f55816403e4c7fda42ab96d81c31be Mon Sep 17 00:00:00 2001 From: Nick Coghlan Date: Sun, 28 Jul 2013 21:06:50 +1000 Subject: Issue #15494: test.support is now a package rather than a module Also including this change in 3.3 to help avoid spurious conflicts between the two most active branches. (Initial patch by Indra Talip) --- Lib/test/support.py | 1987 ----------------------------------------- Lib/test/support/__init__.py | 1990 ++++++++++++++++++++++++++++++++++++++++++ Lib/test/test_linecache.py | 2 +- Misc/ACKS | 1 + Misc/NEWS | 3 + 5 files changed, 1995 insertions(+), 1988 deletions(-) delete mode 100644 Lib/test/support.py create mode 100644 Lib/test/support/__init__.py diff --git a/Lib/test/support.py b/Lib/test/support.py deleted file mode 100644 index 7ab5ff7..0000000 --- a/Lib/test/support.py +++ /dev/null @@ -1,1987 +0,0 @@ -"""Supporting definitions for the Python regression tests.""" - -if __name__ != 'test.support': - raise ImportError('support must be imported from the test package') - -import contextlib -import errno -import functools -import gc -import socket -import sys -import os -import platform -import shutil -import warnings -import unittest -import importlib -import collections.abc -import re -import subprocess -import imp -import time -import sysconfig -import fnmatch -import logging.handlers -import struct -import tempfile -import _testcapi - -try: - import _thread, threading -except ImportError: - _thread = None - threading = None -try: - import multiprocessing.process -except ImportError: - multiprocessing = None - -try: - import zlib -except ImportError: - zlib = None - -try: - import gzip -except ImportError: - gzip = None - -try: - import bz2 -except ImportError: - bz2 = None - -try: - import lzma -except ImportError: - lzma = None - -__all__ = [ - "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", - "use_resources", "max_memuse", "record_original_stdout", - "get_original_stdout", "unload", "unlink", "rmtree", "forget", - "is_resource_enabled", "requires", "requires_freebsd_version", - "requires_linux_version", "requires_mac_ver", "find_unused_port", - "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", - "temp_cwd", "findfile", "create_empty_file", "sortdict", - "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", - "EnvironmentVarGuard", "TransientResource", "captured_stdout", - "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset", - "ioerror_peer_reset", "run_with_locale", 'temp_umask', - "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", - "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", - "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", - "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", - "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", - "skip_unless_xattr", "import_fresh_module", "requires_zlib", - "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz", - "requires_gzip", "requires_bz2", "requires_lzma", "suppress_crash_popup", - ] - -class Error(Exception): - """Base class for regression test exceptions.""" - -class TestFailed(Error): - """Test failed.""" - -class ResourceDenied(unittest.SkipTest): - """Test skipped because it requested a disallowed resource. - - This is raised when a test calls requires() for a resource that - has not be enabled. It is used to distinguish between expected - and unexpected skips. - """ - -@contextlib.contextmanager -def _ignore_deprecated_imports(ignore=True): - """Context manager to suppress package and module deprecation - warnings when importing them. - - If ignore is False, this context manager has no effect.""" - if ignore: - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", ".+ (module|package)", - DeprecationWarning) - yield - else: - yield - - -def import_module(name, deprecated=False): - """Import and return the module to be tested, raising SkipTest if - it is not available. - - If deprecated is True, any module or package deprecation messages - will be suppressed.""" - with _ignore_deprecated_imports(deprecated): - try: - return importlib.import_module(name) - except ImportError as msg: - raise unittest.SkipTest(str(msg)) - - -def _save_and_remove_module(name, orig_modules): - """Helper function to save and remove a module from sys.modules - - Raise ImportError if the module can't be imported.""" - # try to import the module and raise an error if it can't be imported - if name not in sys.modules: - __import__(name) - del sys.modules[name] - for modname in list(sys.modules): - if modname == name or modname.startswith(name + '.'): - orig_modules[modname] = sys.modules[modname] - del sys.modules[modname] - -def _save_and_block_module(name, orig_modules): - """Helper function to save and block a module in sys.modules - - Return True if the module was in sys.modules, False otherwise.""" - saved = True - try: - orig_modules[name] = sys.modules[name] - except KeyError: - saved = False - sys.modules[name] = None - return saved - - -def anticipate_failure(condition): - """Decorator to mark a test that is known to be broken in some cases - - Any use of this decorator should have a comment identifying the - associated tracker issue. - """ - if condition: - return unittest.expectedFailure - return lambda f: f - - -def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): - """Imports and returns a module, deliberately bypassing the sys.modules cache - and importing a fresh copy of the module. Once the import is complete, - the sys.modules cache is restored to its original state. - - Modules named in fresh are also imported anew if needed by the import. - If one of these modules can't be imported, None is returned. - - Importing of modules named in blocked is prevented while the fresh import - takes place. - - If deprecated is True, any module or package deprecation messages - will be suppressed.""" - # NOTE: test_heapq, test_json and test_warnings include extra sanity checks - # to make sure that this utility function is working as expected - with _ignore_deprecated_imports(deprecated): - # Keep track of modules saved for later restoration as well - # as those which just need a blocking entry removed - orig_modules = {} - names_to_remove = [] - _save_and_remove_module(name, orig_modules) - try: - for fresh_name in fresh: - _save_and_remove_module(fresh_name, orig_modules) - for blocked_name in blocked: - if not _save_and_block_module(blocked_name, orig_modules): - names_to_remove.append(blocked_name) - fresh_module = importlib.import_module(name) - except ImportError: - fresh_module = None - finally: - for orig_name, module in orig_modules.items(): - sys.modules[orig_name] = module - for name_to_remove in names_to_remove: - del sys.modules[name_to_remove] - return fresh_module - - -def get_attribute(obj, name): - """Get an attribute, raising SkipTest if AttributeError is raised.""" - try: - attribute = getattr(obj, name) - except AttributeError: - raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) - else: - return attribute - -verbose = 1 # Flag set to 0 by regrtest.py -use_resources = None # Flag set to [] by regrtest.py -max_memuse = 0 # Disable bigmem tests (they will still be run with - # small sizes, to make sure they work.) -real_max_memuse = 0 -failfast = False -match_tests = None - -# _original_stdout is meant to hold stdout at the time regrtest began. -# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. -# The point is to have some flavor of stdout the user can actually see. -_original_stdout = None -def record_original_stdout(stdout): - global _original_stdout - _original_stdout = stdout - -def get_original_stdout(): - return _original_stdout or sys.stdout - -def unload(name): - try: - del sys.modules[name] - except KeyError: - pass - -if sys.platform.startswith("win"): - def _waitfor(func, pathname, waitall=False): - # Peform the operation - func(pathname) - # Now setup the wait loop - if waitall: - dirname = pathname - else: - dirname, name = os.path.split(pathname) - dirname = dirname or '.' - # Check for `pathname` to be removed from the filesystem. - # The exponential backoff of the timeout amounts to a total - # of ~1 second after which the deletion is probably an error - # anyway. - # Testing on a i7@4.3GHz shows that usually only 1 iteration is - # required when contention occurs. - timeout = 0.001 - while timeout < 1.0: - # Note we are only testing for the existance of the file(s) in - # the contents of the directory regardless of any security or - # access rights. If we have made it this far, we have sufficient - # permissions to do that much using Python's equivalent of the - # Windows API FindFirstFile. - # Other Windows APIs can fail or give incorrect results when - # dealing with files that are pending deletion. - L = os.listdir(dirname) - if not (L if waitall else name in L): - return - # Increase the timeout and try again - time.sleep(timeout) - timeout *= 2 - warnings.warn('tests may fail, delete still pending for ' + pathname, - RuntimeWarning, stacklevel=4) - - def _unlink(filename): - _waitfor(os.unlink, filename) - - def _rmdir(dirname): - _waitfor(os.rmdir, dirname) - - def _rmtree(path): - def _rmtree_inner(path): - for name in os.listdir(path): - fullname = os.path.join(path, name) - if os.path.isdir(fullname): - _waitfor(_rmtree_inner, fullname, waitall=True) - os.rmdir(fullname) - else: - os.unlink(fullname) - _waitfor(_rmtree_inner, path, waitall=True) - _waitfor(os.rmdir, path) -else: - _unlink = os.unlink - _rmdir = os.rmdir - _rmtree = shutil.rmtree - -def unlink(filename): - try: - _unlink(filename) - except OSError as error: - # The filename need not exist. - if error.errno not in (errno.ENOENT, errno.ENOTDIR): - raise - -def rmdir(dirname): - try: - _rmdir(dirname) - except OSError as error: - # The directory need not exist. - if error.errno != errno.ENOENT: - raise - -def rmtree(path): - try: - _rmtree(path) - except OSError as error: - if error.errno != errno.ENOENT: - raise - -def make_legacy_pyc(source): - """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location. - - The choice of .pyc or .pyo extension is done based on the __debug__ flag - value. - - :param source: The file system path to the source file. The source file - does not need to exist, however the PEP 3147 pyc file must exist. - :return: The file system path to the legacy pyc file. - """ - pyc_file = imp.cache_from_source(source) - up_one = os.path.dirname(os.path.abspath(source)) - legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o')) - os.rename(pyc_file, legacy_pyc) - return legacy_pyc - -def forget(modname): - """'Forget' a module was ever imported. - - This removes the module from sys.modules and deletes any PEP 3147 or - legacy .pyc and .pyo files. - """ - unload(modname) - for dirname in sys.path: - source = os.path.join(dirname, modname + '.py') - # It doesn't matter if they exist or not, unlink all possible - # combinations of PEP 3147 and legacy pyc and pyo files. - unlink(source + 'c') - unlink(source + 'o') - unlink(imp.cache_from_source(source, debug_override=True)) - unlink(imp.cache_from_source(source, debug_override=False)) - -# On some platforms, should not run gui test even if it is allowed -# in `use_resources'. -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - def _is_gui_available(): - UOI_FLAGS = 1 - WSF_VISIBLE = 0x0001 - class USEROBJECTFLAGS(ctypes.Structure): - _fields_ = [("fInherit", ctypes.wintypes.BOOL), - ("fReserved", ctypes.wintypes.BOOL), - ("dwFlags", ctypes.wintypes.DWORD)] - dll = ctypes.windll.user32 - h = dll.GetProcessWindowStation() - if not h: - raise ctypes.WinError() - uof = USEROBJECTFLAGS() - needed = ctypes.wintypes.DWORD() - res = dll.GetUserObjectInformationW(h, - UOI_FLAGS, - ctypes.byref(uof), - ctypes.sizeof(uof), - ctypes.byref(needed)) - if not res: - raise ctypes.WinError() - return bool(uof.dwFlags & WSF_VISIBLE) -else: - def _is_gui_available(): - return True - -def is_resource_enabled(resource): - """Test whether a resource is enabled. Known resources are set by - regrtest.py.""" - return use_resources is not None and resource in use_resources - -def requires(resource, msg=None): - """Raise ResourceDenied if the specified resource is not available. - - If the caller's module is __main__ then automatically return True. The - possibility of False being returned occurs when regrtest.py is - executing. - """ - if resource == 'gui' and not _is_gui_available(): - raise unittest.SkipTest("Cannot use the 'gui' resource") - # see if the caller's module is __main__ - if so, treat as if - # the resource was set - if sys._getframe(1).f_globals.get("__name__") == "__main__": - return - if not is_resource_enabled(resource): - if msg is None: - msg = "Use of the %r resource not enabled" % resource - raise ResourceDenied(msg) - -def _requires_unix_version(sysname, min_version): - """Decorator raising SkipTest if the OS is `sysname` and the version is less - than `min_version`. - - For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if - the FreeBSD version is less than 7.2. - """ - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kw): - if platform.system() == sysname: - version_txt = platform.release().split('-', 1)[0] - try: - version = tuple(map(int, version_txt.split('.'))) - except ValueError: - pass - else: - if version < min_version: - min_version_txt = '.'.join(map(str, min_version)) - raise unittest.SkipTest( - "%s version %s or higher required, not %s" - % (sysname, min_version_txt, version_txt)) - return wrapper - return decorator - -def requires_freebsd_version(*min_version): - """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is - less than `min_version`. - - For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD - version is less than 7.2. - """ - return _requires_unix_version('FreeBSD', min_version) - -def requires_linux_version(*min_version): - """Decorator raising SkipTest if the OS is Linux and the Linux version is - less than `min_version`. - - For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux - version is less than 2.6.32. - """ - return _requires_unix_version('Linux', min_version) - -def requires_mac_ver(*min_version): - """Decorator raising SkipTest if the OS is Mac OS X and the OS X - version if less than min_version. - - For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version - is lesser than 10.5. - """ - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kw): - if sys.platform == 'darwin': - version_txt = platform.mac_ver()[0] - try: - version = tuple(map(int, version_txt.split('.'))) - except ValueError: - pass - else: - if version < min_version: - min_version_txt = '.'.join(map(str, min_version)) - raise unittest.SkipTest( - "Mac OS X %s or higher required, not %s" - % (min_version_txt, version_txt)) - return func(*args, **kw) - wrapper.min_version = min_version - return wrapper - return decorator - - -HOST = 'localhost' - -def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): - """Returns an unused port that should be suitable for binding. This is - achieved by creating a temporary socket with the same family and type as - the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to - the specified host address (defaults to 0.0.0.0) with the port set to 0, - eliciting an unused ephemeral port from the OS. The temporary socket is - then closed and deleted, and the ephemeral port is returned. - - Either this method or bind_port() should be used for any tests where a - server socket needs to be bound to a particular port for the duration of - the test. Which one to use depends on whether the calling code is creating - a python socket, or if an unused port needs to be provided in a constructor - or passed to an external program (i.e. the -accept argument to openssl's - s_server mode). Always prefer bind_port() over find_unused_port() where - possible. Hard coded ports should *NEVER* be used. As soon as a server - socket is bound to a hard coded port, the ability to run multiple instances - of the test simultaneously on the same host is compromised, which makes the - test a ticking time bomb in a buildbot environment. On Unix buildbots, this - may simply manifest as a failed test, which can be recovered from without - intervention in most cases, but on Windows, the entire python process can - completely and utterly wedge, requiring someone to log in to the buildbot - and manually kill the affected process. - - (This is easy to reproduce on Windows, unfortunately, and can be traced to - the SO_REUSEADDR socket option having different semantics on Windows versus - Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, - listen and then accept connections on identical host/ports. An EADDRINUSE - socket.error will be raised at some point (depending on the platform and - the order bind and listen were called on each socket). - - However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE - will ever be raised when attempting to bind two identical host/ports. When - accept() is called on each socket, the second caller's process will steal - the port from the first caller, leaving them both in an awkwardly wedged - state where they'll no longer respond to any signals or graceful kills, and - must be forcibly killed via OpenProcess()/TerminateProcess(). - - The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option - instead of SO_REUSEADDR, which effectively affords the same semantics as - SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open - Source world compared to Windows ones, this is a common mistake. A quick - look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when - openssl.exe is called with the 's_server' option, for example. See - http://bugs.python.org/issue2550 for more info. The following site also - has a very thorough description about the implications of both REUSEADDR - and EXCLUSIVEADDRUSE on Windows: - http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) - - XXX: although this approach is a vast improvement on previous attempts to - elicit unused ports, it rests heavily on the assumption that the ephemeral - port returned to us by the OS won't immediately be dished back out to some - other process when we close and delete our temporary socket but before our - calling code has a chance to bind the returned port. We can deal with this - issue if/when we come across it. - """ - - tempsock = socket.socket(family, socktype) - port = bind_port(tempsock) - tempsock.close() - del tempsock - return port - -def bind_port(sock, host=HOST): - """Bind the socket to a free port and return the port number. Relies on - ephemeral ports in order to ensure we are using an unbound port. This is - important as many tests may be running simultaneously, especially in a - buildbot environment. This method raises an exception if the sock.family - is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR - or SO_REUSEPORT set on it. Tests should *never* set these socket options - for TCP/IP sockets. The only case for setting these options is testing - multicasting via multiple UDP sockets. - - Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. - on Windows), it will be set on the socket. This will prevent anyone else - from bind()'ing to our host/port for the duration of the test. - """ - - if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: - if hasattr(socket, 'SO_REUSEADDR'): - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: - raise TestFailed("tests should never set the SO_REUSEADDR " \ - "socket option on TCP/IP sockets!") - if hasattr(socket, 'SO_REUSEPORT'): - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: - raise TestFailed("tests should never set the SO_REUSEPORT " \ - "socket option on TCP/IP sockets!") - if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) - - sock.bind((host, 0)) - port = sock.getsockname()[1] - return port - -def _is_ipv6_enabled(): - """Check whether IPv6 is enabled on this host.""" - if socket.has_ipv6: - sock = None - try: - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - sock.bind(('::1', 0)) - return True - except (socket.error, socket.gaierror): - pass - finally: - if sock: - sock.close() - return False - -IPV6_ENABLED = _is_ipv6_enabled() - - -# A constant likely larger than the underlying OS pipe buffer size, to -# make writes blocking. -# Windows limit seems to be around 512 B, and many Unix kernels have a -# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. -# (see issue #17835 for a discussion of this number). -PIPE_MAX_SIZE = 4 *1024 * 1024 + 1 - - -# decorator for skipping tests on non-IEEE 754 platforms -requires_IEEE_754 = unittest.skipUnless( - float.__getformat__("double").startswith("IEEE"), - "test requires IEEE 754 doubles") - -requires_zlib = unittest.skipUnless(zlib, 'requires zlib') - -requires_gzip = unittest.skipUnless(gzip, 'requires gzip') - -requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') - -requires_lzma = unittest.skipUnless(lzma, 'requires lzma') - -is_jython = sys.platform.startswith('java') - -# Filename used for testing -if os.name == 'java': - # Jython disallows @ in module names - TESTFN = '$test' -else: - TESTFN = '@test' - -# Disambiguate TESTFN for parallel testing, while letting it remain a valid -# module name. -TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) - -# FS_NONASCII: non-ASCII character encodable by os.fsencode(), -# or None if there is no such character. -FS_NONASCII = None -for character in ( - # First try printable and common characters to have a readable filename. - # For each character, the encoding list are just example of encodings able - # to encode the character (the list is not exhaustive). - - # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 - '\u00E6', - # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 - '\u0130', - # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 - '\u0141', - # U+03C6 (Greek Small Letter Phi): cp1253 - '\u03C6', - # U+041A (Cyrillic Capital Letter Ka): cp1251 - '\u041A', - # U+05D0 (Hebrew Letter Alef): Encodable to cp424 - '\u05D0', - # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic - '\u060C', - # U+062A (Arabic Letter Teh): cp720 - '\u062A', - # U+0E01 (Thai Character Ko Kai): cp874 - '\u0E01', - - # Then try more "special" characters. "special" because they may be - # interpreted or displayed differently depending on the exact locale - # encoding and the font. - - # U+00A0 (No-Break Space) - '\u00A0', - # U+20AC (Euro Sign) - '\u20AC', -): - try: - os.fsdecode(os.fsencode(character)) - except UnicodeError: - pass - else: - FS_NONASCII = character - break - -# TESTFN_UNICODE is a non-ascii filename -TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" -if sys.platform == 'darwin': - # In Mac OS X's VFS API file names are, by definition, canonically - # decomposed Unicode, encoded using UTF-8. See QA1173: - # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html - import unicodedata - TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) -TESTFN_ENCODING = sys.getfilesystemencoding() - -# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be -# encoded by the filesystem encoding (in strict mode). It can be None if we -# cannot generate such filename. -TESTFN_UNENCODABLE = None -if os.name in ('nt', 'ce'): - # skip win32s (0) or Windows 9x/ME (1) - if sys.getwindowsversion().platform >= 2: - # Different kinds of characters from various languages to minimize the - # probability that the whole name is encodable to MBCS (issue #9819) - TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" - try: - TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) - except UnicodeEncodeError: - pass - else: - print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' - 'Unicode filename tests may not be effective' - % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) - TESTFN_UNENCODABLE = None -# Mac OS X denies unencodable filenames (invalid utf-8) -elif sys.platform != 'darwin': - try: - # ascii and utf-8 cannot encode the byte 0xff - b'\xff'.decode(TESTFN_ENCODING) - except UnicodeDecodeError: - # 0xff will be encoded using the surrogate character u+DCFF - TESTFN_UNENCODABLE = TESTFN \ - + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') - else: - # File system encoding (eg. ISO-8859-* encodings) can encode - # the byte 0xff. Skip some unicode filename tests. - pass - -# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be -# decoded from the filesystem encoding (in strict mode). It can be None if we -# cannot generate such filename (ex: the latin1 encoding can decode any byte -# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks -# to the surrogateescape error handler (PEP 383), but not from the filesystem -# encoding in strict mode. -TESTFN_UNDECODABLE = None -for name in ( - # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows - # accepts it to create a file or a directory, or don't accept to enter to - # such directory (when the bytes name is used). So test b'\xe7' first: it is - # not decodable from cp932. - b'\xe7w\xf0', - # undecodable from ASCII, UTF-8 - b'\xff', - # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 - # and cp857 - b'\xae\xd5' - # undecodable from UTF-8 (UNIX and Mac OS X) - b'\xed\xb2\x80', b'\xed\xb4\x80', - # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, - # cp1253, cp1254, cp1255, cp1257, cp1258 - b'\x81\x98', -): - try: - name.decode(TESTFN_ENCODING) - except UnicodeDecodeError: - TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name - break - -if FS_NONASCII: - TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII -else: - TESTFN_NONASCII = None - -# Save the initial cwd -SAVEDCWD = os.getcwd() - -@contextlib.contextmanager -def temp_cwd(name='tempcwd', quiet=False, path=None): - """ - Context manager that temporarily changes the CWD. - - An existing path may be provided as *path*, in which case this - function makes no changes to the file system. - - Otherwise, the new CWD is created in the current directory and it's - named *name*. If *quiet* is False (default) and it's not possible to - create or change the CWD, an error is raised. If it's True, only a - warning is raised and the original CWD is used. - """ - saved_dir = os.getcwd() - is_temporary = False - if path is None: - path = name - try: - os.mkdir(name) - is_temporary = True - except OSError: - if not quiet: - raise - warnings.warn('tests may fail, unable to create temp CWD ' + name, - RuntimeWarning, stacklevel=3) - try: - os.chdir(path) - except OSError: - if not quiet: - raise - warnings.warn('tests may fail, unable to change the CWD to ' + path, - RuntimeWarning, stacklevel=3) - try: - yield os.getcwd() - finally: - os.chdir(saved_dir) - if is_temporary: - rmtree(name) - - -if hasattr(os, "umask"): - @contextlib.contextmanager - def temp_umask(umask): - """Context manager that temporarily sets the process umask.""" - oldmask = os.umask(umask) - try: - yield - finally: - os.umask(oldmask) - - -def findfile(file, here=__file__, subdir=None): - """Try to find a file on sys.path and the working directory. If it is not - found the argument passed to the function is returned (this does not - necessarily signal failure; could still be the legitimate path).""" - if os.path.isabs(file): - return file - if subdir is not None: - file = os.path.join(subdir, file) - path = sys.path - path = [os.path.dirname(here)] + path - for dn in path: - fn = os.path.join(dn, file) - if os.path.exists(fn): return fn - return file - -def create_empty_file(filename): - """Create an empty file. If the file already exists, truncate it.""" - fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) - os.close(fd) - -def sortdict(dict): - "Like repr(dict), but in sorted order." - items = sorted(dict.items()) - reprpairs = ["%r: %r" % pair for pair in items] - withcommas = ", ".join(reprpairs) - return "{%s}" % withcommas - -def make_bad_fd(): - """ - Create an invalid file descriptor by opening and closing a file and return - its fd. - """ - file = open(TESTFN, "wb") - try: - return file.fileno() - finally: - file.close() - unlink(TESTFN) - -def check_syntax_error(testcase, statement): - testcase.assertRaises(SyntaxError, compile, statement, - '', 'exec') - -def open_urlresource(url, *args, **kw): - import urllib.request, urllib.parse - - check = kw.pop('check', None) - - filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! - - fn = os.path.join(os.path.dirname(__file__), "data", filename) - - def check_valid_file(fn): - f = open(fn, *args, **kw) - if check is None: - return f - elif check(f): - f.seek(0) - return f - f.close() - - if os.path.exists(fn): - f = check_valid_file(fn) - if f is not None: - return f - unlink(fn) - - # Verify the requirement before downloading the file - requires('urlfetch') - - print('\tfetching %s ...' % url, file=get_original_stdout()) - f = urllib.request.urlopen(url, timeout=15) - try: - with open(fn, "wb") as out: - s = f.read() - while s: - out.write(s) - s = f.read() - finally: - f.close() - - f = check_valid_file(fn) - if f is not None: - return f - raise TestFailed('invalid resource %r' % fn) - - -class WarningsRecorder(object): - """Convenience wrapper for the warnings list returned on - entry to the warnings.catch_warnings() context manager. - """ - def __init__(self, warnings_list): - self._warnings = warnings_list - self._last = 0 - - def __getattr__(self, attr): - if len(self._warnings) > self._last: - return getattr(self._warnings[-1], attr) - elif attr in warnings.WarningMessage._WARNING_DETAILS: - return None - raise AttributeError("%r has no attribute %r" % (self, attr)) - - @property - def warnings(self): - return self._warnings[self._last:] - - def reset(self): - self._last = len(self._warnings) - - -def _filterwarnings(filters, quiet=False): - """Catch the warnings, then check if all the expected - warnings have been raised and re-raise unexpected warnings. - If 'quiet' is True, only re-raise the unexpected warnings. - """ - # Clear the warning registry of the calling module - # in order to re-raise the warnings. - frame = sys._getframe(2) - registry = frame.f_globals.get('__warningregistry__') - if registry: - registry.clear() - with warnings.catch_warnings(record=True) as w: - # Set filter "always" to record all warnings. Because - # test_warnings swap the module, we need to look up in - # the sys.modules dictionary. - sys.modules['warnings'].simplefilter("always") - yield WarningsRecorder(w) - # Filter the recorded warnings - reraise = list(w) - missing = [] - for msg, cat in filters: - seen = False - for w in reraise[:]: - warning = w.message - # Filter out the matching messages - if (re.match(msg, str(warning), re.I) and - issubclass(warning.__class__, cat)): - seen = True - reraise.remove(w) - if not seen and not quiet: - # This filter caught nothing - missing.append((msg, cat.__name__)) - if reraise: - raise AssertionError("unhandled warning %s" % reraise[0]) - if missing: - raise AssertionError("filter (%r, %s) did not catch any warning" % - missing[0]) - - -@contextlib.contextmanager -def check_warnings(*filters, **kwargs): - """Context manager to silence warnings. - - Accept 2-tuples as positional arguments: - ("message regexp", WarningCategory) - - Optional argument: - - if 'quiet' is True, it does not fail if a filter catches nothing - (default True without argument, - default False if some filters are defined) - - Without argument, it defaults to: - check_warnings(("", Warning), quiet=True) - """ - quiet = kwargs.get('quiet') - if not filters: - filters = (("", Warning),) - # Preserve backward compatibility - if quiet is None: - quiet = True - return _filterwarnings(filters, quiet) - - -class CleanImport(object): - """Context manager to force import to return a new module reference. - - This is useful for testing module-level behaviours, such as - the emission of a DeprecationWarning on import. - - Use like this: - - with CleanImport("foo"): - importlib.import_module("foo") # new reference - """ - - def __init__(self, *module_names): - self.original_modules = sys.modules.copy() - for module_name in module_names: - if module_name in sys.modules: - module = sys.modules[module_name] - # It is possible that module_name is just an alias for - # another module (e.g. stub for modules renamed in 3.x). - # In that case, we also need delete the real module to clear - # the import cache. - if module.__name__ != module_name: - del sys.modules[module.__name__] - del sys.modules[module_name] - - def __enter__(self): - return self - - def __exit__(self, *ignore_exc): - sys.modules.update(self.original_modules) - - -class EnvironmentVarGuard(collections.abc.MutableMapping): - - """Class to help protect the environment variable properly. Can be used as - a context manager.""" - - def __init__(self): - self._environ = os.environ - self._changed = {} - - def __getitem__(self, envvar): - return self._environ[envvar] - - def __setitem__(self, envvar, value): - # Remember the initial value on the first access - if envvar not in self._changed: - self._changed[envvar] = self._environ.get(envvar) - self._environ[envvar] = value - - def __delitem__(self, envvar): - # Remember the initial value on the first access - if envvar not in self._changed: - self._changed[envvar] = self._environ.get(envvar) - if envvar in self._environ: - del self._environ[envvar] - - def keys(self): - return self._environ.keys() - - def __iter__(self): - return iter(self._environ) - - def __len__(self): - return len(self._environ) - - def set(self, envvar, value): - self[envvar] = value - - def unset(self, envvar): - del self[envvar] - - def __enter__(self): - return self - - def __exit__(self, *ignore_exc): - for (k, v) in self._changed.items(): - if v is None: - if k in self._environ: - del self._environ[k] - else: - self._environ[k] = v - os.environ = self._environ - - -class DirsOnSysPath(object): - """Context manager to temporarily add directories to sys.path. - - This makes a copy of sys.path, appends any directories given - as positional arguments, then reverts sys.path to the copied - settings when the context ends. - - Note that *all* sys.path modifications in the body of the - context manager, including replacement of the object, - will be reverted at the end of the block. - """ - - def __init__(self, *paths): - self.original_value = sys.path[:] - self.original_object = sys.path - sys.path.extend(paths) - - def __enter__(self): - return self - - def __exit__(self, *ignore_exc): - sys.path = self.original_object - sys.path[:] = self.original_value - - -class TransientResource(object): - - """Raise ResourceDenied if an exception is raised while the context manager - is in effect that matches the specified exception and attributes.""" - - def __init__(self, exc, **kwargs): - self.exc = exc - self.attrs = kwargs - - def __enter__(self): - return self - - def __exit__(self, type_=None, value=None, traceback=None): - """If type_ is a subclass of self.exc and value has attributes matching - self.attrs, raise ResourceDenied. Otherwise let the exception - propagate (if any).""" - if type_ is not None and issubclass(self.exc, type_): - for attr, attr_value in self.attrs.items(): - if not hasattr(value, attr): - break - if getattr(value, attr) != attr_value: - break - else: - raise ResourceDenied("an optional resource is not available") - -# Context managers that raise ResourceDenied when various issues -# with the Internet connection manifest themselves as exceptions. -# XXX deprecate these and use transient_internet() instead -time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) -socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) -ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) - - -@contextlib.contextmanager -def transient_internet(resource_name, *, timeout=30.0, errnos=()): - """Return a context manager that raises ResourceDenied when various issues - with the Internet connection manifest themselves as exceptions.""" - default_errnos = [ - ('ECONNREFUSED', 111), - ('ECONNRESET', 104), - ('EHOSTUNREACH', 113), - ('ENETUNREACH', 101), - ('ETIMEDOUT', 110), - ] - default_gai_errnos = [ - ('EAI_AGAIN', -3), - ('EAI_FAIL', -4), - ('EAI_NONAME', -2), - ('EAI_NODATA', -5), - # Encountered when trying to resolve IPv6-only hostnames - ('WSANO_DATA', 11004), - ] - - denied = ResourceDenied("Resource %r is not available" % resource_name) - captured_errnos = errnos - gai_errnos = [] - if not captured_errnos: - captured_errnos = [getattr(errno, name, num) - for (name, num) in default_errnos] - gai_errnos = [getattr(socket, name, num) - for (name, num) in default_gai_errnos] - - def filter_error(err): - n = getattr(err, 'errno', None) - if (isinstance(err, socket.timeout) or - (isinstance(err, socket.gaierror) and n in gai_errnos) or - n in captured_errnos): - if not verbose: - sys.stderr.write(denied.args[0] + "\n") - raise denied from err - - old_timeout = socket.getdefaulttimeout() - try: - if timeout is not None: - socket.setdefaulttimeout(timeout) - yield - except IOError as err: - # urllib can wrap original socket errors multiple times (!), we must - # unwrap to get at the original error. - while True: - a = err.args - if len(a) >= 1 and isinstance(a[0], IOError): - err = a[0] - # The error can also be wrapped as args[1]: - # except socket.error as msg: - # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) - elif len(a) >= 2 and isinstance(a[1], IOError): - err = a[1] - else: - break - filter_error(err) - raise - # XXX should we catch generic exceptions and look for their - # __cause__ or __context__? - finally: - socket.setdefaulttimeout(old_timeout) - - -@contextlib.contextmanager -def captured_output(stream_name): - """Return a context manager used by captured_stdout/stdin/stderr - that temporarily replaces the sys stream *stream_name* with a StringIO.""" - import io - orig_stdout = getattr(sys, stream_name) - setattr(sys, stream_name, io.StringIO()) - try: - yield getattr(sys, stream_name) - finally: - setattr(sys, stream_name, orig_stdout) - -def captured_stdout(): - """Capture the output of sys.stdout: - - with captured_stdout() as stdout: - print("hello") - self.assertEqual(stdout.getvalue(), "hello\n") - """ - return captured_output("stdout") - -def captured_stderr(): - """Capture the output of sys.stderr: - - with captured_stderr() as stderr: - print("hello", file=sys.stderr) - self.assertEqual(stderr.getvalue(), "hello\n") - """ - return captured_output("stderr") - -def captured_stdin(): - """Capture the input to sys.stdin: - - with captured_stdin() as stdin: - stdin.write('hello\n') - stdin.seek(0) - # call test code that consumes from sys.stdin - captured = input() - self.assertEqual(captured, "hello") - """ - return captured_output("stdin") - - -def gc_collect(): - """Force as many objects as possible to be collected. - - In non-CPython implementations of Python, this is needed because timely - deallocation is not guaranteed by the garbage collector. (Even in CPython - this can be the case in case of reference cycles.) This means that __del__ - methods may be called later than expected and weakrefs may remain alive for - longer than expected. This function tries its best to force all garbage - objects to disappear. - """ - gc.collect() - if is_jython: - time.sleep(0.1) - gc.collect() - gc.collect() - -@contextlib.contextmanager -def disable_gc(): - have_gc = gc.isenabled() - gc.disable() - try: - yield - finally: - if have_gc: - gc.enable() - - -def python_is_optimized(): - """Find if Python was built with optimizations.""" - cflags = sysconfig.get_config_var('PY_CFLAGS') or '' - final_opt = "" - for opt in cflags.split(): - if opt.startswith('-O'): - final_opt = opt - return final_opt != '' and final_opt != '-O0' - - -_header = 'nP' -_align = '0n' -if hasattr(sys, "gettotalrefcount"): - _header = '2P' + _header - _align = '0P' -_vheader = _header + 'n' - -def calcobjsize(fmt): - return struct.calcsize(_header + fmt + _align) - -def calcvobjsize(fmt): - return struct.calcsize(_vheader + fmt + _align) - - -_TPFLAGS_HAVE_GC = 1<<14 -_TPFLAGS_HEAPTYPE = 1<<9 - -def check_sizeof(test, o, size): - result = sys.getsizeof(o) - # add GC header size - if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ - ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): - size += _testcapi.SIZEOF_PYGC_HEAD - msg = 'wrong size for %s: got %d, expected %d' \ - % (type(o), result, size) - test.assertEqual(result, size, msg) - -#======================================================================= -# Decorator for running a function in a different locale, correctly resetting -# it afterwards. - -def run_with_locale(catstr, *locales): - def decorator(func): - def inner(*args, **kwds): - try: - import locale - category = getattr(locale, catstr) - orig_locale = locale.setlocale(category) - except AttributeError: - # if the test author gives us an invalid category string - raise - except: - # cannot retrieve original locale, so do nothing - locale = orig_locale = None - else: - for loc in locales: - try: - locale.setlocale(category, loc) - break - except: - pass - - # now run the function, resetting the locale on exceptions - try: - return func(*args, **kwds) - finally: - if locale and orig_locale: - locale.setlocale(category, orig_locale) - inner.__name__ = func.__name__ - inner.__doc__ = func.__doc__ - return inner - return decorator - -#======================================================================= -# Decorator for running a function in a specific timezone, correctly -# resetting it afterwards. - -def run_with_tz(tz): - def decorator(func): - def inner(*args, **kwds): - try: - tzset = time.tzset - except AttributeError: - raise unittest.SkipTest("tzset required") - if 'TZ' in os.environ: - orig_tz = os.environ['TZ'] - else: - orig_tz = None - os.environ['TZ'] = tz - tzset() - - # now run the function, resetting the tz on exceptions - try: - return func(*args, **kwds) - finally: - if orig_tz is None: - del os.environ['TZ'] - else: - os.environ['TZ'] = orig_tz - time.tzset() - - inner.__name__ = func.__name__ - inner.__doc__ = func.__doc__ - return inner - return decorator - -#======================================================================= -# Big-memory-test support. Separate from 'resources' because memory use -# should be configurable. - -# Some handy shorthands. Note that these are used for byte-limits as well -# as size-limits, in the various bigmem tests -_1M = 1024*1024 -_1G = 1024 * _1M -_2G = 2 * _1G -_4G = 4 * _1G - -MAX_Py_ssize_t = sys.maxsize - -def set_memlimit(limit): - global max_memuse - global real_max_memuse - sizes = { - 'k': 1024, - 'm': _1M, - 'g': _1G, - 't': 1024*_1G, - } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, - re.IGNORECASE | re.VERBOSE) - if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - real_max_memuse = memlimit - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t - if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) - max_memuse = memlimit - -class _MemoryWatchdog: - """An object which periodically watches the process' memory consumption - and prints it out. - """ - - def __init__(self): - self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) - self.started = False - - def start(self): - try: - f = open(self.procfile, 'r') - except OSError as e: - warnings.warn('/proc not available for stats: {}'.format(e), - RuntimeWarning) - sys.stderr.flush() - return - - watchdog_script = findfile("memory_watchdog.py") - self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], - stdin=f, stderr=subprocess.DEVNULL) - f.close() - self.started = True - - def stop(self): - if self.started: - self.mem_watchdog.terminate() - self.mem_watchdog.wait() - - -def bigmemtest(size, memuse, dry_run=True): - """Decorator for bigmem tests. - - 'minsize' is the minimum useful size for the test (in arbitrary, - test-interpreted units.) 'memuse' is the number of 'bytes per size' for - the test, or a good estimate of it. - - if 'dry_run' is False, it means the test doesn't support dummy runs - when -M is not specified. - """ - def decorator(f): - def wrapper(self): - size = wrapper.size - memuse = wrapper.memuse - if not real_max_memuse: - maxsize = 5147 - else: - maxsize = size - - if ((real_max_memuse or not dry_run) - and real_max_memuse < maxsize * memuse): - raise unittest.SkipTest( - "not enough memory: %.1fG minimum needed" - % (size * memuse / (1024 ** 3))) - - if real_max_memuse and verbose: - print() - print(" ... expected peak memory use: {peak:.1f}G" - .format(peak=size * memuse / (1024 ** 3))) - watchdog = _MemoryWatchdog() - watchdog.start() - else: - watchdog = None - - try: - return f(self, maxsize) - finally: - if watchdog: - watchdog.stop() - - wrapper.size = size - wrapper.memuse = memuse - return wrapper - return decorator - -def bigaddrspacetest(f): - """Decorator for tests that fill the address space.""" - def wrapper(self): - if max_memuse < MAX_Py_ssize_t: - if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: - raise unittest.SkipTest( - "not enough memory: try a 32-bit build instead") - else: - raise unittest.SkipTest( - "not enough memory: %.1fG minimum needed" - % (MAX_Py_ssize_t / (1024 ** 3))) - else: - return f(self) - return wrapper - -#======================================================================= -# unittest integration. - -class BasicTestRunner: - def run(self, test): - result = unittest.TestResult() - test(result) - return result - -def _id(obj): - return obj - -def requires_resource(resource): - if resource == 'gui' and not _is_gui_available(): - return unittest.skip("resource 'gui' is not available") - if is_resource_enabled(resource): - return _id - else: - return unittest.skip("resource {0!r} is not enabled".format(resource)) - -def cpython_only(test): - """ - Decorator for tests only applicable on CPython. - """ - return impl_detail(cpython=True)(test) - -def impl_detail(msg=None, **guards): - if check_impl_detail(**guards): - return _id - if msg is None: - guardnames, default = _parse_guards(guards) - if default: - msg = "implementation detail not available on {0}" - else: - msg = "implementation detail specific to {0}" - guardnames = sorted(guardnames.keys()) - msg = msg.format(' or '.join(guardnames)) - return unittest.skip(msg) - -def _parse_guards(guards): - # Returns a tuple ({platform_name: run_me}, default_value) - if not guards: - return ({'cpython': True}, False) - is_true = list(guards.values())[0] - assert list(guards.values()) == [is_true] * len(guards) # all True or all False - return (guards, not is_true) - -# Use the following check to guard CPython's implementation-specific tests -- -# or to run them only on the implementation(s) guarded by the arguments. -def check_impl_detail(**guards): - """This function returns True or False depending on the host platform. - Examples: - if check_impl_detail(): # only on CPython (default) - if check_impl_detail(jython=True): # only on Jython - if check_impl_detail(cpython=False): # everywhere except on CPython - """ - guards, default = _parse_guards(guards) - return guards.get(platform.python_implementation().lower(), default) - - -def no_tracing(func): - """Decorator to temporarily turn off tracing for the duration of a test.""" - if not hasattr(sys, 'gettrace'): - return func - else: - @functools.wraps(func) - def wrapper(*args, **kwargs): - original_trace = sys.gettrace() - try: - sys.settrace(None) - return func(*args, **kwargs) - finally: - sys.settrace(original_trace) - return wrapper - - -def refcount_test(test): - """Decorator for tests which involve reference counting. - - To start, the decorator does not run the test if is not run by CPython. - After that, any trace function is unset during the test to prevent - unexpected refcounts caused by the trace function. - - """ - return no_tracing(cpython_only(test)) - - -def _filter_suite(suite, pred): - """Recursively filter test cases in a suite based on a predicate.""" - newtests = [] - for test in suite._tests: - if isinstance(test, unittest.TestSuite): - _filter_suite(test, pred) - newtests.append(test) - else: - if pred(test): - newtests.append(test) - suite._tests = newtests - -def _run_suite(suite): - """Run tests from a unittest.TestSuite-derived class.""" - if verbose: - runner = unittest.TextTestRunner(sys.stdout, verbosity=2, - failfast=failfast) - else: - runner = BasicTestRunner() - - result = runner.run(suite) - if not result.wasSuccessful(): - if len(result.errors) == 1 and not result.failures: - err = result.errors[0][1] - elif len(result.failures) == 1 and not result.errors: - err = result.failures[0][1] - else: - err = "multiple errors occurred" - if not verbose: err += "; run in verbose mode for details" - raise TestFailed(err) - - -def run_unittest(*classes): - """Run tests from unittest.TestCase-derived classes.""" - valid_types = (unittest.TestSuite, unittest.TestCase) - suite = unittest.TestSuite() - for cls in classes: - if isinstance(cls, str): - if cls in sys.modules: - suite.addTest(unittest.findTestCases(sys.modules[cls])) - else: - raise ValueError("str arguments must be keys in sys.modules") - elif isinstance(cls, valid_types): - suite.addTest(cls) - else: - suite.addTest(unittest.makeSuite(cls)) - def case_pred(test): - if match_tests is None: - return True - for name in test.id().split("."): - if fnmatch.fnmatchcase(name, match_tests): - return True - return False - _filter_suite(suite, case_pred) - _run_suite(suite) - -#======================================================================= -# Check for the presence of docstrings. - -HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or - sys.platform == 'win32' or - sysconfig.get_config_var('WITH_DOC_STRINGS')) - -requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, - "test requires docstrings") - - -#======================================================================= -# doctest driver. - -def run_doctest(module, verbosity=None, optionflags=0): - """Run doctest on the given module. Return (#failures, #tests). - - If optional argument verbosity is not specified (or is None), pass - support's belief about verbosity on to doctest. Else doctest's - usual behavior is used (it searches sys.argv for -v). - """ - - import doctest - - if verbosity is None: - verbosity = verbose - else: - verbosity = None - - f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) - if f: - raise TestFailed("%d of %d doctests failed" % (f, t)) - if verbose: - print('doctest (%s) ... %d tests with zero failures' % - (module.__name__, t)) - return f, t - - -#======================================================================= -# Support for saving and restoring the imported modules. - -def modules_setup(): - return sys.modules.copy(), - -def modules_cleanup(oldmodules): - # Encoders/decoders are registered permanently within the internal - # codec cache. If we destroy the corresponding modules their - # globals will be set to None which will trip up the cached functions. - encodings = [(k, v) for k, v in sys.modules.items() - if k.startswith('encodings.')] - sys.modules.clear() - sys.modules.update(encodings) - # XXX: This kind of problem can affect more than just encodings. In particular - # extension modules (such as _ssl) don't cope with reloading properly. - # Really, test modules should be cleaning out the test specific modules they - # know they added (ala test_runpy) rather than relying on this function (as - # test_importhooks and test_pkg do currently). - # Implicitly imported *real* modules should be left alone (see issue 10556). - sys.modules.update(oldmodules) - -#======================================================================= -# Threading support to prevent reporting refleaks when running regrtest.py -R - -# NOTE: we use thread._count() rather than threading.enumerate() (or the -# moral equivalent thereof) because a threading.Thread object is still alive -# until its __bootstrap() method has returned, even after it has been -# unregistered from the threading module. -# thread._count(), on the other hand, only gets decremented *after* the -# __bootstrap() method has returned, which gives us reliable reference counts -# at the end of a test run. - -def threading_setup(): - if _thread: - return _thread._count(), threading._dangling.copy() - else: - return 1, () - -def threading_cleanup(*original_values): - if not _thread: - return - _MAX_COUNT = 10 - for count in range(_MAX_COUNT): - values = _thread._count(), threading._dangling - if values == original_values: - break - time.sleep(0.1) - gc_collect() - # XXX print a warning in case of failure? - -def reap_threads(func): - """Use this function when threads are being used. This will - ensure that the threads are cleaned up even when the test fails. - If threading is unavailable this function does nothing. - """ - if not _thread: - return func - - @functools.wraps(func) - def decorator(*args): - key = threading_setup() - try: - return func(*args) - finally: - threading_cleanup(*key) - return decorator - -def reap_children(): - """Use this function at the end of test_main() whenever sub-processes - are started. This will help ensure that no extra children (zombies) - stick around to hog resources and create problems when looking - for refleaks. - """ - - # Reap all our dead child processes so we don't leave zombies around. - # These hog resources and might be causing some of the buildbots to die. - if hasattr(os, 'waitpid'): - any_process = -1 - while True: - try: - # This will raise an exception on Windows. That's ok. - pid, status = os.waitpid(any_process, os.WNOHANG) - if pid == 0: - break - except: - break - -@contextlib.contextmanager -def swap_attr(obj, attr, new_val): - """Temporary swap out an attribute with a new object. - - Usage: - with swap_attr(obj, "attr", 5): - ... - - This will set obj.attr to 5 for the duration of the with: block, - restoring the old value at the end of the block. If `attr` doesn't - exist on `obj`, it will be created and then deleted at the end of the - block. - """ - if hasattr(obj, attr): - real_val = getattr(obj, attr) - setattr(obj, attr, new_val) - try: - yield - finally: - setattr(obj, attr, real_val) - else: - setattr(obj, attr, new_val) - try: - yield - finally: - delattr(obj, attr) - -@contextlib.contextmanager -def swap_item(obj, item, new_val): - """Temporary swap out an item with a new object. - - Usage: - with swap_item(obj, "item", 5): - ... - - This will set obj["item"] to 5 for the duration of the with: block, - restoring the old value at the end of the block. If `item` doesn't - exist on `obj`, it will be created and then deleted at the end of the - block. - """ - if item in obj: - real_val = obj[item] - obj[item] = new_val - try: - yield - finally: - obj[item] = real_val - else: - obj[item] = new_val - try: - yield - finally: - del obj[item] - -def strip_python_stderr(stderr): - """Strip the stderr of a Python process from potential debug output - emitted by the interpreter. - - This will typically be run on the result of the communicate() method - of a subprocess.Popen object. - """ - stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip() - return stderr - -def args_from_interpreter_flags(): - """Return a list of command-line arguments reproducing the current - settings in sys.flags and sys.warnoptions.""" - return subprocess._args_from_interpreter_flags() - -#============================================================ -# Support for assertions about logging. -#============================================================ - -class TestHandler(logging.handlers.BufferingHandler): - def __init__(self, matcher): - # BufferingHandler takes a "capacity" argument - # so as to know when to flush. As we're overriding - # shouldFlush anyway, we can set a capacity of zero. - # You can call flush() manually to clear out the - # buffer. - logging.handlers.BufferingHandler.__init__(self, 0) - self.matcher = matcher - - def shouldFlush(self): - return False - - def emit(self, record): - self.format(record) - self.buffer.append(record.__dict__) - - def matches(self, **kwargs): - """ - Look for a saved dict whose keys/values match the supplied arguments. - """ - result = False - for d in self.buffer: - if self.matcher.matches(d, **kwargs): - result = True - break - return result - -class Matcher(object): - - _partial_matches = ('msg', 'message') - - def matches(self, d, **kwargs): - """ - Try to match a single dict with the supplied arguments. - - Keys whose values are strings and which are in self._partial_matches - will be checked for partial (i.e. substring) matches. You can extend - this scheme to (for example) do regular expression matching, etc. - """ - result = True - for k in kwargs: - v = kwargs[k] - dv = d.get(k) - if not self.match_value(k, dv, v): - result = False - break - return result - - def match_value(self, k, dv, v): - """ - Try to match a single stored value (dv) with a supplied value (v). - """ - if type(v) != type(dv): - result = False - elif type(dv) is not str or k not in self._partial_matches: - result = (v == dv) - else: - result = dv.find(v) >= 0 - return result - - -_can_symlink = None -def can_symlink(): - global _can_symlink - if _can_symlink is not None: - return _can_symlink - symlink_path = TESTFN + "can_symlink" - try: - os.symlink(TESTFN, symlink_path) - can = True - except (OSError, NotImplementedError, AttributeError): - can = False - else: - os.remove(symlink_path) - _can_symlink = can - return can - -def skip_unless_symlink(test): - """Skip decorator for tests that require functional symlink""" - ok = can_symlink() - msg = "Requires functional symlink implementation" - return test if ok else unittest.skip(msg)(test) - -_can_xattr = None -def can_xattr(): - global _can_xattr - if _can_xattr is not None: - return _can_xattr - if not hasattr(os, "setxattr"): - can = False - else: - tmp_fp, tmp_name = tempfile.mkstemp() - try: - with open(TESTFN, "wb") as fp: - try: - # TESTFN & tempfile may use different file systems with - # different capabilities - os.setxattr(tmp_fp, b"user.test", b"") - os.setxattr(fp.fileno(), b"user.test", b"") - # Kernels < 2.6.39 don't respect setxattr flags. - kernel_version = platform.release() - m = re.match("2.6.(\d{1,2})", kernel_version) - can = m is None or int(m.group(1)) >= 39 - except OSError: - can = False - finally: - unlink(TESTFN) - unlink(tmp_name) - _can_xattr = can - return can - -def skip_unless_xattr(test): - """Skip decorator for tests that require functional extended attributes""" - ok = can_xattr() - msg = "no non-broken extended attribute support" - return test if ok else unittest.skip(msg)(test) - - -if sys.platform.startswith('win'): - @contextlib.contextmanager - def suppress_crash_popup(): - """Disable Windows Error Reporting dialogs using SetErrorMode.""" - # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621%28v=vs.85%29.aspx - # GetErrorMode is not available on Windows XP and Windows Server 2003, - # but SetErrorMode returns the previous value, so we can use that - import ctypes - k32 = ctypes.windll.kernel32 - SEM_NOGPFAULTERRORBOX = 0x02 - old_error_mode = k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) - k32.SetErrorMode(old_error_mode | SEM_NOGPFAULTERRORBOX) - try: - yield - finally: - k32.SetErrorMode(old_error_mode) -else: - # this is a no-op for other platforms - @contextlib.contextmanager - def suppress_crash_popup(): - yield - - -def patch(test_instance, object_to_patch, attr_name, new_value): - """Override 'object_to_patch'.'attr_name' with 'new_value'. - - Also, add a cleanup procedure to 'test_instance' to restore - 'object_to_patch' value for 'attr_name'. - The 'attr_name' should be a valid attribute for 'object_to_patch'. - - """ - # check that 'attr_name' is a real attribute for 'object_to_patch' - # will raise AttributeError if it does not exist - getattr(object_to_patch, attr_name) - - # keep a copy of the old value - attr_is_local = False - try: - old_value = object_to_patch.__dict__[attr_name] - except (AttributeError, KeyError): - old_value = getattr(object_to_patch, attr_name, None) - else: - attr_is_local = True - - # restore the value when the test is done - def cleanup(): - if attr_is_local: - setattr(object_to_patch, attr_name, old_value) - else: - delattr(object_to_patch, attr_name) - - test_instance.addCleanup(cleanup) - - # actually override the attribute - setattr(object_to_patch, attr_name, new_value) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py new file mode 100644 index 0000000..3ebc49d --- /dev/null +++ b/Lib/test/support/__init__.py @@ -0,0 +1,1990 @@ +"""Supporting definitions for the Python regression tests.""" + +if __name__ != 'test.support': + raise ImportError('support must be imported from the test package') + +import contextlib +import errno +import functools +import gc +import socket +import sys +import os +import platform +import shutil +import warnings +import unittest +import importlib +import collections.abc +import re +import subprocess +import imp +import time +import sysconfig +import fnmatch +import logging.handlers +import struct +import tempfile +import _testcapi + +try: + import _thread, threading +except ImportError: + _thread = None + threading = None +try: + import multiprocessing.process +except ImportError: + multiprocessing = None + +try: + import zlib +except ImportError: + zlib = None + +try: + import gzip +except ImportError: + gzip = None + +try: + import bz2 +except ImportError: + bz2 = None + +try: + import lzma +except ImportError: + lzma = None + +__all__ = [ + "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", + "use_resources", "max_memuse", "record_original_stdout", + "get_original_stdout", "unload", "unlink", "rmtree", "forget", + "is_resource_enabled", "requires", "requires_freebsd_version", + "requires_linux_version", "requires_mac_ver", "find_unused_port", + "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", + "temp_cwd", "findfile", "create_empty_file", "sortdict", + "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", + "EnvironmentVarGuard", "TransientResource", "captured_stdout", + "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset", + "ioerror_peer_reset", "run_with_locale", 'temp_umask', + "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", + "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", + "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", + "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", + "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", + "skip_unless_xattr", "import_fresh_module", "requires_zlib", + "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz", + "requires_gzip", "requires_bz2", "requires_lzma", "suppress_crash_popup", + ] + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class ResourceDenied(unittest.SkipTest): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not be enabled. It is used to distinguish between expected + and unexpected skips. + """ + +@contextlib.contextmanager +def _ignore_deprecated_imports(ignore=True): + """Context manager to suppress package and module deprecation + warnings when importing them. + + If ignore is False, this context manager has no effect.""" + if ignore: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", ".+ (module|package)", + DeprecationWarning) + yield + else: + yield + + +def import_module(name, deprecated=False): + """Import and return the module to be tested, raising SkipTest if + it is not available. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + with _ignore_deprecated_imports(deprecated): + try: + return importlib.import_module(name) + except ImportError as msg: + raise unittest.SkipTest(str(msg)) + + +def _save_and_remove_module(name, orig_modules): + """Helper function to save and remove a module from sys.modules + + Raise ImportError if the module can't be imported.""" + # try to import the module and raise an error if it can't be imported + if name not in sys.modules: + __import__(name) + del sys.modules[name] + for modname in list(sys.modules): + if modname == name or modname.startswith(name + '.'): + orig_modules[modname] = sys.modules[modname] + del sys.modules[modname] + +def _save_and_block_module(name, orig_modules): + """Helper function to save and block a module in sys.modules + + Return True if the module was in sys.modules, False otherwise.""" + saved = True + try: + orig_modules[name] = sys.modules[name] + except KeyError: + saved = False + sys.modules[name] = None + return saved + + +def anticipate_failure(condition): + """Decorator to mark a test that is known to be broken in some cases + + Any use of this decorator should have a comment identifying the + associated tracker issue. + """ + if condition: + return unittest.expectedFailure + return lambda f: f + + +def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): + """Imports and returns a module, deliberately bypassing the sys.modules cache + and importing a fresh copy of the module. Once the import is complete, + the sys.modules cache is restored to its original state. + + Modules named in fresh are also imported anew if needed by the import. + If one of these modules can't be imported, None is returned. + + Importing of modules named in blocked is prevented while the fresh import + takes place. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + # NOTE: test_heapq, test_json and test_warnings include extra sanity checks + # to make sure that this utility function is working as expected + with _ignore_deprecated_imports(deprecated): + # Keep track of modules saved for later restoration as well + # as those which just need a blocking entry removed + orig_modules = {} + names_to_remove = [] + _save_and_remove_module(name, orig_modules) + try: + for fresh_name in fresh: + _save_and_remove_module(fresh_name, orig_modules) + for blocked_name in blocked: + if not _save_and_block_module(blocked_name, orig_modules): + names_to_remove.append(blocked_name) + fresh_module = importlib.import_module(name) + except ImportError: + fresh_module = None + finally: + for orig_name, module in orig_modules.items(): + sys.modules[orig_name] = module + for name_to_remove in names_to_remove: + del sys.modules[name_to_remove] + return fresh_module + + +def get_attribute(obj, name): + """Get an attribute, raising SkipTest if AttributeError is raised.""" + try: + attribute = getattr(obj, name) + except AttributeError: + raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) + else: + return attribute + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) +real_max_memuse = 0 +failfast = False +match_tests = None + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + +def unload(name): + try: + del sys.modules[name] + except KeyError: + pass + +if sys.platform.startswith("win"): + def _waitfor(func, pathname, waitall=False): + # Peform the operation + func(pathname) + # Now setup the wait loop + if waitall: + dirname = pathname + else: + dirname, name = os.path.split(pathname) + dirname = dirname or '.' + # Check for `pathname` to be removed from the filesystem. + # The exponential backoff of the timeout amounts to a total + # of ~1 second after which the deletion is probably an error + # anyway. + # Testing on a i7@4.3GHz shows that usually only 1 iteration is + # required when contention occurs. + timeout = 0.001 + while timeout < 1.0: + # Note we are only testing for the existance of the file(s) in + # the contents of the directory regardless of any security or + # access rights. If we have made it this far, we have sufficient + # permissions to do that much using Python's equivalent of the + # Windows API FindFirstFile. + # Other Windows APIs can fail or give incorrect results when + # dealing with files that are pending deletion. + L = os.listdir(dirname) + if not (L if waitall else name in L): + return + # Increase the timeout and try again + time.sleep(timeout) + timeout *= 2 + warnings.warn('tests may fail, delete still pending for ' + pathname, + RuntimeWarning, stacklevel=4) + + def _unlink(filename): + _waitfor(os.unlink, filename) + + def _rmdir(dirname): + _waitfor(os.rmdir, dirname) + + def _rmtree(path): + def _rmtree_inner(path): + for name in os.listdir(path): + fullname = os.path.join(path, name) + if os.path.isdir(fullname): + _waitfor(_rmtree_inner, fullname, waitall=True) + os.rmdir(fullname) + else: + os.unlink(fullname) + _waitfor(_rmtree_inner, path, waitall=True) + _waitfor(os.rmdir, path) +else: + _unlink = os.unlink + _rmdir = os.rmdir + _rmtree = shutil.rmtree + +def unlink(filename): + try: + _unlink(filename) + except OSError as error: + # The filename need not exist. + if error.errno not in (errno.ENOENT, errno.ENOTDIR): + raise + +def rmdir(dirname): + try: + _rmdir(dirname) + except OSError as error: + # The directory need not exist. + if error.errno != errno.ENOENT: + raise + +def rmtree(path): + try: + _rmtree(path) + except OSError as error: + if error.errno != errno.ENOENT: + raise + +def make_legacy_pyc(source): + """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location. + + The choice of .pyc or .pyo extension is done based on the __debug__ flag + value. + + :param source: The file system path to the source file. The source file + does not need to exist, however the PEP 3147 pyc file must exist. + :return: The file system path to the legacy pyc file. + """ + pyc_file = imp.cache_from_source(source) + up_one = os.path.dirname(os.path.abspath(source)) + legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o')) + os.rename(pyc_file, legacy_pyc) + return legacy_pyc + +def forget(modname): + """'Forget' a module was ever imported. + + This removes the module from sys.modules and deletes any PEP 3147 or + legacy .pyc and .pyo files. + """ + unload(modname) + for dirname in sys.path: + source = os.path.join(dirname, modname + '.py') + # It doesn't matter if they exist or not, unlink all possible + # combinations of PEP 3147 and legacy pyc and pyo files. + unlink(source + 'c') + unlink(source + 'o') + unlink(imp.cache_from_source(source, debug_override=True)) + unlink(imp.cache_from_source(source, debug_override=False)) + +# On some platforms, should not run gui test even if it is allowed +# in `use_resources'. +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes + def _is_gui_available(): + UOI_FLAGS = 1 + WSF_VISIBLE = 0x0001 + class USEROBJECTFLAGS(ctypes.Structure): + _fields_ = [("fInherit", ctypes.wintypes.BOOL), + ("fReserved", ctypes.wintypes.BOOL), + ("dwFlags", ctypes.wintypes.DWORD)] + dll = ctypes.windll.user32 + h = dll.GetProcessWindowStation() + if not h: + raise ctypes.WinError() + uof = USEROBJECTFLAGS() + needed = ctypes.wintypes.DWORD() + res = dll.GetUserObjectInformationW(h, + UOI_FLAGS, + ctypes.byref(uof), + ctypes.sizeof(uof), + ctypes.byref(needed)) + if not res: + raise ctypes.WinError() + return bool(uof.dwFlags & WSF_VISIBLE) +else: + def _is_gui_available(): + return True + +def is_resource_enabled(resource): + """Test whether a resource is enabled. Known resources are set by + regrtest.py.""" + return use_resources is not None and resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available. + + If the caller's module is __main__ then automatically return True. The + possibility of False being returned occurs when regrtest.py is + executing. + """ + if resource == 'gui' and not _is_gui_available(): + raise unittest.SkipTest("Cannot use the 'gui' resource") + # see if the caller's module is __main__ - if so, treat as if + # the resource was set + if sys._getframe(1).f_globals.get("__name__") == "__main__": + return + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the %r resource not enabled" % resource + raise ResourceDenied(msg) + +def _requires_unix_version(sysname, min_version): + """Decorator raising SkipTest if the OS is `sysname` and the version is less + than `min_version`. + + For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if + the FreeBSD version is less than 7.2. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if platform.system() == sysname: + version_txt = platform.release().split('-', 1)[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "%s version %s or higher required, not %s" + % (sysname, min_version_txt, version_txt)) + return wrapper + return decorator + +def requires_freebsd_version(*min_version): + """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is + less than `min_version`. + + For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD + version is less than 7.2. + """ + return _requires_unix_version('FreeBSD', min_version) + +def requires_linux_version(*min_version): + """Decorator raising SkipTest if the OS is Linux and the Linux version is + less than `min_version`. + + For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux + version is less than 2.6.32. + """ + return _requires_unix_version('Linux', min_version) + +def requires_mac_ver(*min_version): + """Decorator raising SkipTest if the OS is Mac OS X and the OS X + version if less than min_version. + + For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version + is lesser than 10.5. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if sys.platform == 'darwin': + version_txt = platform.mac_ver()[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "Mac OS X %s or higher required, not %s" + % (min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator + + +HOST = 'localhost' + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + socket.error will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it. + """ + + tempsock = socket.socket(family, socktype) + port = bind_port(tempsock) + tempsock.close() + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +def _is_ipv6_enabled(): + """Check whether IPv6 is enabled on this host.""" + if socket.has_ipv6: + sock = None + try: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind(('::1', 0)) + return True + except (socket.error, socket.gaierror): + pass + finally: + if sock: + sock.close() + return False + +IPV6_ENABLED = _is_ipv6_enabled() + + +# A constant likely larger than the underlying OS pipe buffer size, to +# make writes blocking. +# Windows limit seems to be around 512 B, and many Unix kernels have a +# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. +# (see issue #17835 for a discussion of this number). +PIPE_MAX_SIZE = 4 *1024 * 1024 + 1 + + +# decorator for skipping tests on non-IEEE 754 platforms +requires_IEEE_754 = unittest.skipUnless( + float.__getformat__("double").startswith("IEEE"), + "test requires IEEE 754 doubles") + +requires_zlib = unittest.skipUnless(zlib, 'requires zlib') + +requires_gzip = unittest.skipUnless(gzip, 'requires gzip') + +requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') + +requires_lzma = unittest.skipUnless(lzma, 'requires lzma') + +is_jython = sys.platform.startswith('java') + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN = '$test' +else: + TESTFN = '@test' + +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) + +# FS_NONASCII: non-ASCII character encodable by os.fsencode(), +# or None if there is no such character. +FS_NONASCII = None +for character in ( + # First try printable and common characters to have a readable filename. + # For each character, the encoding list are just example of encodings able + # to encode the character (the list is not exhaustive). + + # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 + '\u00E6', + # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 + '\u0130', + # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 + '\u0141', + # U+03C6 (Greek Small Letter Phi): cp1253 + '\u03C6', + # U+041A (Cyrillic Capital Letter Ka): cp1251 + '\u041A', + # U+05D0 (Hebrew Letter Alef): Encodable to cp424 + '\u05D0', + # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic + '\u060C', + # U+062A (Arabic Letter Teh): cp720 + '\u062A', + # U+0E01 (Thai Character Ko Kai): cp874 + '\u0E01', + + # Then try more "special" characters. "special" because they may be + # interpreted or displayed differently depending on the exact locale + # encoding and the font. + + # U+00A0 (No-Break Space) + '\u00A0', + # U+20AC (Euro Sign) + '\u20AC', +): + try: + os.fsdecode(os.fsencode(character)) + except UnicodeError: + pass + else: + FS_NONASCII = character + break + +# TESTFN_UNICODE is a non-ascii filename +TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" +if sys.platform == 'darwin': + # In Mac OS X's VFS API file names are, by definition, canonically + # decomposed Unicode, encoded using UTF-8. See QA1173: + # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html + import unicodedata + TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) +TESTFN_ENCODING = sys.getfilesystemencoding() + +# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be +# encoded by the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename. +TESTFN_UNENCODABLE = None +if os.name in ('nt', 'ce'): + # skip win32s (0) or Windows 9x/ME (1) + if sys.getwindowsversion().platform >= 2: + # Different kinds of characters from various languages to minimize the + # probability that the whole name is encodable to MBCS (issue #9819) + TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" + try: + TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) + except UnicodeEncodeError: + pass + else: + print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' + 'Unicode filename tests may not be effective' + % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) + TESTFN_UNENCODABLE = None +# Mac OS X denies unencodable filenames (invalid utf-8) +elif sys.platform != 'darwin': + try: + # ascii and utf-8 cannot encode the byte 0xff + b'\xff'.decode(TESTFN_ENCODING) + except UnicodeDecodeError: + # 0xff will be encoded using the surrogate character u+DCFF + TESTFN_UNENCODABLE = TESTFN \ + + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') + else: + # File system encoding (eg. ISO-8859-* encodings) can encode + # the byte 0xff. Skip some unicode filename tests. + pass + +# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be +# decoded from the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename (ex: the latin1 encoding can decode any byte +# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks +# to the surrogateescape error handler (PEP 383), but not from the filesystem +# encoding in strict mode. +TESTFN_UNDECODABLE = None +for name in ( + # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows + # accepts it to create a file or a directory, or don't accept to enter to + # such directory (when the bytes name is used). So test b'\xe7' first: it is + # not decodable from cp932. + b'\xe7w\xf0', + # undecodable from ASCII, UTF-8 + b'\xff', + # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 + # and cp857 + b'\xae\xd5' + # undecodable from UTF-8 (UNIX and Mac OS X) + b'\xed\xb2\x80', b'\xed\xb4\x80', + # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, + # cp1253, cp1254, cp1255, cp1257, cp1258 + b'\x81\x98', +): + try: + name.decode(TESTFN_ENCODING) + except UnicodeDecodeError: + TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name + break + +if FS_NONASCII: + TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII +else: + TESTFN_NONASCII = None + +# Save the initial cwd +SAVEDCWD = os.getcwd() + +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False, path=None): + """ + Context manager that temporarily changes the CWD. + + An existing path may be provided as *path*, in which case this + function makes no changes to the file system. + + Otherwise, the new CWD is created in the current directory and it's + named *name*. If *quiet* is False (default) and it's not possible to + create or change the CWD, an error is raised. If it's True, only a + warning is raised and the original CWD is used. + """ + saved_dir = os.getcwd() + is_temporary = False + if path is None: + path = name + try: + os.mkdir(name) + is_temporary = True + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to create temp CWD ' + name, + RuntimeWarning, stacklevel=3) + try: + os.chdir(path) + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to change the CWD to ' + path, + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + if is_temporary: + rmtree(name) + + +if hasattr(os, "umask"): + @contextlib.contextmanager + def temp_umask(umask): + """Context manager that temporarily sets the process umask.""" + oldmask = os.umask(umask) + try: + yield + finally: + os.umask(oldmask) + +# TEST_HOME refers to the top level directory of the "test" package +# that contains Python's regression test suite +TEST_HOME = os.path.dirname(os.path.abspath(__file__)) + +def findfile(file, here=TEST_HOME, subdir=None): + """Try to find a file on sys.path or in the test directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path).""" + if os.path.isabs(file): + return file + if subdir is not None: + file = os.path.join(subdir, file) + path = sys.path + path = [os.path.dirname(here)] + path + for dn in path: + fn = os.path.join(dn, file) + if os.path.exists(fn): return fn + return file + +def create_empty_file(filename): + """Create an empty file. If the file already exists, truncate it.""" + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.close(fd) + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = sorted(dict.items()) + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def make_bad_fd(): + """ + Create an invalid file descriptor by opening and closing a file and return + its fd. + """ + file = open(TESTFN, "wb") + try: + return file.fileno() + finally: + file.close() + unlink(TESTFN) + +def check_syntax_error(testcase, statement): + testcase.assertRaises(SyntaxError, compile, statement, + '', 'exec') + +def open_urlresource(url, *args, **kw): + import urllib.request, urllib.parse + + check = kw.pop('check', None) + + filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + fn = os.path.join(os.path.dirname(__file__), "data", filename) + + def check_valid_file(fn): + f = open(fn, *args, **kw) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') + + print('\tfetching %s ...' % url, file=get_original_stdout()) + f = urllib.request.urlopen(url, timeout=15) + try: + with open(fn, "wb") as out: + s = f.read() + while s: + out.write(s) + s = f.read() + finally: + f.close() + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource %r' % fn) + + +class WarningsRecorder(object): + """Convenience wrapper for the warnings list returned on + entry to the warnings.catch_warnings() context manager. + """ + def __init__(self, warnings_list): + self._warnings = warnings_list + self._last = 0 + + def __getattr__(self, attr): + if len(self._warnings) > self._last: + return getattr(self._warnings[-1], attr) + elif attr in warnings.WarningMessage._WARNING_DETAILS: + return None + raise AttributeError("%r has no attribute %r" % (self, attr)) + + @property + def warnings(self): + return self._warnings[self._last:] + + def reset(self): + self._last = len(self._warnings) + + +def _filterwarnings(filters, quiet=False): + """Catch the warnings, then check if all the expected + warnings have been raised and re-raise unexpected warnings. + If 'quiet' is True, only re-raise the unexpected warnings. + """ + # Clear the warning registry of the calling module + # in order to re-raise the warnings. + frame = sys._getframe(2) + registry = frame.f_globals.get('__warningregistry__') + if registry: + registry.clear() + with warnings.catch_warnings(record=True) as w: + # Set filter "always" to record all warnings. Because + # test_warnings swap the module, we need to look up in + # the sys.modules dictionary. + sys.modules['warnings'].simplefilter("always") + yield WarningsRecorder(w) + # Filter the recorded warnings + reraise = list(w) + missing = [] + for msg, cat in filters: + seen = False + for w in reraise[:]: + warning = w.message + # Filter out the matching messages + if (re.match(msg, str(warning), re.I) and + issubclass(warning.__class__, cat)): + seen = True + reraise.remove(w) + if not seen and not quiet: + # This filter caught nothing + missing.append((msg, cat.__name__)) + if reraise: + raise AssertionError("unhandled warning %s" % reraise[0]) + if missing: + raise AssertionError("filter (%r, %s) did not catch any warning" % + missing[0]) + + +@contextlib.contextmanager +def check_warnings(*filters, **kwargs): + """Context manager to silence warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default True without argument, + default False if some filters are defined) + + Without argument, it defaults to: + check_warnings(("", Warning), quiet=True) + """ + quiet = kwargs.get('quiet') + if not filters: + filters = (("", Warning),) + # Preserve backward compatibility + if quiet is None: + quiet = True + return _filterwarnings(filters, quiet) + + +class CleanImport(object): + """Context manager to force import to return a new module reference. + + This is useful for testing module-level behaviours, such as + the emission of a DeprecationWarning on import. + + Use like this: + + with CleanImport("foo"): + importlib.import_module("foo") # new reference + """ + + def __init__(self, *module_names): + self.original_modules = sys.modules.copy() + for module_name in module_names: + if module_name in sys.modules: + module = sys.modules[module_name] + # It is possible that module_name is just an alias for + # another module (e.g. stub for modules renamed in 3.x). + # In that case, we also need delete the real module to clear + # the import cache. + if module.__name__ != module_name: + del sys.modules[module.__name__] + del sys.modules[module_name] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.modules.update(self.original_modules) + + +class EnvironmentVarGuard(collections.abc.MutableMapping): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._changed = {} + + def __getitem__(self, envvar): + return self._environ[envvar] + + def __setitem__(self, envvar, value): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + self._environ[envvar] = value + + def __delitem__(self, envvar): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + if envvar in self._environ: + del self._environ[envvar] + + def keys(self): + return self._environ.keys() + + def __iter__(self): + return iter(self._environ) + + def __len__(self): + return len(self._environ) + + def set(self, envvar, value): + self[envvar] = value + + def unset(self, envvar): + del self[envvar] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for (k, v) in self._changed.items(): + if v is None: + if k in self._environ: + del self._environ[k] + else: + self._environ[k] = v + os.environ = self._environ + + +class DirsOnSysPath(object): + """Context manager to temporarily add directories to sys.path. + + This makes a copy of sys.path, appends any directories given + as positional arguments, then reverts sys.path to the copied + settings when the context ends. + + Note that *all* sys.path modifications in the body of the + context manager, including replacement of the object, + will be reverted at the end of the block. + """ + + def __init__(self, *paths): + self.original_value = sys.path[:] + self.original_object = sys.path + sys.path.extend(paths) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.path = self.original_object + sys.path[:] = self.original_value + + +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.items(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + +# Context managers that raise ResourceDenied when various issues +# with the Internet connection manifest themselves as exceptions. +# XXX deprecate these and use transient_internet() instead +time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) +socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) +ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) + + +@contextlib.contextmanager +def transient_internet(resource_name, *, timeout=30.0, errnos=()): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + default_errnos = [ + ('ECONNREFUSED', 111), + ('ECONNRESET', 104), + ('EHOSTUNREACH', 113), + ('ENETUNREACH', 101), + ('ETIMEDOUT', 110), + ] + default_gai_errnos = [ + ('EAI_AGAIN', -3), + ('EAI_FAIL', -4), + ('EAI_NONAME', -2), + ('EAI_NODATA', -5), + # Encountered when trying to resolve IPv6-only hostnames + ('WSANO_DATA', 11004), + ] + + denied = ResourceDenied("Resource %r is not available" % resource_name) + captured_errnos = errnos + gai_errnos = [] + if not captured_errnos: + captured_errnos = [getattr(errno, name, num) + for (name, num) in default_errnos] + gai_errnos = [getattr(socket, name, num) + for (name, num) in default_gai_errnos] + + def filter_error(err): + n = getattr(err, 'errno', None) + if (isinstance(err, socket.timeout) or + (isinstance(err, socket.gaierror) and n in gai_errnos) or + n in captured_errnos): + if not verbose: + sys.stderr.write(denied.args[0] + "\n") + raise denied from err + + old_timeout = socket.getdefaulttimeout() + try: + if timeout is not None: + socket.setdefaulttimeout(timeout) + yield + except IOError as err: + # urllib can wrap original socket errors multiple times (!), we must + # unwrap to get at the original error. + while True: + a = err.args + if len(a) >= 1 and isinstance(a[0], IOError): + err = a[0] + # The error can also be wrapped as args[1]: + # except socket.error as msg: + # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) + elif len(a) >= 2 and isinstance(a[1], IOError): + err = a[1] + else: + break + filter_error(err) + raise + # XXX should we catch generic exceptions and look for their + # __cause__ or __context__? + finally: + socket.setdefaulttimeout(old_timeout) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Return a context manager used by captured_stdout/stdin/stderr + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + import io + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, io.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + """Capture the output of sys.stdout: + + with captured_stdout() as stdout: + print("hello") + self.assertEqual(stdout.getvalue(), "hello\n") + """ + return captured_output("stdout") + +def captured_stderr(): + """Capture the output of sys.stderr: + + with captured_stderr() as stderr: + print("hello", file=sys.stderr) + self.assertEqual(stderr.getvalue(), "hello\n") + """ + return captured_output("stderr") + +def captured_stdin(): + """Capture the input to sys.stdin: + + with captured_stdin() as stdin: + stdin.write('hello\n') + stdin.seek(0) + # call test code that consumes from sys.stdin + captured = input() + self.assertEqual(captured, "hello") + """ + return captured_output("stdin") + + +def gc_collect(): + """Force as many objects as possible to be collected. + + In non-CPython implementations of Python, this is needed because timely + deallocation is not guaranteed by the garbage collector. (Even in CPython + this can be the case in case of reference cycles.) This means that __del__ + methods may be called later than expected and weakrefs may remain alive for + longer than expected. This function tries its best to force all garbage + objects to disappear. + """ + gc.collect() + if is_jython: + time.sleep(0.1) + gc.collect() + gc.collect() + +@contextlib.contextmanager +def disable_gc(): + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + + +def python_is_optimized(): + """Find if Python was built with optimizations.""" + cflags = sysconfig.get_config_var('PY_CFLAGS') or '' + final_opt = "" + for opt in cflags.split(): + if opt.startswith('-O'): + final_opt = opt + return final_opt != '' and final_opt != '-O0' + + +_header = 'nP' +_align = '0n' +if hasattr(sys, "gettotalrefcount"): + _header = '2P' + _header + _align = '0P' +_vheader = _header + 'n' + +def calcobjsize(fmt): + return struct.calcsize(_header + fmt + _align) + +def calcvobjsize(fmt): + return struct.calcsize(_vheader + fmt + _align) + + +_TPFLAGS_HAVE_GC = 1<<14 +_TPFLAGS_HEAPTYPE = 1<<9 + +def check_sizeof(test, o, size): + result = sys.getsizeof(o) + # add GC header size + if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ + ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): + size += _testcapi.SIZEOF_PYGC_HEAD + msg = 'wrong size for %s: got %d, expected %d' \ + % (type(o), result, size) + test.assertEqual(result, size, msg) + +#======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Decorator for running a function in a specific timezone, correctly +# resetting it afterwards. + +def run_with_tz(tz): + def decorator(func): + def inner(*args, **kwds): + try: + tzset = time.tzset + except AttributeError: + raise unittest.SkipTest("tzset required") + if 'TZ' in os.environ: + orig_tz = os.environ['TZ'] + else: + orig_tz = None + os.environ['TZ'] = tz + tzset() + + # now run the function, resetting the tz on exceptions + try: + return func(*args, **kwds) + finally: + if orig_tz is None: + del os.environ['TZ'] + else: + os.environ['TZ'] = orig_tz + time.tzset() + + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use +# should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G +_4G = 4 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + global max_memuse + global real_max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + real_max_memuse = memlimit + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +class _MemoryWatchdog: + """An object which periodically watches the process' memory consumption + and prints it out. + """ + + def __init__(self): + self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) + self.started = False + + def start(self): + try: + f = open(self.procfile, 'r') + except OSError as e: + warnings.warn('/proc not available for stats: {}'.format(e), + RuntimeWarning) + sys.stderr.flush() + return + + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, stderr=subprocess.DEVNULL) + f.close() + self.started = True + + def stop(self): + if self.started: + self.mem_watchdog.terminate() + self.mem_watchdog.wait() + + +def bigmemtest(size, memuse, dry_run=True): + """Decorator for bigmem tests. + + 'minsize' is the minimum useful size for the test (in arbitrary, + test-interpreted units.) 'memuse' is the number of 'bytes per size' for + the test, or a good estimate of it. + + if 'dry_run' is False, it means the test doesn't support dummy runs + when -M is not specified. + """ + def decorator(f): + def wrapper(self): + size = wrapper.size + memuse = wrapper.memuse + if not real_max_memuse: + maxsize = 5147 + else: + maxsize = size + + if ((real_max_memuse or not dry_run) + and real_max_memuse < maxsize * memuse): + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (size * memuse / (1024 ** 3))) + + if real_max_memuse and verbose: + print() + print(" ... expected peak memory use: {peak:.1f}G" + .format(peak=size * memuse / (1024 ** 3))) + watchdog = _MemoryWatchdog() + watchdog.start() + else: + watchdog = None + + try: + return f(self, maxsize) + finally: + if watchdog: + watchdog.stop() + + wrapper.size = size + wrapper.memuse = memuse + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: + raise unittest.SkipTest( + "not enough memory: try a 32-bit build instead") + else: + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (MAX_Py_ssize_t / (1024 ** 3))) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + +def _id(obj): + return obj + +def requires_resource(resource): + if resource == 'gui' and not _is_gui_available(): + return unittest.skip("resource 'gui' is not available") + if is_resource_enabled(resource): + return _id + else: + return unittest.skip("resource {0!r} is not enabled".format(resource)) + +def cpython_only(test): + """ + Decorator for tests only applicable on CPython. + """ + return impl_detail(cpython=True)(test) + +def impl_detail(msg=None, **guards): + if check_impl_detail(**guards): + return _id + if msg is None: + guardnames, default = _parse_guards(guards) + if default: + msg = "implementation detail not available on {0}" + else: + msg = "implementation detail specific to {0}" + guardnames = sorted(guardnames.keys()) + msg = msg.format(' or '.join(guardnames)) + return unittest.skip(msg) + +def _parse_guards(guards): + # Returns a tuple ({platform_name: run_me}, default_value) + if not guards: + return ({'cpython': True}, False) + is_true = list(guards.values())[0] + assert list(guards.values()) == [is_true] * len(guards) # all True or all False + return (guards, not is_true) + +# Use the following check to guard CPython's implementation-specific tests -- +# or to run them only on the implementation(s) guarded by the arguments. +def check_impl_detail(**guards): + """This function returns True or False depending on the host platform. + Examples: + if check_impl_detail(): # only on CPython (default) + if check_impl_detail(jython=True): # only on Jython + if check_impl_detail(cpython=False): # everywhere except on CPython + """ + guards, default = _parse_guards(guards) + return guards.get(platform.python_implementation().lower(), default) + + +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + + +def _filter_suite(suite, pred): + """Recursively filter test cases in a suite based on a predicate.""" + newtests = [] + for test in suite._tests: + if isinstance(test, unittest.TestSuite): + _filter_suite(test, pred) + newtests.append(test) + else: + if pred(test): + newtests.append(test) + suite._tests = newtests + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + if verbose: + runner = unittest.TextTestRunner(sys.stdout, verbosity=2, + failfast=failfast) + else: + runner = BasicTestRunner() + + result = runner.run(suite) + if not result.wasSuccessful(): + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "multiple errors occurred" + if not verbose: err += "; run in verbose mode for details" + raise TestFailed(err) + + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(unittest.makeSuite(cls)) + def case_pred(test): + if match_tests is None: + return True + for name in test.id().split("."): + if fnmatch.fnmatchcase(name, match_tests): + return True + return False + _filter_suite(suite, case_pred) + _run_suite(suite) + +#======================================================================= +# Check for the presence of docstrings. + +HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or + sys.platform == 'win32' or + sysconfig.get_config_var('WITH_DOC_STRINGS')) + +requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, + "test requires docstrings") + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None, optionflags=0): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + if verbose: + print('doctest (%s) ... %d tests with zero failures' % + (module.__name__, t)) + return f, t + + +#======================================================================= +# Support for saving and restoring the imported modules. + +def modules_setup(): + return sys.modules.copy(), + +def modules_cleanup(oldmodules): + # Encoders/decoders are registered permanently within the internal + # codec cache. If we destroy the corresponding modules their + # globals will be set to None which will trip up the cached functions. + encodings = [(k, v) for k, v in sys.modules.items() + if k.startswith('encodings.')] + sys.modules.clear() + sys.modules.update(encodings) + # XXX: This kind of problem can affect more than just encodings. In particular + # extension modules (such as _ssl) don't cope with reloading properly. + # Really, test modules should be cleaning out the test specific modules they + # know they added (ala test_runpy) rather than relying on this function (as + # test_importhooks and test_pkg do currently). + # Implicitly imported *real* modules should be left alone (see issue 10556). + sys.modules.update(oldmodules) + +#======================================================================= +# Threading support to prevent reporting refleaks when running regrtest.py -R + +# NOTE: we use thread._count() rather than threading.enumerate() (or the +# moral equivalent thereof) because a threading.Thread object is still alive +# until its __bootstrap() method has returned, even after it has been +# unregistered from the threading module. +# thread._count(), on the other hand, only gets decremented *after* the +# __bootstrap() method has returned, which gives us reliable reference counts +# at the end of a test run. + +def threading_setup(): + if _thread: + return _thread._count(), threading._dangling.copy() + else: + return 1, () + +def threading_cleanup(*original_values): + if not _thread: + return + _MAX_COUNT = 10 + for count in range(_MAX_COUNT): + values = _thread._count(), threading._dangling + if values == original_values: + break + time.sleep(0.1) + gc_collect() + # XXX print a warning in case of failure? + +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + if not _thread: + return func + + @functools.wraps(func) + def decorator(*args): + key = threading_setup() + try: + return func(*args) + finally: + threading_cleanup(*key) + return decorator + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + if hasattr(os, 'waitpid'): + any_process = -1 + while True: + try: + # This will raise an exception on Windows. That's ok. + pid, status = os.waitpid(any_process, os.WNOHANG) + if pid == 0: + break + except: + break + +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + delattr(obj, attr) + +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. + + Usage: + with swap_item(obj, "item", 5): + ... + + This will set obj["item"] to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `item` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if item in obj: + real_val = obj[item] + obj[item] = new_val + try: + yield + finally: + obj[item] = real_val + else: + obj[item] = new_val + try: + yield + finally: + del obj[item] + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip() + return stderr + +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags and sys.warnoptions.""" + return subprocess._args_from_interpreter_flags() + +#============================================================ +# Support for assertions about logging. +#============================================================ + +class TestHandler(logging.handlers.BufferingHandler): + def __init__(self, matcher): + # BufferingHandler takes a "capacity" argument + # so as to know when to flush. As we're overriding + # shouldFlush anyway, we can set a capacity of zero. + # You can call flush() manually to clear out the + # buffer. + logging.handlers.BufferingHandler.__init__(self, 0) + self.matcher = matcher + + def shouldFlush(self): + return False + + def emit(self, record): + self.format(record) + self.buffer.append(record.__dict__) + + def matches(self, **kwargs): + """ + Look for a saved dict whose keys/values match the supplied arguments. + """ + result = False + for d in self.buffer: + if self.matcher.matches(d, **kwargs): + result = True + break + return result + +class Matcher(object): + + _partial_matches = ('msg', 'message') + + def matches(self, d, **kwargs): + """ + Try to match a single dict with the supplied arguments. + + Keys whose values are strings and which are in self._partial_matches + will be checked for partial (i.e. substring) matches. You can extend + this scheme to (for example) do regular expression matching, etc. + """ + result = True + for k in kwargs: + v = kwargs[k] + dv = d.get(k) + if not self.match_value(k, dv, v): + result = False + break + return result + + def match_value(self, k, dv, v): + """ + Try to match a single stored value (dv) with a supplied value (v). + """ + if type(v) != type(dv): + result = False + elif type(dv) is not str or k not in self._partial_matches: + result = (v == dv) + else: + result = dv.find(v) >= 0 + return result + + +_can_symlink = None +def can_symlink(): + global _can_symlink + if _can_symlink is not None: + return _can_symlink + symlink_path = TESTFN + "can_symlink" + try: + os.symlink(TESTFN, symlink_path) + can = True + except (OSError, NotImplementedError, AttributeError): + can = False + else: + os.remove(symlink_path) + _can_symlink = can + return can + +def skip_unless_symlink(test): + """Skip decorator for tests that require functional symlink""" + ok = can_symlink() + msg = "Requires functional symlink implementation" + return test if ok else unittest.skip(msg)(test) + +_can_xattr = None +def can_xattr(): + global _can_xattr + if _can_xattr is not None: + return _can_xattr + if not hasattr(os, "setxattr"): + can = False + else: + tmp_fp, tmp_name = tempfile.mkstemp() + try: + with open(TESTFN, "wb") as fp: + try: + # TESTFN & tempfile may use different file systems with + # different capabilities + os.setxattr(tmp_fp, b"user.test", b"") + os.setxattr(fp.fileno(), b"user.test", b"") + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match("2.6.(\d{1,2})", kernel_version) + can = m is None or int(m.group(1)) >= 39 + except OSError: + can = False + finally: + unlink(TESTFN) + unlink(tmp_name) + _can_xattr = can + return can + +def skip_unless_xattr(test): + """Skip decorator for tests that require functional extended attributes""" + ok = can_xattr() + msg = "no non-broken extended attribute support" + return test if ok else unittest.skip(msg)(test) + + +if sys.platform.startswith('win'): + @contextlib.contextmanager + def suppress_crash_popup(): + """Disable Windows Error Reporting dialogs using SetErrorMode.""" + # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621%28v=vs.85%29.aspx + # GetErrorMode is not available on Windows XP and Windows Server 2003, + # but SetErrorMode returns the previous value, so we can use that + import ctypes + k32 = ctypes.windll.kernel32 + SEM_NOGPFAULTERRORBOX = 0x02 + old_error_mode = k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) + k32.SetErrorMode(old_error_mode | SEM_NOGPFAULTERRORBOX) + try: + yield + finally: + k32.SetErrorMode(old_error_mode) +else: + # this is a no-op for other platforms + @contextlib.contextmanager + def suppress_crash_popup(): + yield + + +def patch(test_instance, object_to_patch, attr_name, new_value): + """Override 'object_to_patch'.'attr_name' with 'new_value'. + + Also, add a cleanup procedure to 'test_instance' to restore + 'object_to_patch' value for 'attr_name'. + The 'attr_name' should be a valid attribute for 'object_to_patch'. + + """ + # check that 'attr_name' is a real attribute for 'object_to_patch' + # will raise AttributeError if it does not exist + getattr(object_to_patch, attr_name) + + # keep a copy of the old value + attr_is_local = False + try: + old_value = object_to_patch.__dict__[attr_name] + except (AttributeError, KeyError): + old_value = getattr(object_to_patch, attr_name, None) + else: + attr_is_local = True + + # restore the value when the test is done + def cleanup(): + if attr_is_local: + setattr(object_to_patch, attr_name, old_value) + else: + delattr(object_to_patch, attr_name) + + test_instance.addCleanup(cleanup) + + # actually override the attribute + setattr(object_to_patch, attr_name, new_value) diff --git a/Lib/test/test_linecache.py b/Lib/test/test_linecache.py index 7d14534..5fe0554 100644 --- a/Lib/test/test_linecache.py +++ b/Lib/test/test_linecache.py @@ -11,7 +11,7 @@ INVALID_NAME = '!@$)(!@#_1' EMPTY = '' TESTS = 'inspect_fodder inspect_fodder2 mapping_tests' TESTS = TESTS.split() -TEST_PATH = os.path.dirname(support.__file__) +TEST_PATH = os.path.dirname(__file__) MODULES = "linecache abc".split() MODULE_PATH = os.path.dirname(FILENAME) diff --git a/Misc/ACKS b/Misc/ACKS index 288f186..aaa8dd4 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -1202,6 +1202,7 @@ Thenault Sylvain Péter Szabó Amir Szekely Arfrever Frehtes Taifersar Arahesis +Indra Talip Neil Tallim Geoff Talvola Musashi Tamura diff --git a/Misc/NEWS b/Misc/NEWS index ff6fe06..853e738 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -222,6 +222,9 @@ IDLE Tests ----- +- Issue #15494: test.support is now a package rather than a module (Initial + patch by Indra Talip) + - Issue #17944: test_zipfile now discoverable and uses subclassing to generate tests for different compression types. Fixed a bug with skipping some tests due to use of exhausted iterators. -- cgit v0.12