diff options
Diffstat (limited to 'Lib/test')
33 files changed, 3546 insertions, 2042 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 63c3ae8..11deffc 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -281,7 +281,8 @@ class TestTimeZone(unittest.TestCase): with self.assertRaises(TypeError): self.EST.dst(5) def test_tzname(self): - self.assertEqual('UTC+00:00', timezone(ZERO).tzname(None)) + self.assertEqual('UTC', timezone.utc.tzname(None)) + self.assertEqual('UTC', timezone(ZERO).tzname(None)) self.assertEqual('UTC-05:00', timezone(-5 * HOUR).tzname(None)) self.assertEqual('UTC+09:30', timezone(9.5 * HOUR).tzname(None)) self.assertEqual('UTC-00:01', timezone(timedelta(minutes=-1)).tzname(None)) diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py new file mode 100644 index 0000000..9f7b1c1 --- /dev/null +++ b/Lib/test/libregrtest/__init__.py @@ -0,0 +1,2 @@ +from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES +from test.libregrtest.main import main, main_in_temp_cwd diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py new file mode 100644 index 0000000..19c52a2 --- /dev/null +++ b/Lib/test/libregrtest/cmdline.py @@ -0,0 +1,340 @@ +import argparse +import faulthandler +import os +from test import support + + +USAGE = """\ +python -m test [options] [test_name1 [test_name2 ...]] +python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]] +""" + +DESCRIPTION = """\ +Run Python regression tests. + +If no arguments or options are provided, finds all files matching +the pattern "test_*" in the Lib/test subdirectory and runs +them in alphabetical order (but see -M and -u, below, for exceptions). + +For more rigorous testing, it is useful to use the following +command line: + +python -E -Wd -m test [options] [test_name1 ...] +""" + +EPILOG = """\ +Additional option details: + +-r randomizes test execution order. You can use --randseed=int to provide a +int seed value for the randomizer; this is useful for reproducing troublesome +test orders. + +-s On the first invocation of regrtest using -s, the first test file found +or the first test file given on the command line is run, and the name of +the next test is recorded in a file named pynexttest. If run from the +Python build directory, pynexttest is located in the 'build' subdirectory, +otherwise it is located in tempfile.gettempdir(). On subsequent runs, +the test in pynexttest is run, and the next test is written to pynexttest. +When the last test has been run, pynexttest is deleted. In this way it +is possible to single step through the test files. This is useful when +doing memory analysis on the Python interpreter, which process tends to +consume too many resources to run the full regression test non-stop. + +-S is used to continue running tests after an aborted run. It will +maintain the order a standard run (ie, this assumes -r is not used). +This is useful after the tests have prematurely stopped for some external +reason and you want to start running from where you left off rather +than starting from the beginning. + +-f reads the names of tests from the file given as f's argument, one +or more test names per line. Whitespace is ignored. Blank lines and +lines beginning with '#' are ignored. This is especially useful for +whittling down failures involving interactions among tests. + +-L causes the leaks(1) command to be run just before exit if it exists. +leaks(1) is available on Mac OS X and presumably on some other +FreeBSD-derived systems. + +-R runs each test several times and examines sys.gettotalrefcount() to +see if the test appears to be leaking references. The argument should +be of the form stab:run:fname where 'stab' is the number of times the +test is run to let gettotalrefcount settle down, 'run' is the number +of times further it is run and 'fname' is the name of the file the +reports are written to. These parameters all have defaults (5, 4 and +"reflog.txt" respectively), and the minimal invocation is '-R :'. + +-M runs tests that require an exorbitant amount of memory. These tests +typically try to ascertain containers keep working when containing more than +2 billion objects, which only works on 64-bit systems. There are also some +tests that try to exhaust the address space of the process, which only makes +sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit, +which is a string in the form of '2.5Gb', determines howmuch memory the +tests will limit themselves to (but they may go slightly over.) The number +shouldn't be more memory than the machine has (including swap memory). You +should also keep in mind that swap memory is generally much, much slower +than RAM, and setting memlimit to all available RAM or higher will heavily +tax the machine. On the other hand, it is no use running these tests with a +limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect +to use more than memlimit memory will be skipped. The big-memory tests +generally run very, very long. + +-u is used to specify which special resource intensive tests to run, +such as those requiring large file support or network connectivity. +The argument is a comma-separated list of words indicating the +resources to test. Currently only the following are defined: + + all - Enable all special resources. + + none - Disable all special resources (this is the default). + + audio - Tests that use the audio device. (There are known + cases of broken audio drivers that can crash Python or + even the Linux kernel.) + + curses - Tests that use curses and will modify the terminal's + state and output modes. + + largefile - It is okay to run some test that may create huge + files. These tests can take a long time and may + consume >2GB of disk space temporarily. + + network - It is okay to run tests that use external network + resource, e.g. testing SSL support for sockets. + + decimal - Test the decimal module against a large suite that + verifies compliance with standards. + + cpu - Used for certain CPU-heavy tests. + + subprocess Run all tests for the subprocess module. + + urlfetch - It is okay to download files required on testing. + + gui - Run tests that require a running GUI. + +To enable all resources except one, use '-uall,-<resource>'. For +example, to run all the tests except for the gui tests, give the +option '-uall,-gui'. +""" + + +RESOURCE_NAMES = ('audio', 'curses', 'largefile', 'network', + 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui') + +class _ArgParser(argparse.ArgumentParser): + + def error(self, message): + super().error(message + "\nPass -h or --help for complete help.") + + +def _create_parser(): + # Set prog to prevent the uninformative "__main__.py" from displaying in + # error messages when using "python -m test ...". + parser = _ArgParser(prog='regrtest.py', + usage=USAGE, + description=DESCRIPTION, + epilog=EPILOG, + add_help=False, + formatter_class=argparse.RawDescriptionHelpFormatter) + + # Arguments with this clause added to its help are described further in + # the epilog's "Additional option details" section. + more_details = ' See the section at bottom for more details.' + + group = parser.add_argument_group('General options') + # We add help explicitly to control what argument group it renders under. + group.add_argument('-h', '--help', action='help', + help='show this help message and exit') + group.add_argument('--timeout', metavar='TIMEOUT', type=float, + help='dump the traceback and exit if a test takes ' + 'more than TIMEOUT seconds; disabled if TIMEOUT ' + 'is negative or equals to zero') + group.add_argument('--wait', action='store_true', + help='wait for user input, e.g., allow a debugger ' + 'to be attached') + group.add_argument('--slaveargs', metavar='ARGS') + group.add_argument('-S', '--start', metavar='START', + help='the name of the test at which to start.' + + more_details) + + group = parser.add_argument_group('Verbosity') + group.add_argument('-v', '--verbose', action='count', + help='run tests in verbose mode with output to stdout') + group.add_argument('-w', '--verbose2', action='store_true', + help='re-run failed tests in verbose mode') + group.add_argument('-W', '--verbose3', action='store_true', + help='display test output on failure') + group.add_argument('-q', '--quiet', action='store_true', + help='no output unless one or more tests fail') + group.add_argument('-o', '--slow', action='store_true', dest='print_slow', + help='print the slowest 10 tests') + group.add_argument('--header', action='store_true', + help='print header with interpreter info') + + group = parser.add_argument_group('Selecting tests') + group.add_argument('-r', '--randomize', action='store_true', + help='randomize test execution order.' + more_details) + group.add_argument('--randseed', metavar='SEED', + dest='random_seed', type=int, + help='pass a random seed to reproduce a previous ' + 'random run') + group.add_argument('-f', '--fromfile', metavar='FILE', + help='read names of tests to run from a file.' + + more_details) + group.add_argument('-x', '--exclude', action='store_true', + help='arguments are tests to *exclude*') + group.add_argument('-s', '--single', action='store_true', + help='single step through a set of tests.' + + more_details) + group.add_argument('-m', '--match', metavar='PAT', + dest='match_tests', + help='match test cases and methods with glob pattern PAT') + group.add_argument('-G', '--failfast', action='store_true', + help='fail as soon as a test fails (only with -v or -W)') + group.add_argument('-u', '--use', metavar='RES1,RES2,...', + action='append', type=resources_list, + help='specify which special resource intensive tests ' + 'to run.' + more_details) + group.add_argument('-M', '--memlimit', metavar='LIMIT', + help='run very large memory-consuming tests.' + + more_details) + group.add_argument('--testdir', metavar='DIR', + type=relative_filename, + help='execute test files in the specified directory ' + '(instead of the Python stdlib test suite)') + + group = parser.add_argument_group('Special runs') + group.add_argument('-l', '--findleaks', action='store_true', + help='if GC is available detect tests that leak memory') + group.add_argument('-L', '--runleaks', action='store_true', + help='run the leaks(1) command just before exit.' + + more_details) + group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS', + type=huntrleaks, + help='search for reference leaks (needs debug build, ' + 'very slow).' + more_details) + group.add_argument('-j', '--multiprocess', metavar='PROCESSES', + dest='use_mp', type=int, + help='run PROCESSES processes at once') + group.add_argument('-T', '--coverage', action='store_true', + dest='trace', + help='turn on code coverage tracing using the trace ' + 'module') + group.add_argument('-D', '--coverdir', metavar='DIR', + type=relative_filename, + help='directory where coverage files are put') + group.add_argument('-N', '--nocoverdir', + action='store_const', const=None, dest='coverdir', + help='put coverage files alongside modules') + group.add_argument('-t', '--threshold', metavar='THRESHOLD', + type=int, + help='call gc.set_threshold(THRESHOLD)') + group.add_argument('-n', '--nowindows', action='store_true', + help='suppress error message boxes on Windows') + group.add_argument('-F', '--forever', action='store_true', + help='run the specified tests in a loop, until an ' + 'error happens') + + parser.add_argument('args', nargs=argparse.REMAINDER, + help=argparse.SUPPRESS) + + return parser + + +def relative_filename(string): + # CWD is replaced with a temporary dir before calling main(), so we + # join it with the saved CWD so it ends up where the user expects. + return os.path.join(support.SAVEDCWD, string) + + +def huntrleaks(string): + args = string.split(':') + if len(args) not in (2, 3): + raise argparse.ArgumentTypeError( + 'needs 2 or 3 colon-separated arguments') + nwarmup = int(args[0]) if args[0] else 5 + ntracked = int(args[1]) if args[1] else 4 + fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt' + return nwarmup, ntracked, fname + + +def resources_list(string): + u = [x.lower() for x in string.split(',')] + for r in u: + if r == 'all' or r == 'none': + continue + if r[0] == '-': + r = r[1:] + if r not in RESOURCE_NAMES: + raise argparse.ArgumentTypeError('invalid resource: ' + r) + return u + + +def _parse_args(args, **kwargs): + # Defaults + ns = argparse.Namespace(testdir=None, verbose=0, quiet=False, + exclude=False, single=False, randomize=False, fromfile=None, + findleaks=False, use_resources=None, trace=False, coverdir='coverage', + runleaks=False, huntrleaks=False, verbose2=False, print_slow=False, + random_seed=None, use_mp=None, verbose3=False, forever=False, + header=False, failfast=False, match_tests=None) + for k, v in kwargs.items(): + if not hasattr(ns, k): + raise TypeError('%r is an invalid keyword argument ' + 'for this function' % k) + setattr(ns, k, v) + if ns.use_resources is None: + ns.use_resources = [] + + parser = _create_parser() + parser.parse_args(args=args, namespace=ns) + + if ns.single and ns.fromfile: + parser.error("-s and -f don't go together!") + if ns.use_mp and ns.trace: + parser.error("-T and -j don't go together!") + if ns.use_mp and ns.findleaks: + parser.error("-l and -j don't go together!") + if ns.use_mp and ns.memlimit: + parser.error("-M and -j don't go together!") + if ns.failfast and not (ns.verbose or ns.verbose3): + parser.error("-G/--failfast needs either -v or -W") + + if ns.quiet: + ns.verbose = 0 + if ns.timeout is not None: + if hasattr(faulthandler, 'dump_traceback_later'): + if ns.timeout <= 0: + ns.timeout = None + else: + print("Warning: The timeout option requires " + "faulthandler.dump_traceback_later") + ns.timeout = None + if ns.use_mp is not None: + if ns.use_mp <= 0: + # Use all cores + extras for tests that like to sleep + ns.use_mp = 2 + (os.cpu_count() or 1) + if ns.use_mp == 1: + ns.use_mp = None + if ns.use: + for a in ns.use: + for r in a: + if r == 'all': + ns.use_resources[:] = RESOURCE_NAMES + continue + if r == 'none': + del ns.use_resources[:] + continue + remove = False + if r[0] == '-': + remove = True + r = r[1:] + if remove: + if r in ns.use_resources: + ns.use_resources.remove(r) + elif r not in ns.use_resources: + ns.use_resources.append(r) + if ns.random_seed is not None: + ns.randomize = True + + return ns diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py new file mode 100644 index 0000000..5d34cff --- /dev/null +++ b/Lib/test/libregrtest/main.py @@ -0,0 +1,547 @@ +import faulthandler +import json +import os +import re +import sys +import tempfile +import sysconfig +import signal +import random +import platform +import traceback +import unittest +from test.libregrtest.runtest import ( + findtests, runtest, run_test_in_subprocess, + STDTESTS, NOTTESTS, + PASSED, FAILED, ENV_CHANGED, SKIPPED, + RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR) +from test.libregrtest.refleak import warm_caches +from test.libregrtest.cmdline import _parse_args +from test import support +try: + import threading +except ImportError: + threading = None + + +# Some times __path__ and __file__ are not absolute (e.g. while running from +# Lib/) and, if we change the CWD to run the tests in a temporary dir, some +# imports might fail. This affects only the modules imported before os.chdir(). +# These modules are searched first in sys.path[0] (so '' -- the CWD) and if +# they are found in the CWD their __file__ and __path__ will be relative (this +# happens before the chdir). All the modules imported after the chdir, are +# not found in the CWD, and since the other paths in sys.path[1:] are absolute +# (site.py absolutize them), the __file__ and __path__ will be absolute too. +# Therefore it is necessary to absolutize manually the __file__ and __path__ of +# the packages to prevent later imports to fail when the CWD is different. +for module in sys.modules.values(): + if hasattr(module, '__path__'): + module.__path__ = [os.path.abspath(path) for path in module.__path__] + if hasattr(module, '__file__'): + module.__file__ = os.path.abspath(module.__file__) + + +# MacOSX (a.k.a. Darwin) has a default stack size that is too small +# for deeply recursive regular expressions. We see this as crashes in +# the Python test suite when running test_re.py and test_sre.py. The +# fix is to set the stack limit to 2048. +# This approach may also be useful for other Unixy platforms that +# suffer from small default stack limits. +if sys.platform == 'darwin': + try: + import resource + except ImportError: + pass + else: + soft, hard = resource.getrlimit(resource.RLIMIT_STACK) + newsoft = min(hard, max(soft, 1024*2048)) + resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard)) + + +# When tests are run from the Python build directory, it is best practice +# to keep the test files in a subfolder. This eases the cleanup of leftover +# files using the "make distclean" command. +if sysconfig.is_python_build(): + TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build') +else: + TEMPDIR = tempfile.gettempdir() +TEMPDIR = os.path.abspath(TEMPDIR) + + +def main(tests=None, **kwargs): + """Execute a test suite. + + This also parses command-line options and modifies its behavior + accordingly. + + tests -- a list of strings containing test names (optional) + testdir -- the directory in which to look for tests (optional) + + Users other than the Python test suite will certainly want to + specify testdir; if it's omitted, the directory containing the + Python test suite is searched for. + + If the tests argument is omitted, the tests listed on the + command-line will be used. If that's empty, too, then all *.py + files beginning with test_ will be used. + + The other default arguments (verbose, quiet, exclude, + single, randomize, findleaks, use_resources, trace, coverdir, + print_slow, and random_seed) allow programmers calling main() + directly to set the values that would normally be set by flags + on the command line. + """ + # Display the Python traceback on fatal errors (e.g. segfault) + faulthandler.enable(all_threads=True) + + # Display the Python traceback on SIGALRM or SIGUSR1 signal + signals = [] + if hasattr(signal, 'SIGALRM'): + signals.append(signal.SIGALRM) + if hasattr(signal, 'SIGUSR1'): + signals.append(signal.SIGUSR1) + for signum in signals: + faulthandler.register(signum, chain=True) + + replace_stdout() + + support.record_original_stdout(sys.stdout) + + ns = _parse_args(sys.argv[1:], **kwargs) + + if ns.huntrleaks: + # Avoid false positives due to various caches + # filling slowly with random data: + warm_caches() + if ns.memlimit is not None: + support.set_memlimit(ns.memlimit) + if ns.threshold is not None: + import gc + gc.set_threshold(ns.threshold) + if ns.nowindows: + import msvcrt + msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| + msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| + msvcrt.SEM_NOGPFAULTERRORBOX| + msvcrt.SEM_NOOPENFILEERRORBOX) + try: + msvcrt.CrtSetReportMode + except AttributeError: + # release build + pass + else: + for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: + msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) + msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) + if ns.wait: + input("Press any key to continue...") + + if ns.slaveargs is not None: + args, kwargs = json.loads(ns.slaveargs) + if kwargs.get('huntrleaks'): + unittest.BaseTestSuite._cleanup = False + try: + result = runtest(*args, **kwargs) + except KeyboardInterrupt: + result = INTERRUPTED, '' + except BaseException as e: + traceback.print_exc() + result = CHILD_ERROR, str(e) + sys.stdout.flush() + print() # Force a newline (just in case) + print(json.dumps(result)) + sys.exit(0) + + good = [] + bad = [] + skipped = [] + resource_denieds = [] + environment_changed = [] + interrupted = False + + if ns.findleaks: + try: + import gc + except ImportError: + print('No GC available, disabling findleaks.') + ns.findleaks = False + else: + # Uncomment the line below to report garbage that is not + # freeable by reference counting alone. By default only + # garbage that is not collectable by the GC is reported. + #gc.set_debug(gc.DEBUG_SAVEALL) + found_garbage = [] + + if ns.huntrleaks: + unittest.BaseTestSuite._cleanup = False + + if ns.single: + filename = os.path.join(TEMPDIR, 'pynexttest') + try: + with open(filename, 'r') as fp: + next_test = fp.read().strip() + tests = [next_test] + except OSError: + pass + + if ns.fromfile: + tests = [] + with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp: + count_pat = re.compile(r'\[\s*\d+/\s*\d+\]') + for line in fp: + line = count_pat.sub('', line) + guts = line.split() # assuming no test has whitespace in its name + if guts and not guts[0].startswith('#'): + tests.extend(guts) + + # Strip .py extensions. + removepy(ns.args) + removepy(tests) + + stdtests = STDTESTS[:] + nottests = NOTTESTS.copy() + if ns.exclude: + for arg in ns.args: + if arg in stdtests: + stdtests.remove(arg) + nottests.add(arg) + ns.args = [] + + # For a partial run, we do not need to clutter the output. + if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args): + # Print basic platform information + print("==", platform.python_implementation(), *sys.version.split()) + print("== ", platform.platform(aliased=True), + "%s-endian" % sys.byteorder) + print("== ", "hash algorithm:", sys.hash_info.algorithm, + "64bit" if sys.maxsize > 2**32 else "32bit") + print("== ", os.getcwd()) + print("Testing with flags:", sys.flags) + + # if testdir is set, then we are not running the python tests suite, so + # don't add default tests to be executed or skipped (pass empty values) + if ns.testdir: + alltests = findtests(ns.testdir, list(), set()) + else: + alltests = findtests(ns.testdir, stdtests, nottests) + + selected = tests or ns.args or alltests + if ns.single: + selected = selected[:1] + try: + next_single_test = alltests[alltests.index(selected[0])+1] + except IndexError: + next_single_test = None + # Remove all the selected tests that precede start if it's set. + if ns.start: + try: + del selected[:selected.index(ns.start)] + except ValueError: + print("Couldn't find starting test (%s), using all tests" % ns.start) + if ns.randomize: + if ns.random_seed is None: + ns.random_seed = random.randrange(10000000) + random.seed(ns.random_seed) + print("Using random seed", ns.random_seed) + random.shuffle(selected) + if ns.trace: + import trace, tempfile + tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix, + tempfile.gettempdir()], + trace=False, count=True) + + test_times = [] + support.verbose = ns.verbose # Tell tests to be moderately quiet + support.use_resources = ns.use_resources + save_modules = sys.modules.keys() + + def accumulate_result(test, result): + ok, test_time = result + test_times.append((test_time, test)) + if ok == PASSED: + good.append(test) + elif ok == FAILED: + bad.append(test) + elif ok == ENV_CHANGED: + environment_changed.append(test) + elif ok == SKIPPED: + skipped.append(test) + elif ok == RESOURCE_DENIED: + skipped.append(test) + resource_denieds.append(test) + + if ns.forever: + def test_forever(tests=list(selected)): + while True: + for test in tests: + yield test + if bad: + return + tests = test_forever() + test_count = '' + test_count_width = 3 + else: + tests = iter(selected) + test_count = '/{}'.format(len(selected)) + test_count_width = len(test_count) - 1 + + if ns.use_mp: + try: + from threading import Thread + except ImportError: + print("Multiprocess option requires thread support") + sys.exit(2) + from queue import Queue + debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") + output = Queue() + pending = MultiprocessTests(tests) + def work(): + # A worker thread. + try: + while True: + try: + test = next(pending) + except StopIteration: + output.put((None, None, None, None)) + return + retcode, stdout, stderr = run_test_in_subprocess(test, ns) + # Strip last refcount output line if it exists, since it + # comes from the shutdown of the interpreter in the subcommand. + stderr = debug_output_pat.sub("", stderr) + stdout, _, result = stdout.strip().rpartition("\n") + if retcode != 0: + result = (CHILD_ERROR, "Exit code %s" % retcode) + output.put((test, stdout.rstrip(), stderr.rstrip(), result)) + return + if not result: + output.put((None, None, None, None)) + return + result = json.loads(result) + output.put((test, stdout.rstrip(), stderr.rstrip(), result)) + except BaseException: + output.put((None, None, None, None)) + raise + workers = [Thread(target=work) for i in range(ns.use_mp)] + for worker in workers: + worker.start() + finished = 0 + test_index = 1 + try: + while finished < ns.use_mp: + test, stdout, stderr, result = output.get() + if test is None: + finished += 1 + continue + accumulate_result(test, result) + if not ns.quiet: + fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" + print(fmt.format( + test_count_width, test_index, test_count, + len(bad), test)) + if stdout: + print(stdout) + if stderr: + print(stderr, file=sys.stderr) + sys.stdout.flush() + sys.stderr.flush() + if result[0] == INTERRUPTED: + raise KeyboardInterrupt + if result[0] == CHILD_ERROR: + raise Exception("Child error on {}: {}".format(test, result[1])) + test_index += 1 + except KeyboardInterrupt: + interrupted = True + pending.interrupted = True + for worker in workers: + worker.join() + else: + for test_index, test in enumerate(tests, 1): + if not ns.quiet: + fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" + print(fmt.format( + test_count_width, test_index, test_count, len(bad), test)) + sys.stdout.flush() + if ns.trace: + # If we're tracing code coverage, then we don't exit with status + # if on a false return value from main. + tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)', + globals=globals(), locals=vars()) + else: + try: + result = runtest(test, ns.verbose, ns.quiet, + ns.huntrleaks, + output_on_failure=ns.verbose3, + timeout=ns.timeout, failfast=ns.failfast, + match_tests=ns.match_tests) + accumulate_result(test, result) + except KeyboardInterrupt: + interrupted = True + break + if ns.findleaks: + gc.collect() + if gc.garbage: + print("Warning: test created", len(gc.garbage), end=' ') + print("uncollectable object(s).") + # move the uncollectable objects somewhere so we don't see + # them again + found_garbage.extend(gc.garbage) + del gc.garbage[:] + # Unload the newly imported modules (best effort finalization) + for module in sys.modules.keys(): + if module not in save_modules and module.startswith("test."): + support.unload(module) + + if interrupted: + # print a newline after ^C + print() + print("Test suite interrupted by signal SIGINT.") + omitted = set(selected) - set(good) - set(bad) - set(skipped) + print(count(len(omitted), "test"), "omitted:") + printlist(omitted) + if good and not ns.quiet: + if not bad and not skipped and not interrupted and len(good) > 1: + print("All", end=' ') + print(count(len(good), "test"), "OK.") + if ns.print_slow: + test_times.sort(reverse=True) + print("10 slowest tests:") + for time, test in test_times[:10]: + print("%s: %.1fs" % (test, time)) + if bad: + print(count(len(bad), "test"), "failed:") + printlist(bad) + if environment_changed: + print("{} altered the execution environment:".format( + count(len(environment_changed), "test"))) + printlist(environment_changed) + if skipped and not ns.quiet: + print(count(len(skipped), "test"), "skipped:") + printlist(skipped) + + if ns.verbose2 and bad: + print("Re-running failed tests in verbose mode") + for test in bad[:]: + print("Re-running test %r in verbose mode" % test) + sys.stdout.flush() + try: + ns.verbose = True + ok = runtest(test, True, ns.quiet, ns.huntrleaks, + timeout=ns.timeout) + except KeyboardInterrupt: + # print a newline separate from the ^C + print() + break + else: + if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: + bad.remove(test) + else: + if bad: + print(count(len(bad), 'test'), "failed again:") + printlist(bad) + + if ns.single: + if next_single_test: + with open(filename, 'w') as fp: + fp.write(next_single_test + '\n') + else: + os.unlink(filename) + + if ns.trace: + r = tracer.results() + r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir) + + if ns.runleaks: + os.system("leaks %d" % os.getpid()) + + sys.exit(len(bad) > 0 or interrupted) + + +# We do not use a generator so multiple threads can call next(). +class MultiprocessTests(object): + + """A thread-safe iterator over tests for multiprocess mode.""" + + def __init__(self, tests): + self.interrupted = False + self.lock = threading.Lock() + self.tests = tests + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + if self.interrupted: + raise StopIteration('tests interrupted') + return next(self.tests) + + +def replace_stdout(): + """Set stdout encoder error handler to backslashreplace (as stderr error + handler) to avoid UnicodeEncodeError when printing a traceback""" + import atexit + + stdout = sys.stdout + sys.stdout = open(stdout.fileno(), 'w', + encoding=stdout.encoding, + errors="backslashreplace", + closefd=False, + newline='\n') + + def restore_stdout(): + sys.stdout.close() + sys.stdout = stdout + atexit.register(restore_stdout) + + +def removepy(names): + if not names: + return + for idx, name in enumerate(names): + basename, ext = os.path.splitext(name) + if ext == '.py': + names[idx] = basename + + +def count(n, word): + if n == 1: + return "%d %s" % (n, word) + else: + return "%d %ss" % (n, word) + + +def printlist(x, width=70, indent=4): + """Print the elements of iterable x to stdout. + + Optional arg width (default 70) is the maximum line length. + Optional arg indent (default 4) is the number of blanks with which to + begin each line. + """ + + from textwrap import fill + blanks = ' ' * indent + # Print the sorted list: 'x' may be a '--random' list or a set() + print(fill(' '.join(str(elt) for elt in sorted(x)), width, + initial_indent=blanks, subsequent_indent=blanks)) + + +def main_in_temp_cwd(): + """Run main() in a temporary working directory.""" + if sysconfig.is_python_build(): + try: + os.mkdir(TEMPDIR) + except FileExistsError: + pass + + # Define a writable temp dir that will be used as cwd while running + # the tests. The name of the dir includes the pid to allow parallel + # testing (see the -j option). + test_cwd = 'test_python_{}'.format(os.getpid()) + test_cwd = os.path.join(TEMPDIR, test_cwd) + + # Run the tests in a context manager that temporarily changes the CWD to a + # temporary and writable directory. If it's not possible to create or + # change the CWD, the original CWD will be used. The original CWD is + # available from support.SAVEDCWD. + with support.temp_cwd(test_cwd, quiet=True): + main() diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py new file mode 100644 index 0000000..dd0d05d --- /dev/null +++ b/Lib/test/libregrtest/refleak.py @@ -0,0 +1,165 @@ +import os +import re +import sys +import warnings +from inspect import isabstract +from test import support + + +def dash_R(the_module, test, indirect_test, huntrleaks): + """Run a test multiple times, looking for reference leaks. + + Returns: + False if the test didn't leak references; True if we detected refleaks. + """ + # This code is hackish and inelegant, but it seems to do the job. + import copyreg + import collections.abc + + if not hasattr(sys, 'gettotalrefcount'): + raise Exception("Tracking reference leaks requires a debug build " + "of Python") + + # Save current values for dash_R_cleanup() to restore. + fs = warnings.filters[:] + ps = copyreg.dispatch_table.copy() + pic = sys.path_importer_cache.copy() + try: + import zipimport + except ImportError: + zdc = None # Run unmodified on platforms without zipimport support + else: + zdc = zipimport._zip_directory_cache.copy() + abcs = {} + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: + if not isabstract(abc): + continue + for obj in abc.__subclasses__() + [abc]: + abcs[obj] = obj._abc_registry.copy() + + nwarmup, ntracked, fname = huntrleaks + fname = os.path.join(support.SAVEDCWD, fname) + repcount = nwarmup + ntracked + rc_deltas = [0] * repcount + alloc_deltas = [0] * repcount + + print("beginning", repcount, "repetitions", file=sys.stderr) + print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr) + sys.stderr.flush() + for i in range(repcount): + indirect_test() + alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs) + sys.stderr.write('.') + sys.stderr.flush() + if i >= nwarmup: + rc_deltas[i] = rc_after - rc_before + alloc_deltas[i] = alloc_after - alloc_before + alloc_before, rc_before = alloc_after, rc_after + print(file=sys.stderr) + # These checkers return False on success, True on failure + def check_rc_deltas(deltas): + return any(deltas) + def check_alloc_deltas(deltas): + # At least 1/3rd of 0s + if 3 * deltas.count(0) < len(deltas): + return True + # Nothing else than 1s, 0s and -1s + if not set(deltas) <= {1,0,-1}: + return True + return False + failed = False + for deltas, item_name, checker in [ + (rc_deltas, 'references', check_rc_deltas), + (alloc_deltas, 'memory blocks', check_alloc_deltas)]: + if checker(deltas): + msg = '%s leaked %s %s, sum=%s' % ( + test, deltas[nwarmup:], item_name, sum(deltas)) + print(msg, file=sys.stderr) + sys.stderr.flush() + with open(fname, "a") as refrep: + print(msg, file=refrep) + refrep.flush() + failed = True + return failed + + +def dash_R_cleanup(fs, ps, pic, zdc, abcs): + import gc, copyreg + import _strptime, linecache + import urllib.parse, urllib.request, mimetypes, doctest + import struct, filecmp, collections.abc + from distutils.dir_util import _path_created + from weakref import WeakSet + + # Clear the warnings registry, so they can be displayed again + for mod in sys.modules.values(): + if hasattr(mod, '__warningregistry__'): + del mod.__warningregistry__ + + # Restore some original values. + warnings.filters[:] = fs + copyreg.dispatch_table.clear() + copyreg.dispatch_table.update(ps) + sys.path_importer_cache.clear() + sys.path_importer_cache.update(pic) + try: + import zipimport + except ImportError: + pass # Run unmodified on platforms without zipimport support + else: + zipimport._zip_directory_cache.clear() + zipimport._zip_directory_cache.update(zdc) + + # clear type cache + sys._clear_type_cache() + + # Clear ABC registries, restoring previously saved ABC registries. + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: + if not isabstract(abc): + continue + for obj in abc.__subclasses__() + [abc]: + obj._abc_registry = abcs.get(obj, WeakSet()).copy() + obj._abc_cache.clear() + obj._abc_negative_cache.clear() + + # Flush standard output, so that buffered data is sent to the OS and + # associated Python objects are reclaimed. + for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__): + if stream is not None: + stream.flush() + + # Clear assorted module caches. + _path_created.clear() + re.purge() + _strptime._regex_cache.clear() + urllib.parse.clear_cache() + urllib.request.urlcleanup() + linecache.clearcache() + mimetypes._default_mime_types() + filecmp._cache.clear() + struct._clearcache() + doctest.master = None + try: + import ctypes + except ImportError: + # Don't worry about resetting the cache if ctypes is not supported + pass + else: + ctypes._reset_cache() + + # Collect cyclic trash and read memory statistics immediately after. + func1 = sys.getallocatedblocks + func2 = sys.gettotalrefcount + gc.collect() + return func1(), func2() + + +def warm_caches(): + # char cache + s = bytes(range(256)) + for i in range(256): + s[i:i+1] + # unicode cache + x = [chr(i) for i in range(256)] + # int cache + x = list(range(-5, 257)) diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py new file mode 100644 index 0000000..d8c0eb2 --- /dev/null +++ b/Lib/test/libregrtest/runtest.py @@ -0,0 +1,271 @@ +import faulthandler +import importlib +import io +import json +import os +import sys +import time +import traceback +import unittest +from test import support +from test.libregrtest.refleak import dash_R +from test.libregrtest.save_env import saved_test_environment + + +# Test result constants. +PASSED = 1 +FAILED = 0 +ENV_CHANGED = -1 +SKIPPED = -2 +RESOURCE_DENIED = -3 +INTERRUPTED = -4 +CHILD_ERROR = -5 # error in a child process + + +def run_test_in_subprocess(testname, ns): + """Run the given test in a subprocess with --slaveargs. + + ns is the option Namespace parsed from command-line arguments. regrtest + is invoked in a subprocess with the --slaveargs argument; when the + subprocess exits, its return code, stdout and stderr are returned as a + 3-tuple. + """ + from subprocess import Popen, PIPE + base_cmd = ([sys.executable] + support.args_from_interpreter_flags() + + ['-X', 'faulthandler', '-m', 'test.regrtest']) + + slaveargs = ( + (testname, ns.verbose, ns.quiet), + dict(huntrleaks=ns.huntrleaks, + use_resources=ns.use_resources, + output_on_failure=ns.verbose3, + timeout=ns.timeout, failfast=ns.failfast, + match_tests=ns.match_tests)) + # Running the child from the same working directory as regrtest's original + # invocation ensures that TEMPDIR for the child is the same when + # sysconfig.is_python_build() is true. See issue 15300. + popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)], + stdout=PIPE, stderr=PIPE, + universal_newlines=True, + close_fds=(os.name != 'nt'), + cwd=support.SAVEDCWD) + stdout, stderr = popen.communicate() + retcode = popen.wait() + return retcode, stdout, stderr + + +# small set of tests to determine if we have a basically functioning interpreter +# (i.e. if any of these fail, then anything else is likely to follow) +STDTESTS = [ + 'test_grammar', + 'test_opcodes', + 'test_dict', + 'test_builtin', + 'test_exceptions', + 'test_types', + 'test_unittest', + 'test_doctest', + 'test_doctest2', + 'test_support' +] + +# set of tests that we don't want to be executed when using regrtest +NOTTESTS = set() + + +def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): + """Return a list of all applicable test modules.""" + testdir = findtestdir(testdir) + names = os.listdir(testdir) + tests = [] + others = set(stdtests) | nottests + for name in names: + mod, ext = os.path.splitext(name) + if mod[:5] == "test_" and ext in (".py", "") and mod not in others: + tests.append(mod) + return stdtests + sorted(tests) + + +def runtest(test, verbose, quiet, + huntrleaks=False, use_resources=None, + output_on_failure=False, failfast=False, match_tests=None, + timeout=None): + """Run a single test. + + test -- the name of the test + verbose -- if true, print more messages + quiet -- if true, don't print 'skipped' messages (probably redundant) + huntrleaks -- run multiple times to test for leaks; requires a debug + build; a triple corresponding to -R's three arguments + use_resources -- list of extra resources to use + output_on_failure -- if true, display test output on failure + timeout -- dump the traceback and exit if a test takes more than + timeout seconds + failfast, match_tests -- See regrtest command-line flags for these. + + Returns the tuple result, test_time, where result is one of the constants: + INTERRUPTED KeyboardInterrupt when run under -j + RESOURCE_DENIED test skipped because resource denied + SKIPPED test skipped for some other reason + ENV_CHANGED test failed because it changed the execution environment + FAILED test failed + PASSED test passed + """ + + if use_resources is not None: + support.use_resources = use_resources + use_timeout = (timeout is not None) + if use_timeout: + faulthandler.dump_traceback_later(timeout, exit=True) + try: + support.match_tests = match_tests + if failfast: + support.failfast = True + if output_on_failure: + support.verbose = True + + # Reuse the same instance to all calls to runtest(). Some + # tests keep a reference to sys.stdout or sys.stderr + # (eg. test_argparse). + if runtest.stringio is None: + stream = io.StringIO() + runtest.stringio = stream + else: + stream = runtest.stringio + stream.seek(0) + stream.truncate() + + orig_stdout = sys.stdout + orig_stderr = sys.stderr + try: + sys.stdout = stream + sys.stderr = stream + result = runtest_inner(test, verbose, quiet, huntrleaks, + display_failure=False) + if result[0] == FAILED: + output = stream.getvalue() + orig_stderr.write(output) + orig_stderr.flush() + finally: + sys.stdout = orig_stdout + sys.stderr = orig_stderr + else: + support.verbose = verbose # Tell tests to be moderately quiet + result = runtest_inner(test, verbose, quiet, huntrleaks, + display_failure=not verbose) + return result + finally: + if use_timeout: + faulthandler.cancel_dump_traceback_later() + cleanup_test_droppings(test, verbose) +runtest.stringio = None + + +def runtest_inner(test, verbose, quiet, + huntrleaks=False, display_failure=True): + support.unload(test) + + test_time = 0.0 + refleak = False # True if the test leaked references. + try: + if test.startswith('test.'): + abstest = test + else: + # Always import it from the test package + abstest = 'test.' + test + with saved_test_environment(test, verbose, quiet) as environment: + start_time = time.time() + the_module = importlib.import_module(abstest) + # If the test has a test_main, that will run the appropriate + # tests. If not, use normal unittest test loading. + test_runner = getattr(the_module, "test_main", None) + if test_runner is None: + def test_runner(): + loader = unittest.TestLoader() + tests = loader.loadTestsFromModule(the_module) + for error in loader.errors: + print(error, file=sys.stderr) + if loader.errors: + raise Exception("errors while loading tests") + support.run_unittest(tests) + test_runner() + if huntrleaks: + refleak = dash_R(the_module, test, test_runner, huntrleaks) + test_time = time.time() - start_time + except support.ResourceDenied as msg: + if not quiet: + print(test, "skipped --", msg) + sys.stdout.flush() + return RESOURCE_DENIED, test_time + except unittest.SkipTest as msg: + if not quiet: + print(test, "skipped --", msg) + sys.stdout.flush() + return SKIPPED, test_time + except KeyboardInterrupt: + raise + except support.TestFailed as msg: + if display_failure: + print("test", test, "failed --", msg, file=sys.stderr) + else: + print("test", test, "failed", file=sys.stderr) + sys.stderr.flush() + return FAILED, test_time + except: + msg = traceback.format_exc() + print("test", test, "crashed --", msg, file=sys.stderr) + sys.stderr.flush() + return FAILED, test_time + else: + if refleak: + return FAILED, test_time + if environment.changed: + return ENV_CHANGED, test_time + return PASSED, test_time + + +def cleanup_test_droppings(testname, verbose): + import shutil + import stat + import gc + + # First kill any dangling references to open files etc. + # This can also issue some ResourceWarnings which would otherwise get + # triggered during the following test run, and possibly produce failures. + gc.collect() + + # Try to clean up junk commonly left behind. While tests shouldn't leave + # any files or directories behind, when a test fails that can be tedious + # for it to arrange. The consequences can be especially nasty on Windows, + # since if a test leaves a file open, it cannot be deleted by name (while + # there's nothing we can do about that here either, we can display the + # name of the offending test, which is a real help). + for name in (support.TESTFN, + "db_home", + ): + if not os.path.exists(name): + continue + + if os.path.isdir(name): + kind, nuker = "directory", shutil.rmtree + elif os.path.isfile(name): + kind, nuker = "file", os.unlink + else: + raise SystemError("os.path says %r exists but is neither " + "directory nor file" % name) + + if verbose: + print("%r left behind %s %r" % (testname, kind, name)) + try: + # if we have chmod, fix possible permissions problems + # that might prevent cleanup + if (hasattr(os, 'chmod')): + os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + nuker(name) + except Exception as msg: + print(("%r left behind %s %r and it couldn't be " + "removed: %s" % (testname, kind, name, msg)), file=sys.stderr) + + +def findtestdir(path=None): + return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py new file mode 100644 index 0000000..4f6a1aa --- /dev/null +++ b/Lib/test/libregrtest/save_env.py @@ -0,0 +1,284 @@ +import builtins +import locale +import logging +import os +import shutil +import sys +import sysconfig +import warnings +from test import support +try: + import threading +except ImportError: + threading = None +try: + import _multiprocessing, multiprocessing.process +except ImportError: + multiprocessing = None + + +# Unit tests are supposed to leave the execution environment unchanged +# once they complete. But sometimes tests have bugs, especially when +# tests fail, and the changes to environment go on to mess up other +# tests. This can cause issues with buildbot stability, since tests +# are run in random order and so problems may appear to come and go. +# There are a few things we can save and restore to mitigate this, and +# the following context manager handles this task. + +class saved_test_environment: + """Save bits of the test environment and restore them at block exit. + + with saved_test_environment(testname, verbose, quiet): + #stuff + + Unless quiet is True, a warning is printed to stderr if any of + the saved items was changed by the test. The attribute 'changed' + is initially False, but is set to True if a change is detected. + + If verbose is more than 1, the before and after state of changed + items is also printed. + """ + + changed = False + + def __init__(self, testname, verbose=0, quiet=False): + self.testname = testname + self.verbose = verbose + self.quiet = quiet + + # To add things to save and restore, add a name XXX to the resources list + # and add corresponding get_XXX/restore_XXX functions. get_XXX should + # return the value to be saved and compared against a second call to the + # get function when test execution completes. restore_XXX should accept + # the saved value and restore the resource using it. It will be called if + # and only if a change in the value is detected. + # + # Note: XXX will have any '.' replaced with '_' characters when determining + # the corresponding method names. + + resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', + 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', + 'warnings.filters', 'asyncore.socket_map', + 'logging._handlers', 'logging._handlerList', 'sys.gettrace', + 'sys.warnoptions', + # multiprocessing.process._cleanup() may release ref + # to a thread, so check processes first. + 'multiprocessing.process._dangling', 'threading._dangling', + 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES', + 'files', 'locale', 'warnings.showwarning', + ) + + def get_sys_argv(self): + return id(sys.argv), sys.argv, sys.argv[:] + def restore_sys_argv(self, saved_argv): + sys.argv = saved_argv[1] + sys.argv[:] = saved_argv[2] + + def get_cwd(self): + return os.getcwd() + def restore_cwd(self, saved_cwd): + os.chdir(saved_cwd) + + def get_sys_stdout(self): + return sys.stdout + def restore_sys_stdout(self, saved_stdout): + sys.stdout = saved_stdout + + def get_sys_stderr(self): + return sys.stderr + def restore_sys_stderr(self, saved_stderr): + sys.stderr = saved_stderr + + def get_sys_stdin(self): + return sys.stdin + def restore_sys_stdin(self, saved_stdin): + sys.stdin = saved_stdin + + def get_os_environ(self): + return id(os.environ), os.environ, dict(os.environ) + def restore_os_environ(self, saved_environ): + os.environ = saved_environ[1] + os.environ.clear() + os.environ.update(saved_environ[2]) + + def get_sys_path(self): + return id(sys.path), sys.path, sys.path[:] + def restore_sys_path(self, saved_path): + sys.path = saved_path[1] + sys.path[:] = saved_path[2] + + def get_sys_path_hooks(self): + return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:] + def restore_sys_path_hooks(self, saved_hooks): + sys.path_hooks = saved_hooks[1] + sys.path_hooks[:] = saved_hooks[2] + + def get_sys_gettrace(self): + return sys.gettrace() + def restore_sys_gettrace(self, trace_fxn): + sys.settrace(trace_fxn) + + def get___import__(self): + return builtins.__import__ + def restore___import__(self, import_): + builtins.__import__ = import_ + + def get_warnings_filters(self): + return id(warnings.filters), warnings.filters, warnings.filters[:] + def restore_warnings_filters(self, saved_filters): + warnings.filters = saved_filters[1] + warnings.filters[:] = saved_filters[2] + + def get_asyncore_socket_map(self): + asyncore = sys.modules.get('asyncore') + # XXX Making a copy keeps objects alive until __exit__ gets called. + return asyncore and asyncore.socket_map.copy() or {} + def restore_asyncore_socket_map(self, saved_map): + asyncore = sys.modules.get('asyncore') + if asyncore is not None: + asyncore.close_all(ignore_all=True) + asyncore.socket_map.update(saved_map) + + def get_shutil_archive_formats(self): + # we could call get_archives_formats() but that only returns the + # registry keys; we want to check the values too (the functions that + # are registered) + return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy() + def restore_shutil_archive_formats(self, saved): + shutil._ARCHIVE_FORMATS = saved[0] + shutil._ARCHIVE_FORMATS.clear() + shutil._ARCHIVE_FORMATS.update(saved[1]) + + def get_shutil_unpack_formats(self): + return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy() + def restore_shutil_unpack_formats(self, saved): + shutil._UNPACK_FORMATS = saved[0] + shutil._UNPACK_FORMATS.clear() + shutil._UNPACK_FORMATS.update(saved[1]) + + def get_logging__handlers(self): + # _handlers is a WeakValueDictionary + return id(logging._handlers), logging._handlers, logging._handlers.copy() + def restore_logging__handlers(self, saved_handlers): + # Can't easily revert the logging state + pass + + def get_logging__handlerList(self): + # _handlerList is a list of weakrefs to handlers + return id(logging._handlerList), logging._handlerList, logging._handlerList[:] + def restore_logging__handlerList(self, saved_handlerList): + # Can't easily revert the logging state + pass + + def get_sys_warnoptions(self): + return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:] + def restore_sys_warnoptions(self, saved_options): + sys.warnoptions = saved_options[1] + sys.warnoptions[:] = saved_options[2] + + # Controlling dangling references to Thread objects can make it easier + # to track reference leaks. + def get_threading__dangling(self): + if not threading: + return None + # This copies the weakrefs without making any strong reference + return threading._dangling.copy() + def restore_threading__dangling(self, saved): + if not threading: + return + threading._dangling.clear() + threading._dangling.update(saved) + + # Same for Process objects + def get_multiprocessing_process__dangling(self): + if not multiprocessing: + return None + # Unjoined process objects can survive after process exits + multiprocessing.process._cleanup() + # This copies the weakrefs without making any strong reference + return multiprocessing.process._dangling.copy() + def restore_multiprocessing_process__dangling(self, saved): + if not multiprocessing: + return + multiprocessing.process._dangling.clear() + multiprocessing.process._dangling.update(saved) + + def get_sysconfig__CONFIG_VARS(self): + # make sure the dict is initialized + sysconfig.get_config_var('prefix') + return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS, + dict(sysconfig._CONFIG_VARS)) + def restore_sysconfig__CONFIG_VARS(self, saved): + sysconfig._CONFIG_VARS = saved[1] + sysconfig._CONFIG_VARS.clear() + sysconfig._CONFIG_VARS.update(saved[2]) + + def get_sysconfig__INSTALL_SCHEMES(self): + return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES, + sysconfig._INSTALL_SCHEMES.copy()) + def restore_sysconfig__INSTALL_SCHEMES(self, saved): + sysconfig._INSTALL_SCHEMES = saved[1] + sysconfig._INSTALL_SCHEMES.clear() + sysconfig._INSTALL_SCHEMES.update(saved[2]) + + def get_files(self): + return sorted(fn + ('/' if os.path.isdir(fn) else '') + for fn in os.listdir()) + def restore_files(self, saved_value): + fn = support.TESTFN + if fn not in saved_value and (fn + '/') not in saved_value: + if os.path.isfile(fn): + support.unlink(fn) + elif os.path.isdir(fn): + support.rmtree(fn) + + _lc = [getattr(locale, lc) for lc in dir(locale) + if lc.startswith('LC_')] + def get_locale(self): + pairings = [] + for lc in self._lc: + try: + pairings.append((lc, locale.setlocale(lc, None))) + except (TypeError, ValueError): + continue + return pairings + def restore_locale(self, saved): + for lc, setting in saved: + locale.setlocale(lc, setting) + + def get_warnings_showwarning(self): + return warnings.showwarning + def restore_warnings_showwarning(self, fxn): + warnings.showwarning = fxn + + def resource_info(self): + for name in self.resources: + method_suffix = name.replace('.', '_') + get_name = 'get_' + method_suffix + restore_name = 'restore_' + method_suffix + yield name, getattr(self, get_name), getattr(self, restore_name) + + def __enter__(self): + self.saved_values = dict((name, get()) for name, get, restore + in self.resource_info()) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + saved_values = self.saved_values + del self.saved_values + for name, get, restore in self.resource_info(): + current = get() + original = saved_values.pop(name) + # Check for changes to the resource's value + if current != original: + self.changed = True + restore(original) + if not self.quiet: + print("Warning -- {} was modified by {}".format( + name, self.testname), + file=sys.stderr) + if self.verbose > 1: + print(" Before: {}\n After: {} ".format( + original, current), + file=sys.stderr) + return False diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index b49e66b..fcc3937 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -6,1563 +6,12 @@ Script to run Python regression tests. Run this script with -h or --help for documentation. """ -USAGE = """\ -python -m test [options] [test_name1 [test_name2 ...]] -python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]] -""" - -DESCRIPTION = """\ -Run Python regression tests. - -If no arguments or options are provided, finds all files matching -the pattern "test_*" in the Lib/test subdirectory and runs -them in alphabetical order (but see -M and -u, below, for exceptions). - -For more rigorous testing, it is useful to use the following -command line: - -python -E -Wd -m test [options] [test_name1 ...] -""" - -EPILOG = """\ -Additional option details: - --r randomizes test execution order. You can use --randseed=int to provide a -int seed value for the randomizer; this is useful for reproducing troublesome -test orders. - --s On the first invocation of regrtest using -s, the first test file found -or the first test file given on the command line is run, and the name of -the next test is recorded in a file named pynexttest. If run from the -Python build directory, pynexttest is located in the 'build' subdirectory, -otherwise it is located in tempfile.gettempdir(). On subsequent runs, -the test in pynexttest is run, and the next test is written to pynexttest. -When the last test has been run, pynexttest is deleted. In this way it -is possible to single step through the test files. This is useful when -doing memory analysis on the Python interpreter, which process tends to -consume too many resources to run the full regression test non-stop. - --S is used to continue running tests after an aborted run. It will -maintain the order a standard run (ie, this assumes -r is not used). -This is useful after the tests have prematurely stopped for some external -reason and you want to start running from where you left off rather -than starting from the beginning. - --f reads the names of tests from the file given as f's argument, one -or more test names per line. Whitespace is ignored. Blank lines and -lines beginning with '#' are ignored. This is especially useful for -whittling down failures involving interactions among tests. - --L causes the leaks(1) command to be run just before exit if it exists. -leaks(1) is available on Mac OS X and presumably on some other -FreeBSD-derived systems. - --R runs each test several times and examines sys.gettotalrefcount() to -see if the test appears to be leaking references. The argument should -be of the form stab:run:fname where 'stab' is the number of times the -test is run to let gettotalrefcount settle down, 'run' is the number -of times further it is run and 'fname' is the name of the file the -reports are written to. These parameters all have defaults (5, 4 and -"reflog.txt" respectively), and the minimal invocation is '-R :'. - --M runs tests that require an exorbitant amount of memory. These tests -typically try to ascertain containers keep working when containing more than -2 billion objects, which only works on 64-bit systems. There are also some -tests that try to exhaust the address space of the process, which only makes -sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit, -which is a string in the form of '2.5Gb', determines howmuch memory the -tests will limit themselves to (but they may go slightly over.) The number -shouldn't be more memory than the machine has (including swap memory). You -should also keep in mind that swap memory is generally much, much slower -than RAM, and setting memlimit to all available RAM or higher will heavily -tax the machine. On the other hand, it is no use running these tests with a -limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect -to use more than memlimit memory will be skipped. The big-memory tests -generally run very, very long. - --u is used to specify which special resource intensive tests to run, -such as those requiring large file support or network connectivity. -The argument is a comma-separated list of words indicating the -resources to test. Currently only the following are defined: - - all - Enable all special resources. - - none - Disable all special resources (this is the default). - - audio - Tests that use the audio device. (There are known - cases of broken audio drivers that can crash Python or - even the Linux kernel.) - - curses - Tests that use curses and will modify the terminal's - state and output modes. - - largefile - It is okay to run some test that may create huge - files. These tests can take a long time and may - consume >2GB of disk space temporarily. - - network - It is okay to run tests that use external network - resource, e.g. testing SSL support for sockets. - - decimal - Test the decimal module against a large suite that - verifies compliance with standards. - - cpu - Used for certain CPU-heavy tests. - - subprocess Run all tests for the subprocess module. - - urlfetch - It is okay to download files required on testing. - - gui - Run tests that require a running GUI. - -To enable all resources except one, use '-uall,-<resource>'. For -example, to run all the tests except for the gui tests, give the -option '-uall,-gui'. -""" - # We import importlib *ASAP* in order to test #15386 import importlib -import argparse -import builtins -import faulthandler -import io -import json -import locale -import logging import os -import platform -import random -import re -import shutil -import signal import sys -import sysconfig -import tempfile -import time -import traceback -import unittest -import warnings -from inspect import isabstract - -try: - import threading -except ImportError: - threading = None -try: - import _multiprocessing, multiprocessing.process -except ImportError: - multiprocessing = None - - -# Some times __path__ and __file__ are not absolute (e.g. while running from -# Lib/) and, if we change the CWD to run the tests in a temporary dir, some -# imports might fail. This affects only the modules imported before os.chdir(). -# These modules are searched first in sys.path[0] (so '' -- the CWD) and if -# they are found in the CWD their __file__ and __path__ will be relative (this -# happens before the chdir). All the modules imported after the chdir, are -# not found in the CWD, and since the other paths in sys.path[1:] are absolute -# (site.py absolutize them), the __file__ and __path__ will be absolute too. -# Therefore it is necessary to absolutize manually the __file__ and __path__ of -# the packages to prevent later imports to fail when the CWD is different. -for module in sys.modules.values(): - if hasattr(module, '__path__'): - module.__path__ = [os.path.abspath(path) for path in module.__path__] - if hasattr(module, '__file__'): - module.__file__ = os.path.abspath(module.__file__) - - -# MacOSX (a.k.a. Darwin) has a default stack size that is too small -# for deeply recursive regular expressions. We see this as crashes in -# the Python test suite when running test_re.py and test_sre.py. The -# fix is to set the stack limit to 2048. -# This approach may also be useful for other Unixy platforms that -# suffer from small default stack limits. -if sys.platform == 'darwin': - try: - import resource - except ImportError: - pass - else: - soft, hard = resource.getrlimit(resource.RLIMIT_STACK) - newsoft = min(hard, max(soft, 1024*2048)) - resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard)) - -# Test result constants. -PASSED = 1 -FAILED = 0 -ENV_CHANGED = -1 -SKIPPED = -2 -RESOURCE_DENIED = -3 -INTERRUPTED = -4 -CHILD_ERROR = -5 # error in a child process - -from test import support - -RESOURCE_NAMES = ('audio', 'curses', 'largefile', 'network', - 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui') - -# When tests are run from the Python build directory, it is best practice -# to keep the test files in a subfolder. This eases the cleanup of leftover -# files using the "make distclean" command. -if sysconfig.is_python_build(): - TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build') -else: - TEMPDIR = tempfile.gettempdir() -TEMPDIR = os.path.abspath(TEMPDIR) - -class _ArgParser(argparse.ArgumentParser): - - def error(self, message): - super().error(message + "\nPass -h or --help for complete help.") - -def _create_parser(): - # Set prog to prevent the uninformative "__main__.py" from displaying in - # error messages when using "python -m test ...". - parser = _ArgParser(prog='regrtest.py', - usage=USAGE, - description=DESCRIPTION, - epilog=EPILOG, - add_help=False, - formatter_class=argparse.RawDescriptionHelpFormatter) - - # Arguments with this clause added to its help are described further in - # the epilog's "Additional option details" section. - more_details = ' See the section at bottom for more details.' - - group = parser.add_argument_group('General options') - # We add help explicitly to control what argument group it renders under. - group.add_argument('-h', '--help', action='help', - help='show this help message and exit') - group.add_argument('--timeout', metavar='TIMEOUT', type=float, - help='dump the traceback and exit if a test takes ' - 'more than TIMEOUT seconds; disabled if TIMEOUT ' - 'is negative or equals to zero') - group.add_argument('--wait', action='store_true', - help='wait for user input, e.g., allow a debugger ' - 'to be attached') - group.add_argument('--slaveargs', metavar='ARGS') - group.add_argument('-S', '--start', metavar='START', - help='the name of the test at which to start.' + - more_details) - - group = parser.add_argument_group('Verbosity') - group.add_argument('-v', '--verbose', action='count', - help='run tests in verbose mode with output to stdout') - group.add_argument('-w', '--verbose2', action='store_true', - help='re-run failed tests in verbose mode') - group.add_argument('-W', '--verbose3', action='store_true', - help='display test output on failure') - group.add_argument('-q', '--quiet', action='store_true', - help='no output unless one or more tests fail') - group.add_argument('-o', '--slow', action='store_true', dest='print_slow', - help='print the slowest 10 tests') - group.add_argument('--header', action='store_true', - help='print header with interpreter info') - - group = parser.add_argument_group('Selecting tests') - group.add_argument('-r', '--randomize', action='store_true', - help='randomize test execution order.' + more_details) - group.add_argument('--randseed', metavar='SEED', - dest='random_seed', type=int, - help='pass a random seed to reproduce a previous ' - 'random run') - group.add_argument('-f', '--fromfile', metavar='FILE', - help='read names of tests to run from a file.' + - more_details) - group.add_argument('-x', '--exclude', action='store_true', - help='arguments are tests to *exclude*') - group.add_argument('-s', '--single', action='store_true', - help='single step through a set of tests.' + - more_details) - group.add_argument('-m', '--match', metavar='PAT', - dest='match_tests', - help='match test cases and methods with glob pattern PAT') - group.add_argument('-G', '--failfast', action='store_true', - help='fail as soon as a test fails (only with -v or -W)') - group.add_argument('-u', '--use', metavar='RES1,RES2,...', - action='append', type=resources_list, - help='specify which special resource intensive tests ' - 'to run.' + more_details) - group.add_argument('-M', '--memlimit', metavar='LIMIT', - help='run very large memory-consuming tests.' + - more_details) - group.add_argument('--testdir', metavar='DIR', - type=relative_filename, - help='execute test files in the specified directory ' - '(instead of the Python stdlib test suite)') - - group = parser.add_argument_group('Special runs') - group.add_argument('-l', '--findleaks', action='store_true', - help='if GC is available detect tests that leak memory') - group.add_argument('-L', '--runleaks', action='store_true', - help='run the leaks(1) command just before exit.' + - more_details) - group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS', - type=huntrleaks, - help='search for reference leaks (needs debug build, ' - 'very slow).' + more_details) - group.add_argument('-j', '--multiprocess', metavar='PROCESSES', - dest='use_mp', type=int, - help='run PROCESSES processes at once') - group.add_argument('-T', '--coverage', action='store_true', - dest='trace', - help='turn on code coverage tracing using the trace ' - 'module') - group.add_argument('-D', '--coverdir', metavar='DIR', - type=relative_filename, - help='directory where coverage files are put') - group.add_argument('-N', '--nocoverdir', - action='store_const', const=None, dest='coverdir', - help='put coverage files alongside modules') - group.add_argument('-t', '--threshold', metavar='THRESHOLD', - type=int, - help='call gc.set_threshold(THRESHOLD)') - group.add_argument('-n', '--nowindows', action='store_true', - help='suppress error message boxes on Windows') - group.add_argument('-F', '--forever', action='store_true', - help='run the specified tests in a loop, until an ' - 'error happens') - - parser.add_argument('args', nargs=argparse.REMAINDER, - help=argparse.SUPPRESS) - - return parser - -def relative_filename(string): - # CWD is replaced with a temporary dir before calling main(), so we - # join it with the saved CWD so it ends up where the user expects. - return os.path.join(support.SAVEDCWD, string) - -def huntrleaks(string): - args = string.split(':') - if len(args) not in (2, 3): - raise argparse.ArgumentTypeError( - 'needs 2 or 3 colon-separated arguments') - nwarmup = int(args[0]) if args[0] else 5 - ntracked = int(args[1]) if args[1] else 4 - fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt' - return nwarmup, ntracked, fname - -def resources_list(string): - u = [x.lower() for x in string.split(',')] - for r in u: - if r == 'all' or r == 'none': - continue - if r[0] == '-': - r = r[1:] - if r not in RESOURCE_NAMES: - raise argparse.ArgumentTypeError('invalid resource: ' + r) - return u - -def _parse_args(args, **kwargs): - # Defaults - ns = argparse.Namespace(testdir=None, verbose=0, quiet=False, - exclude=False, single=False, randomize=False, fromfile=None, - findleaks=False, use_resources=None, trace=False, coverdir='coverage', - runleaks=False, huntrleaks=False, verbose2=False, print_slow=False, - random_seed=None, use_mp=None, verbose3=False, forever=False, - header=False, failfast=False, match_tests=None) - for k, v in kwargs.items(): - if not hasattr(ns, k): - raise TypeError('%r is an invalid keyword argument ' - 'for this function' % k) - setattr(ns, k, v) - if ns.use_resources is None: - ns.use_resources = [] - - parser = _create_parser() - parser.parse_args(args=args, namespace=ns) - - if ns.single and ns.fromfile: - parser.error("-s and -f don't go together!") - if ns.use_mp and ns.trace: - parser.error("-T and -j don't go together!") - if ns.use_mp and ns.findleaks: - parser.error("-l and -j don't go together!") - if ns.use_mp and ns.memlimit: - parser.error("-M and -j don't go together!") - if ns.failfast and not (ns.verbose or ns.verbose3): - parser.error("-G/--failfast needs either -v or -W") - - if ns.quiet: - ns.verbose = 0 - if ns.timeout is not None: - if hasattr(faulthandler, 'dump_traceback_later'): - if ns.timeout <= 0: - ns.timeout = None - else: - print("Warning: The timeout option requires " - "faulthandler.dump_traceback_later") - ns.timeout = None - if ns.use_mp is not None: - if ns.use_mp <= 0: - # Use all cores + extras for tests that like to sleep - ns.use_mp = 2 + (os.cpu_count() or 1) - if ns.use_mp == 1: - ns.use_mp = None - if ns.use: - for a in ns.use: - for r in a: - if r == 'all': - ns.use_resources[:] = RESOURCE_NAMES - continue - if r == 'none': - del ns.use_resources[:] - continue - remove = False - if r[0] == '-': - remove = True - r = r[1:] - if remove: - if r in ns.use_resources: - ns.use_resources.remove(r) - elif r not in ns.use_resources: - ns.use_resources.append(r) - if ns.random_seed is not None: - ns.randomize = True - - return ns - - -def run_test_in_subprocess(testname, ns): - """Run the given test in a subprocess with --slaveargs. - - ns is the option Namespace parsed from command-line arguments. regrtest - is invoked in a subprocess with the --slaveargs argument; when the - subprocess exits, its return code, stdout and stderr are returned as a - 3-tuple. - """ - from subprocess import Popen, PIPE - base_cmd = ([sys.executable] + support.args_from_interpreter_flags() + - ['-X', 'faulthandler', '-m', 'test.regrtest']) - - slaveargs = ( - (testname, ns.verbose, ns.quiet), - dict(huntrleaks=ns.huntrleaks, - use_resources=ns.use_resources, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests)) - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)], - stdout=PIPE, stderr=PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD) - stdout, stderr = popen.communicate() - retcode = popen.wait() - return retcode, stdout, stderr - - -def main(tests=None, **kwargs): - """Execute a test suite. - - This also parses command-line options and modifies its behavior - accordingly. - - tests -- a list of strings containing test names (optional) - testdir -- the directory in which to look for tests (optional) - - Users other than the Python test suite will certainly want to - specify testdir; if it's omitted, the directory containing the - Python test suite is searched for. - - If the tests argument is omitted, the tests listed on the - command-line will be used. If that's empty, too, then all *.py - files beginning with test_ will be used. - - The other default arguments (verbose, quiet, exclude, - single, randomize, findleaks, use_resources, trace, coverdir, - print_slow, and random_seed) allow programmers calling main() - directly to set the values that would normally be set by flags - on the command line. - """ - # Display the Python traceback on fatal errors (e.g. segfault) - faulthandler.enable(all_threads=True) - - # Display the Python traceback on SIGALRM or SIGUSR1 signal - signals = [] - if hasattr(signal, 'SIGALRM'): - signals.append(signal.SIGALRM) - if hasattr(signal, 'SIGUSR1'): - signals.append(signal.SIGUSR1) - for signum in signals: - faulthandler.register(signum, chain=True) - - replace_stdout() - - support.record_original_stdout(sys.stdout) - - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.huntrleaks: - # Avoid false positives due to various caches - # filling slowly with random data: - warm_caches() - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - if ns.threshold is not None: - import gc - gc.set_threshold(ns.threshold) - if ns.nowindows: - import msvcrt - msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| - msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| - msvcrt.SEM_NOGPFAULTERRORBOX| - msvcrt.SEM_NOOPENFILEERRORBOX) - try: - msvcrt.CrtSetReportMode - except AttributeError: - # release build - pass - else: - for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: - msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) - msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) - if ns.wait: - input("Press any key to continue...") - - if ns.slaveargs is not None: - args, kwargs = json.loads(ns.slaveargs) - if kwargs.get('huntrleaks'): - unittest.BaseTestSuite._cleanup = False - try: - result = runtest(*args, **kwargs) - except KeyboardInterrupt: - result = INTERRUPTED, '' - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - sys.stdout.flush() - print() # Force a newline (just in case) - print(json.dumps(result)) - sys.exit(0) - - good = [] - bad = [] - skipped = [] - resource_denieds = [] - environment_changed = [] - interrupted = False - - if ns.findleaks: - try: - import gc - except ImportError: - print('No GC available, disabling findleaks.') - ns.findleaks = False - else: - # Uncomment the line below to report garbage that is not - # freeable by reference counting alone. By default only - # garbage that is not collectable by the GC is reported. - #gc.set_debug(gc.DEBUG_SAVEALL) - found_garbage = [] - - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False - - if ns.single: - filename = os.path.join(TEMPDIR, 'pynexttest') - try: - with open(filename, 'r') as fp: - next_test = fp.read().strip() - tests = [next_test] - except OSError: - pass - - if ns.fromfile: - tests = [] - with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp: - count_pat = re.compile(r'\[\s*\d+/\s*\d+\]') - for line in fp: - line = count_pat.sub('', line) - guts = line.split() # assuming no test has whitespace in its name - if guts and not guts[0].startswith('#'): - tests.extend(guts) - - # Strip .py extensions. - removepy(ns.args) - removepy(tests) - - stdtests = STDTESTS[:] - nottests = NOTTESTS.copy() - if ns.exclude: - for arg in ns.args: - if arg in stdtests: - stdtests.remove(arg) - nottests.add(arg) - ns.args = [] - - # For a partial run, we do not need to clutter the output. - if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("== ", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== ", "hash algorithm:", sys.hash_info.algorithm, - "64bit" if sys.maxsize > 2**32 else "32bit") - print("== ", os.getcwd()) - print("Testing with flags:", sys.flags) - - # if testdir is set, then we are not running the python tests suite, so - # don't add default tests to be executed or skipped (pass empty values) - if ns.testdir: - alltests = findtests(ns.testdir, list(), set()) - else: - alltests = findtests(ns.testdir, stdtests, nottests) - - selected = tests or ns.args or alltests - if ns.single: - selected = selected[:1] - try: - next_single_test = alltests[alltests.index(selected[0])+1] - except IndexError: - next_single_test = None - # Remove all the selected tests that precede start if it's set. - if ns.start: - try: - del selected[:selected.index(ns.start)] - except ValueError: - print("Couldn't find starting test (%s), using all tests" % ns.start) - if ns.randomize: - if ns.random_seed is None: - ns.random_seed = random.randrange(10000000) - random.seed(ns.random_seed) - print("Using random seed", ns.random_seed) - random.shuffle(selected) - if ns.trace: - import trace, tempfile - tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix, - tempfile.gettempdir()], - trace=False, count=True) - - test_times = [] - support.verbose = ns.verbose # Tell tests to be moderately quiet - support.use_resources = ns.use_resources - save_modules = sys.modules.keys() - - def accumulate_result(test, result): - ok, test_time = result - test_times.append((test_time, test)) - if ok == PASSED: - good.append(test) - elif ok == FAILED: - bad.append(test) - elif ok == ENV_CHANGED: - environment_changed.append(test) - elif ok == SKIPPED: - skipped.append(test) - elif ok == RESOURCE_DENIED: - skipped.append(test) - resource_denieds.append(test) - - if ns.forever: - def test_forever(tests=list(selected)): - while True: - for test in tests: - yield test - if bad: - return - tests = test_forever() - test_count = '' - test_count_width = 3 - else: - tests = iter(selected) - test_count = '/{}'.format(len(selected)) - test_count_width = len(test_count) - 1 - - if ns.use_mp: - try: - from threading import Thread - except ImportError: - print("Multiprocess option requires thread support") - sys.exit(2) - from queue import Queue - debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") - output = Queue() - pending = MultiprocessTests(tests) - def work(): - # A worker thread. - try: - while True: - try: - test = next(pending) - except StopIteration: - output.put((None, None, None, None)) - return - retcode, stdout, stderr = run_test_in_subprocess(test, ns) - # Strip last refcount output line if it exists, since it - # comes from the shutdown of the interpreter in the subcommand. - stderr = debug_output_pat.sub("", stderr) - stdout, _, result = stdout.strip().rpartition("\n") - if retcode != 0: - result = (CHILD_ERROR, "Exit code %s" % retcode) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - return - if not result: - output.put((None, None, None, None)) - return - result = json.loads(result) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - except BaseException: - output.put((None, None, None, None)) - raise - workers = [Thread(target=work) for i in range(ns.use_mp)] - for worker in workers: - worker.start() - finished = 0 - test_index = 1 - try: - while finished < ns.use_mp: - test, stdout, stderr, result = output.get() - if test is None: - finished += 1 - continue - accumulate_result(test, result) - if not ns.quiet: - fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, - len(bad), test)) - if stdout: - print(stdout) - if stderr: - print(stderr, file=sys.stderr) - sys.stdout.flush() - sys.stderr.flush() - if result[0] == INTERRUPTED: - raise KeyboardInterrupt - if result[0] == CHILD_ERROR: - raise Exception("Child error on {}: {}".format(test, result[1])) - test_index += 1 - except KeyboardInterrupt: - interrupted = True - pending.interrupted = True - for worker in workers: - worker.join() - else: - for test_index, test in enumerate(tests, 1): - if not ns.quiet: - fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, len(bad), test)) - sys.stdout.flush() - if ns.trace: - # If we're tracing code coverage, then we don't exit with status - # if on a false return value from main. - tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)', - globals=globals(), locals=vars()) - else: - try: - result = runtest(test, ns.verbose, ns.quiet, - ns.huntrleaks, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests) - accumulate_result(test, result) - except KeyboardInterrupt: - interrupted = True - break - if ns.findleaks: - gc.collect() - if gc.garbage: - print("Warning: test created", len(gc.garbage), end=' ') - print("uncollectable object(s).") - # move the uncollectable objects somewhere so we don't see - # them again - found_garbage.extend(gc.garbage) - del gc.garbage[:] - # Unload the newly imported modules (best effort finalization) - for module in sys.modules.keys(): - if module not in save_modules and module.startswith("test."): - support.unload(module) - - if interrupted: - # print a newline after ^C - print() - print("Test suite interrupted by signal SIGINT.") - omitted = set(selected) - set(good) - set(bad) - set(skipped) - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - if good and not ns.quiet: - if not bad and not skipped and not interrupted and len(good) > 1: - print("All", end=' ') - print(count(len(good), "test"), "OK.") - if ns.print_slow: - test_times.sort(reverse=True) - print("10 slowest tests:") - for time, test in test_times[:10]: - print("%s: %.1fs" % (test, time)) - if bad: - print(count(len(bad), "test"), "failed:") - printlist(bad) - if environment_changed: - print("{} altered the execution environment:".format( - count(len(environment_changed), "test"))) - printlist(environment_changed) - if skipped and not ns.quiet: - print(count(len(skipped), "test"), "skipped:") - printlist(skipped) - - if ns.verbose2 and bad: - print("Re-running failed tests in verbose mode") - for test in bad[:]: - print("Re-running test %r in verbose mode" % test) - sys.stdout.flush() - try: - ns.verbose = True - ok = runtest(test, True, ns.quiet, ns.huntrleaks, - timeout=ns.timeout) - except KeyboardInterrupt: - # print a newline separate from the ^C - print() - break - else: - if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: - bad.remove(test) - else: - if bad: - print(count(len(bad), 'test'), "failed again:") - printlist(bad) - - if ns.single: - if next_single_test: - with open(filename, 'w') as fp: - fp.write(next_single_test + '\n') - else: - os.unlink(filename) - - if ns.trace: - r = tracer.results() - r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir) - - if ns.runleaks: - os.system("leaks %d" % os.getpid()) - - sys.exit(len(bad) > 0 or interrupted) - - -# small set of tests to determine if we have a basically functioning interpreter -# (i.e. if any of these fail, then anything else is likely to follow) -STDTESTS = [ - 'test_grammar', - 'test_opcodes', - 'test_dict', - 'test_builtin', - 'test_exceptions', - 'test_types', - 'test_unittest', - 'test_doctest', - 'test_doctest2', - 'test_support' -] - -# set of tests that we don't want to be executed when using regrtest -NOTTESTS = set() - -def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - names = os.listdir(testdir) - tests = [] - others = set(stdtests) | nottests - for name in names: - mod, ext = os.path.splitext(name) - if mod[:5] == "test_" and ext in (".py", "") and mod not in others: - tests.append(mod) - return stdtests + sorted(tests) - -# We do not use a generator so multiple threads can call next(). -class MultiprocessTests(object): - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests): - self.interrupted = False - self.lock = threading.Lock() - self.tests = tests - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.interrupted: - raise StopIteration('tests interrupted') - return next(self.tests) - -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - import atexit - - stdout = sys.stdout - sys.stdout = open(stdout.fileno(), 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) - -def runtest(test, verbose, quiet, - huntrleaks=False, use_resources=None, - output_on_failure=False, failfast=False, match_tests=None, - timeout=None): - """Run a single test. - - test -- the name of the test - verbose -- if true, print more messages - quiet -- if true, don't print 'skipped' messages (probably redundant) - huntrleaks -- run multiple times to test for leaks; requires a debug - build; a triple corresponding to -R's three arguments - use_resources -- list of extra resources to use - output_on_failure -- if true, display test output on failure - timeout -- dump the traceback and exit if a test takes more than - timeout seconds - failfast, match_tests -- See regrtest command-line flags for these. - - Returns the tuple result, test_time, where result is one of the constants: - INTERRUPTED KeyboardInterrupt when run under -j - RESOURCE_DENIED test skipped because resource denied - SKIPPED test skipped for some other reason - ENV_CHANGED test failed because it changed the execution environment - FAILED test failed - PASSED test passed - """ - - if use_resources is not None: - support.use_resources = use_resources - use_timeout = (timeout is not None) - if use_timeout: - faulthandler.dump_traceback_later(timeout, exit=True) - try: - support.match_tests = match_tests - if failfast: - support.failfast = True - if output_on_failure: - support.verbose = True - - # Reuse the same instance to all calls to runtest(). Some - # tests keep a reference to sys.stdout or sys.stderr - # (eg. test_argparse). - if runtest.stringio is None: - stream = io.StringIO() - runtest.stringio = stream - else: - stream = runtest.stringio - stream.seek(0) - stream.truncate() - - orig_stdout = sys.stdout - orig_stderr = sys.stderr - try: - sys.stdout = stream - sys.stderr = stream - result = runtest_inner(test, verbose, quiet, huntrleaks, - display_failure=False) - if result[0] == FAILED: - output = stream.getvalue() - orig_stderr.write(output) - orig_stderr.flush() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - else: - support.verbose = verbose # Tell tests to be moderately quiet - result = runtest_inner(test, verbose, quiet, huntrleaks, - display_failure=not verbose) - return result - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - cleanup_test_droppings(test, verbose) -runtest.stringio = None - -# Unit tests are supposed to leave the execution environment unchanged -# once they complete. But sometimes tests have bugs, especially when -# tests fail, and the changes to environment go on to mess up other -# tests. This can cause issues with buildbot stability, since tests -# are run in random order and so problems may appear to come and go. -# There are a few things we can save and restore to mitigate this, and -# the following context manager handles this task. - -class saved_test_environment: - """Save bits of the test environment and restore them at block exit. - - with saved_test_environment(testname, verbose, quiet): - #stuff - - Unless quiet is True, a warning is printed to stderr if any of - the saved items was changed by the test. The attribute 'changed' - is initially False, but is set to True if a change is detected. - - If verbose is more than 1, the before and after state of changed - items is also printed. - """ - - changed = False - - def __init__(self, testname, verbose=0, quiet=False): - self.testname = testname - self.verbose = verbose - self.quiet = quiet - - # To add things to save and restore, add a name XXX to the resources list - # and add corresponding get_XXX/restore_XXX functions. get_XXX should - # return the value to be saved and compared against a second call to the - # get function when test execution completes. restore_XXX should accept - # the saved value and restore the resource using it. It will be called if - # and only if a change in the value is detected. - # - # Note: XXX will have any '.' replaced with '_' characters when determining - # the corresponding method names. - - resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', - 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', - 'warnings.filters', 'asyncore.socket_map', - 'logging._handlers', 'logging._handlerList', 'sys.gettrace', - 'sys.warnoptions', - # multiprocessing.process._cleanup() may release ref - # to a thread, so check processes first. - 'multiprocessing.process._dangling', 'threading._dangling', - 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES', - 'files', 'locale', 'warnings.showwarning', - ) - - def get_sys_argv(self): - return id(sys.argv), sys.argv, sys.argv[:] - def restore_sys_argv(self, saved_argv): - sys.argv = saved_argv[1] - sys.argv[:] = saved_argv[2] - - def get_cwd(self): - return os.getcwd() - def restore_cwd(self, saved_cwd): - os.chdir(saved_cwd) - - def get_sys_stdout(self): - return sys.stdout - def restore_sys_stdout(self, saved_stdout): - sys.stdout = saved_stdout - - def get_sys_stderr(self): - return sys.stderr - def restore_sys_stderr(self, saved_stderr): - sys.stderr = saved_stderr - - def get_sys_stdin(self): - return sys.stdin - def restore_sys_stdin(self, saved_stdin): - sys.stdin = saved_stdin - - def get_os_environ(self): - return id(os.environ), os.environ, dict(os.environ) - def restore_os_environ(self, saved_environ): - os.environ = saved_environ[1] - os.environ.clear() - os.environ.update(saved_environ[2]) - - def get_sys_path(self): - return id(sys.path), sys.path, sys.path[:] - def restore_sys_path(self, saved_path): - sys.path = saved_path[1] - sys.path[:] = saved_path[2] - - def get_sys_path_hooks(self): - return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:] - def restore_sys_path_hooks(self, saved_hooks): - sys.path_hooks = saved_hooks[1] - sys.path_hooks[:] = saved_hooks[2] - - def get_sys_gettrace(self): - return sys.gettrace() - def restore_sys_gettrace(self, trace_fxn): - sys.settrace(trace_fxn) - - def get___import__(self): - return builtins.__import__ - def restore___import__(self, import_): - builtins.__import__ = import_ - - def get_warnings_filters(self): - return id(warnings.filters), warnings.filters, warnings.filters[:] - def restore_warnings_filters(self, saved_filters): - warnings.filters = saved_filters[1] - warnings.filters[:] = saved_filters[2] - - def get_asyncore_socket_map(self): - asyncore = sys.modules.get('asyncore') - # XXX Making a copy keeps objects alive until __exit__ gets called. - return asyncore and asyncore.socket_map.copy() or {} - def restore_asyncore_socket_map(self, saved_map): - asyncore = sys.modules.get('asyncore') - if asyncore is not None: - asyncore.close_all(ignore_all=True) - asyncore.socket_map.update(saved_map) - - def get_shutil_archive_formats(self): - # we could call get_archives_formats() but that only returns the - # registry keys; we want to check the values too (the functions that - # are registered) - return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy() - def restore_shutil_archive_formats(self, saved): - shutil._ARCHIVE_FORMATS = saved[0] - shutil._ARCHIVE_FORMATS.clear() - shutil._ARCHIVE_FORMATS.update(saved[1]) - - def get_shutil_unpack_formats(self): - return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy() - def restore_shutil_unpack_formats(self, saved): - shutil._UNPACK_FORMATS = saved[0] - shutil._UNPACK_FORMATS.clear() - shutil._UNPACK_FORMATS.update(saved[1]) - - def get_logging__handlers(self): - # _handlers is a WeakValueDictionary - return id(logging._handlers), logging._handlers, logging._handlers.copy() - def restore_logging__handlers(self, saved_handlers): - # Can't easily revert the logging state - pass - - def get_logging__handlerList(self): - # _handlerList is a list of weakrefs to handlers - return id(logging._handlerList), logging._handlerList, logging._handlerList[:] - def restore_logging__handlerList(self, saved_handlerList): - # Can't easily revert the logging state - pass - - def get_sys_warnoptions(self): - return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:] - def restore_sys_warnoptions(self, saved_options): - sys.warnoptions = saved_options[1] - sys.warnoptions[:] = saved_options[2] - - # Controlling dangling references to Thread objects can make it easier - # to track reference leaks. - def get_threading__dangling(self): - if not threading: - return None - # This copies the weakrefs without making any strong reference - return threading._dangling.copy() - def restore_threading__dangling(self, saved): - if not threading: - return - threading._dangling.clear() - threading._dangling.update(saved) - - # Same for Process objects - def get_multiprocessing_process__dangling(self): - if not multiprocessing: - return None - # Unjoined process objects can survive after process exits - multiprocessing.process._cleanup() - # This copies the weakrefs without making any strong reference - return multiprocessing.process._dangling.copy() - def restore_multiprocessing_process__dangling(self, saved): - if not multiprocessing: - return - multiprocessing.process._dangling.clear() - multiprocessing.process._dangling.update(saved) - - def get_sysconfig__CONFIG_VARS(self): - # make sure the dict is initialized - sysconfig.get_config_var('prefix') - return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS, - dict(sysconfig._CONFIG_VARS)) - def restore_sysconfig__CONFIG_VARS(self, saved): - sysconfig._CONFIG_VARS = saved[1] - sysconfig._CONFIG_VARS.clear() - sysconfig._CONFIG_VARS.update(saved[2]) - - def get_sysconfig__INSTALL_SCHEMES(self): - return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES, - sysconfig._INSTALL_SCHEMES.copy()) - def restore_sysconfig__INSTALL_SCHEMES(self, saved): - sysconfig._INSTALL_SCHEMES = saved[1] - sysconfig._INSTALL_SCHEMES.clear() - sysconfig._INSTALL_SCHEMES.update(saved[2]) - - def get_files(self): - return sorted(fn + ('/' if os.path.isdir(fn) else '') - for fn in os.listdir()) - def restore_files(self, saved_value): - fn = support.TESTFN - if fn not in saved_value and (fn + '/') not in saved_value: - if os.path.isfile(fn): - support.unlink(fn) - elif os.path.isdir(fn): - support.rmtree(fn) - - _lc = [getattr(locale, lc) for lc in dir(locale) - if lc.startswith('LC_')] - def get_locale(self): - pairings = [] - for lc in self._lc: - try: - pairings.append((lc, locale.setlocale(lc, None))) - except (TypeError, ValueError): - continue - return pairings - def restore_locale(self, saved): - for lc, setting in saved: - locale.setlocale(lc, setting) - - def get_warnings_showwarning(self): - return warnings.showwarning - def restore_warnings_showwarning(self, fxn): - warnings.showwarning = fxn - - def resource_info(self): - for name in self.resources: - method_suffix = name.replace('.', '_') - get_name = 'get_' + method_suffix - restore_name = 'restore_' + method_suffix - yield name, getattr(self, get_name), getattr(self, restore_name) - - def __enter__(self): - self.saved_values = dict((name, get()) for name, get, restore - in self.resource_info()) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - saved_values = self.saved_values - del self.saved_values - for name, get, restore in self.resource_info(): - current = get() - original = saved_values.pop(name) - # Check for changes to the resource's value - if current != original: - self.changed = True - restore(original) - if not self.quiet: - print("Warning -- {} was modified by {}".format( - name, self.testname), - file=sys.stderr) - if self.verbose > 1: - print(" Before: {}\n After: {} ".format( - original, current), - file=sys.stderr) - return False - - -def runtest_inner(test, verbose, quiet, - huntrleaks=False, display_failure=True): - support.unload(test) - - test_time = 0.0 - refleak = False # True if the test leaked references. - try: - if test.startswith('test.'): - abstest = test - else: - # Always import it from the test package - abstest = 'test.' + test - with saved_test_environment(test, verbose, quiet) as environment: - start_time = time.time() - the_module = importlib.import_module(abstest) - # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_runner = getattr(the_module, "test_main", None) - if test_runner is None: - def test_runner(): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - support.run_unittest(tests) - test_runner() - if huntrleaks: - refleak = dash_R(the_module, test, test_runner, huntrleaks) - test_time = time.time() - start_time - except support.ResourceDenied as msg: - if not quiet: - print(test, "skipped --", msg) - sys.stdout.flush() - return RESOURCE_DENIED, test_time - except unittest.SkipTest as msg: - if not quiet: - print(test, "skipped --", msg) - sys.stdout.flush() - return SKIPPED, test_time - except KeyboardInterrupt: - raise - except support.TestFailed as msg: - if display_failure: - print("test", test, "failed --", msg, file=sys.stderr) - else: - print("test", test, "failed", file=sys.stderr) - sys.stderr.flush() - return FAILED, test_time - except: - msg = traceback.format_exc() - print("test", test, "crashed --", msg, file=sys.stderr) - sys.stderr.flush() - return FAILED, test_time - else: - if refleak: - return FAILED, test_time - if environment.changed: - return ENV_CHANGED, test_time - return PASSED, test_time - -def cleanup_test_droppings(testname, verbose): - import shutil - import stat - import gc - - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce failures. - gc.collect() - - # Try to clean up junk commonly left behind. While tests shouldn't leave - # any files or directories behind, when a test fails that can be tedious - # for it to arrange. The consequences can be especially nasty on Windows, - # since if a test leaves a file open, it cannot be deleted by name (while - # there's nothing we can do about that here either, we can display the - # name of the offending test, which is a real help). - for name in (support.TESTFN, - "db_home", - ): - if not os.path.exists(name): - continue - - if os.path.isdir(name): - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise SystemError("os.path says %r exists but is neither " - "directory nor file" % name) - - if verbose: - print("%r left behind %s %r" % (testname, kind, name)) - try: - # if we have chmod, fix possible permissions problems - # that might prevent cleanup - if (hasattr(os, 'chmod')): - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as msg: - print(("%r left behind %s %r and it couldn't be " - "removed: %s" % (testname, kind, name, msg)), file=sys.stderr) - -def dash_R(the_module, test, indirect_test, huntrleaks): - """Run a test multiple times, looking for reference leaks. - - Returns: - False if the test didn't leak references; True if we detected refleaks. - """ - # This code is hackish and inelegant, but it seems to do the job. - import copyreg - import collections.abc - - if not hasattr(sys, 'gettotalrefcount'): - raise Exception("Tracking reference leaks requires a debug build " - "of Python") - - # Save current values for dash_R_cleanup() to restore. - fs = warnings.filters[:] - ps = copyreg.dispatch_table.copy() - pic = sys.path_importer_cache.copy() - try: - import zipimport - except ImportError: - zdc = None # Run unmodified on platforms without zipimport support - else: - zdc = zipimport._zip_directory_cache.copy() - abcs = {} - for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: - if not isabstract(abc): - continue - for obj in abc.__subclasses__() + [abc]: - abcs[obj] = obj._abc_registry.copy() - - nwarmup, ntracked, fname = huntrleaks - fname = os.path.join(support.SAVEDCWD, fname) - repcount = nwarmup + ntracked - rc_deltas = [0] * repcount - alloc_deltas = [0] * repcount - - print("beginning", repcount, "repetitions", file=sys.stderr) - print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr) - sys.stderr.flush() - for i in range(repcount): - indirect_test() - alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs) - sys.stderr.write('.') - sys.stderr.flush() - if i >= nwarmup: - rc_deltas[i] = rc_after - rc_before - alloc_deltas[i] = alloc_after - alloc_before - alloc_before, rc_before = alloc_after, rc_after - print(file=sys.stderr) - # These checkers return False on success, True on failure - def check_rc_deltas(deltas): - return any(deltas) - def check_alloc_deltas(deltas): - # At least 1/3rd of 0s - if 3 * deltas.count(0) < len(deltas): - return True - # Nothing else than 1s, 0s and -1s - if not set(deltas) <= {1,0,-1}: - return True - return False - failed = False - for deltas, item_name, checker in [ - (rc_deltas, 'references', check_rc_deltas), - (alloc_deltas, 'memory blocks', check_alloc_deltas)]: - if checker(deltas): - msg = '%s leaked %s %s, sum=%s' % ( - test, deltas[nwarmup:], item_name, sum(deltas)) - print(msg, file=sys.stderr) - sys.stderr.flush() - with open(fname, "a") as refrep: - print(msg, file=refrep) - refrep.flush() - failed = True - return failed - -def dash_R_cleanup(fs, ps, pic, zdc, abcs): - import gc, copyreg - import _strptime, linecache - import urllib.parse, urllib.request, mimetypes, doctest - import struct, filecmp, collections.abc - from distutils.dir_util import _path_created - from weakref import WeakSet - - # Clear the warnings registry, so they can be displayed again - for mod in sys.modules.values(): - if hasattr(mod, '__warningregistry__'): - del mod.__warningregistry__ - - # Restore some original values. - warnings.filters[:] = fs - copyreg.dispatch_table.clear() - copyreg.dispatch_table.update(ps) - sys.path_importer_cache.clear() - sys.path_importer_cache.update(pic) - try: - import zipimport - except ImportError: - pass # Run unmodified on platforms without zipimport support - else: - zipimport._zip_directory_cache.clear() - zipimport._zip_directory_cache.update(zdc) - - # clear type cache - sys._clear_type_cache() - - # Clear ABC registries, restoring previously saved ABC registries. - for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: - if not isabstract(abc): - continue - for obj in abc.__subclasses__() + [abc]: - obj._abc_registry = abcs.get(obj, WeakSet()).copy() - obj._abc_cache.clear() - obj._abc_negative_cache.clear() - - # Flush standard output, so that buffered data is sent to the OS and - # associated Python objects are reclaimed. - for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__): - if stream is not None: - stream.flush() - - # Clear assorted module caches. - _path_created.clear() - re.purge() - _strptime._regex_cache.clear() - urllib.parse.clear_cache() - urllib.request.urlcleanup() - linecache.clearcache() - mimetypes._default_mime_types() - filecmp._cache.clear() - struct._clearcache() - doctest.master = None - try: - import ctypes - except ImportError: - # Don't worry about resetting the cache if ctypes is not supported - pass - else: - ctypes._reset_cache() - - # Collect cyclic trash and read memory statistics immediately after. - func1 = sys.getallocatedblocks - func2 = sys.gettotalrefcount - gc.collect() - return func1(), func2() - -def warm_caches(): - # char cache - s = bytes(range(256)) - for i in range(256): - s[i:i+1] - # unicode cache - x = [chr(i) for i in range(256)] - # int cache - x = list(range(-5, 257)) - -def findtestdir(path=None): - return path or os.path.dirname(__file__) or os.curdir - -def removepy(names): - if not names: - return - for idx, name in enumerate(names): - basename, ext = os.path.splitext(name) - if ext == '.py': - names[idx] = basename - -def count(n, word): - if n == 1: - return "%d %s" % (n, word) - else: - return "%d %ss" % (n, word) - -def printlist(x, width=70, indent=4): - """Print the elements of iterable x to stdout. - - Optional arg width (default 70) is the maximum line length. - Optional arg indent (default 4) is the number of blanks with which to - begin each line. - """ - - from textwrap import fill - blanks = ' ' * indent - # Print the sorted list: 'x' may be a '--random' list or a set() - print(fill(' '.join(str(elt) for elt in sorted(x)), width, - initial_indent=blanks, subsequent_indent=blanks)) - - -def main_in_temp_cwd(): - """Run main() in a temporary working directory.""" - if sysconfig.is_python_build(): - try: - os.mkdir(TEMPDIR) - except FileExistsError: - pass - - # Define a writable temp dir that will be used as cwd while running - # the tests. The name of the dir includes the pid to allow parallel - # testing (see the -j option). - test_cwd = 'test_python_{}'.format(os.getpid()) - test_cwd = os.path.join(TEMPDIR, test_cwd) - - # Run the tests in a context manager that temporarily changes the CWD to a - # temporary and writable directory. If it's not possible to create or - # change the CWD, the original CWD will be used. The original CWD is - # available from support.SAVEDCWD. - with support.temp_cwd(test_cwd, quiet=True): - main() +from test.libregrtest import main, main_in_temp_cwd if __name__ == '__main__': diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index 27bfad5..893ec39 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -4512,6 +4512,21 @@ class TestStrings(TestCase): string = "Namespace(bar='spam', foo=42)" self.assertStringEqual(ns, string) + def test_namespace_starkwargs_notidentifier(self): + ns = argparse.Namespace(**{'"': 'quote'}) + string = """Namespace(**{'"': 'quote'})""" + self.assertStringEqual(ns, string) + + def test_namespace_kwargs_and_starkwargs_notidentifier(self): + ns = argparse.Namespace(a=1, **{'"': 'quote'}) + string = """Namespace(a=1, **{'"': 'quote'})""" + self.assertStringEqual(ns, string) + + def test_namespace_starkwargs_identifier(self): + ns = argparse.Namespace(**{'valid': True}) + string = "Namespace(valid=True)" + self.assertStringEqual(ns, string) + def test_parser(self): parser = argparse.ArgumentParser(prog='PROG') string = ( diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index a4a6f95..254c0c1 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -27,6 +27,7 @@ def coding_checker(self, coder): self.assertEqual(coder(input), (expect, len(input))) return check + class Queue(object): """ queue: write bytes at one end, read bytes from the other end @@ -47,6 +48,7 @@ class Queue(object): self._buffer = self._buffer[size:] return s + class MixInCheckStateHandling: def check_state_handling_decode(self, encoding, u, s): for i in range(len(s)+1): @@ -80,6 +82,7 @@ class MixInCheckStateHandling: part2 = d.encode(u[i:], True) self.assertEqual(s, part1+part2) + class ReadTest(MixInCheckStateHandling): def check_partial(self, input, partialresults): # get a StreamReader for the encoding and feed the bytestring version @@ -383,6 +386,7 @@ class ReadTest(MixInCheckStateHandling): self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"), before + backslashreplace + after) + class UTF32Test(ReadTest, unittest.TestCase): encoding = "utf-32" if sys.byteorder == 'little': @@ -478,6 +482,7 @@ class UTF32Test(ReadTest, unittest.TestCase): self.assertEqual('\U00010000' * 1024, codecs.utf_32_decode(encoded_be)[0]) + class UTF32LETest(ReadTest, unittest.TestCase): encoding = "utf-32-le" ill_formed_sequence = b"\x80\xdc\x00\x00" @@ -523,6 +528,7 @@ class UTF32LETest(ReadTest, unittest.TestCase): self.assertEqual('\U00010000' * 1024, codecs.utf_32_le_decode(encoded)[0]) + class UTF32BETest(ReadTest, unittest.TestCase): encoding = "utf-32-be" ill_formed_sequence = b"\x00\x00\xdc\x80" @@ -797,6 +803,7 @@ class UTF8Test(ReadTest, unittest.TestCase): with self.assertRaises(UnicodeDecodeError): b"abc\xed\xa0z".decode("utf-8", "surrogatepass") + @unittest.skipUnless(sys.platform == 'win32', 'cp65001 is a Windows-only codec') class CP65001Test(ReadTest, unittest.TestCase): @@ -1136,6 +1143,7 @@ class EscapeDecodeTest(unittest.TestCase): self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8)) self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8)) + class RecodingTest(unittest.TestCase): def test_recoding(self): f = io.BytesIO() @@ -1255,6 +1263,7 @@ for i in punycode_testcases: if len(i)!=2: print(repr(i)) + class PunycodeTest(unittest.TestCase): def test_encode(self): for uni, puny in punycode_testcases: @@ -1274,6 +1283,7 @@ class PunycodeTest(unittest.TestCase): puny = puny.decode("ascii").encode("ascii") self.assertEqual(uni, puny.decode("punycode")) + class UnicodeInternalTest(unittest.TestCase): @unittest.skipUnless(SIZEOF_WCHAR_T == 4, 'specific to 32-bit wchar_t') def test_bug1251300(self): @@ -1528,6 +1538,7 @@ class NameprepTest(unittest.TestCase): except Exception as e: raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e))) + class IDNACodecTest(unittest.TestCase): def test_builtin_decode(self): self.assertEqual(str(b"python.org", "idna"), "python.org") @@ -1614,6 +1625,7 @@ class IDNACodecTest(unittest.TestCase): self.assertRaises(Exception, b"python.org".decode, "idna", errors) + class CodecsModuleTest(unittest.TestCase): def test_decode(self): @@ -1722,6 +1734,7 @@ class CodecsModuleTest(unittest.TestCase): self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined', errors) + class StreamReaderTest(unittest.TestCase): def setUp(self): @@ -1732,6 +1745,7 @@ class StreamReaderTest(unittest.TestCase): f = self.reader(self.stream) self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00']) + class EncodedFileTest(unittest.TestCase): def test_basic(self): @@ -1862,6 +1876,7 @@ broken_unicode_with_stateful = [ "unicode_internal" ] + class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): def test_basics(self): s = "abc123" # all codecs should be able to encode these @@ -2024,6 +2039,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): self.check_state_handling_decode(encoding, u, u.encode(encoding)) self.check_state_handling_encode(encoding, u, u.encode(encoding)) + class CharmapTest(unittest.TestCase): def test_decode_with_string_map(self): self.assertEqual( @@ -2274,6 +2290,7 @@ class WithStmtTest(unittest.TestCase): info.streamwriter, 'strict') as srw: self.assertEqual(srw.read(), "\xfc") + class TypesTest(unittest.TestCase): def test_decode_unicode(self): # Most decoders don't accept unicode input @@ -2564,6 +2581,7 @@ else: bytes_transform_encodings.append("bz2_codec") transform_aliases["bz2_codec"] = ["bz2"] + class TransformCodecTest(unittest.TestCase): def test_basics(self): @@ -3041,5 +3059,79 @@ class CodePageTest(unittest.TestCase): self.assertEqual(decoded, ('abc', 3)) +class ASCIITest(unittest.TestCase): + def test_encode(self): + self.assertEqual('abc123'.encode('ascii'), b'abc123') + + def test_encode_error(self): + for data, error_handler, expected in ( + ('[\x80\xff\u20ac]', 'ignore', b'[]'), + ('[\x80\xff\u20ac]', 'replace', b'[???]'), + ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[€ÿ€]'), + ('[\x80\xff\u20ac]', 'backslashreplace', b'[\\x80\\xff\\u20ac]'), + ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'), + ): + with self.subTest(data=data, error_handler=error_handler, + expected=expected): + self.assertEqual(data.encode('ascii', error_handler), + expected) + + def test_encode_surrogateescape_error(self): + with self.assertRaises(UnicodeEncodeError): + # the first character can be decoded, but not the second + '\udc80\xff'.encode('ascii', 'surrogateescape') + + def test_decode(self): + self.assertEqual(b'abc'.decode('ascii'), 'abc') + + def test_decode_error(self): + for data, error_handler, expected in ( + (b'[\x80\xff]', 'ignore', '[]'), + (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'), + (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'), + (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'), + ): + with self.subTest(data=data, error_handler=error_handler, + expected=expected): + self.assertEqual(data.decode('ascii', error_handler), + expected) + + +class Latin1Test(unittest.TestCase): + def test_encode(self): + for data, expected in ( + ('abc', b'abc'), + ('\x80\xe9\xff', b'\x80\xe9\xff'), + ): + with self.subTest(data=data, expected=expected): + self.assertEqual(data.encode('latin1'), expected) + + def test_encode_errors(self): + for data, error_handler, expected in ( + ('[\u20ac\udc80]', 'ignore', b'[]'), + ('[\u20ac\udc80]', 'replace', b'[??]'), + ('[\u20ac\udc80]', 'backslashreplace', b'[\\u20ac\\udc80]'), + ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[€�]'), + ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'), + ): + with self.subTest(data=data, error_handler=error_handler, + expected=expected): + self.assertEqual(data.encode('latin1', error_handler), + expected) + + def test_encode_surrogateescape_error(self): + with self.assertRaises(UnicodeEncodeError): + # the first character can be decoded, but not the second + '\udc80\u20ac'.encode('latin1', 'surrogateescape') + + def test_decode(self): + for data, expected in ( + (b'abc', 'abc'), + (b'[\x80\xff]', '[\x80\xff]'), + ): + with self.subTest(data=data, expected=expected): + self.assertEqual(data.decode('latin1'), expected) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index 4124f91..cd238bc 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -2002,6 +2002,14 @@ class OrderedDictTests: od = OrderedDict(**d) self.assertGreater(sys.getsizeof(od), sys.getsizeof(d)) + def test_views(self): + OrderedDict = self.module.OrderedDict + # See http://bugs.python.org/issue24286 + s = 'the quick brown fox jumped over a lazy dog yesterday before dawn'.split() + od = OrderedDict.fromkeys(s) + self.assertEqual(od.keys(), dict(od).keys()) + self.assertEqual(od.items(), dict(od).items()) + def test_override_update(self): OrderedDict = self.module.OrderedDict # Verify that subclasses can override update() without breaking __init__() diff --git a/Lib/test/test_deque.py b/Lib/test/test_deque.py index 8718716..c61e80b 100644 --- a/Lib/test/test_deque.py +++ b/Lib/test/test_deque.py @@ -654,6 +654,15 @@ class TestBasic(unittest.TestCase): self.assertNotEqual(id(d), id(e)) self.assertEqual(list(d), list(e)) + for i in range(5): + for maxlen in range(-1, 6): + s = [random.random() for j in range(i)] + d = deque(s) if maxlen == -1 else deque(s, maxlen) + e = d.copy() + self.assertEqual(d, e) + self.assertEqual(d.maxlen, e.maxlen) + self.assertTrue(all(x is y for x, y in zip(d, e))) + def test_copy_method(self): mut = [10] d = deque([mut]) diff --git a/Lib/test/test_dictviews.py b/Lib/test/test_dictviews.py index 8d33801..fcb6814 100644 --- a/Lib/test/test_dictviews.py +++ b/Lib/test/test_dictviews.py @@ -1,3 +1,4 @@ +import collections import unittest class DictSetTest(unittest.TestCase): @@ -197,6 +198,27 @@ class DictSetTest(unittest.TestCase): d[42] = d.values() self.assertRaises(RecursionError, repr, d) + def test_abc_registry(self): + d = dict(a=1) + + self.assertIsInstance(d.keys(), collections.KeysView) + self.assertIsInstance(d.keys(), collections.MappingView) + self.assertIsInstance(d.keys(), collections.Set) + self.assertIsInstance(d.keys(), collections.Sized) + self.assertIsInstance(d.keys(), collections.Iterable) + self.assertIsInstance(d.keys(), collections.Container) + + self.assertIsInstance(d.values(), collections.ValuesView) + self.assertIsInstance(d.values(), collections.MappingView) + self.assertIsInstance(d.values(), collections.Sized) + + self.assertIsInstance(d.items(), collections.ItemsView) + self.assertIsInstance(d.items(), collections.MappingView) + self.assertIsInstance(d.items(), collections.Set) + self.assertIsInstance(d.items(), collections.Sized) + self.assertIsInstance(d.items(), collections.Iterable) + self.assertIsInstance(d.items(), collections.Container) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_eintr.py b/Lib/test/test_eintr.py index aabad83..d3cdda0 100644 --- a/Lib/test/test_eintr.py +++ b/Lib/test/test_eintr.py @@ -16,14 +16,7 @@ class EINTRTests(unittest.TestCase): # Run the tester in a sub-process, to make sure there is only one # thread (for reliable signal delivery). tester = support.findfile("eintr_tester.py", subdir="eintrdata") - - if support.verbose: - args = [sys.executable, tester] - with subprocess.Popen(args) as proc: - exitcode = proc.wait() - self.assertEqual(exitcode, 0) - else: - script_helper.assert_python_ok(tester) + script_helper.assert_python_ok(tester) if __name__ == "__main__": diff --git a/Lib/test/test_enum.py b/Lib/test/test_enum.py index 4b5d0d0..0f7b769 100644 --- a/Lib/test/test_enum.py +++ b/Lib/test/test_enum.py @@ -270,6 +270,13 @@ class TestEnum(unittest.TestCase): class Wrong(Enum): _any_name_ = 9 + def test_bool(self): + class Logic(Enum): + true = True + false = False + self.assertTrue(Logic.true) + self.assertFalse(Logic.false) + def test_contains(self): Season = self.Season self.assertIn(Season.AUTUMN, Season) diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py index 4e392b7..67c3d86 100644 --- a/Lib/test/test_file.py +++ b/Lib/test/test_file.py @@ -139,7 +139,7 @@ class OtherFileTests: def testModeStrings(self): # check invalid mode strings - for mode in ("", "aU", "wU+"): + for mode in ("", "aU", "wU+", "U+", "+U", "rU+"): try: f = self.open(TESTFN, mode) except ValueError: diff --git a/Lib/test/test_fstring.py b/Lib/test/test_fstring.py new file mode 100644 index 0000000..d6f781c --- /dev/null +++ b/Lib/test/test_fstring.py @@ -0,0 +1,734 @@ +import ast +import types +import decimal +import unittest + +a_global = 'global variable' + +# You could argue that I'm too strict in looking for specific error +# values with assertRaisesRegex, but without it it's way too easy to +# make a syntax error in the test strings. Especially with all of the +# triple quotes, raw strings, backslashes, etc. I think it's a +# worthwhile tradeoff. When I switched to this method, I found many +# examples where I wasn't testing what I thought I was. + +class TestCase(unittest.TestCase): + def assertAllRaise(self, exception_type, regex, error_strings): + for str in error_strings: + with self.subTest(str=str): + with self.assertRaisesRegex(exception_type, regex): + eval(str) + + def test__format__lookup(self): + # Make sure __format__ is looked up on the type, not the instance. + class X: + def __format__(self, spec): + return 'class' + + x = X() + + # Add a bound __format__ method to the 'y' instance, but not + # the 'x' instance. + y = X() + y.__format__ = types.MethodType(lambda self, spec: 'instance', y) + + self.assertEqual(f'{y}', format(y)) + self.assertEqual(f'{y}', 'class') + self.assertEqual(format(x), format(y)) + + # __format__ is not called this way, but still make sure it + # returns what we expect (so we can make sure we're bypassing + # it). + self.assertEqual(x.__format__(''), 'class') + self.assertEqual(y.__format__(''), 'instance') + + # This is how __format__ is actually called. + self.assertEqual(type(x).__format__(x, ''), 'class') + self.assertEqual(type(y).__format__(y, ''), 'class') + + def test_ast(self): + # Inspired by http://bugs.python.org/issue24975 + class X: + def __init__(self): + self.called = False + def __call__(self): + self.called = True + return 4 + x = X() + expr = """ +a = 10 +f'{a * x()}'""" + t = ast.parse(expr) + c = compile(t, '', 'exec') + + # Make sure x was not called. + self.assertFalse(x.called) + + # Actually run the code. + exec(c) + + # Make sure x was called. + self.assertTrue(x.called) + + def test_literal_eval(self): + # With no expressions, an f-string is okay. + self.assertEqual(ast.literal_eval("f'x'"), 'x') + self.assertEqual(ast.literal_eval("f'x' 'y'"), 'xy') + + # But this should raise an error. + with self.assertRaisesRegex(ValueError, 'malformed node or string'): + ast.literal_eval("f'x{3}'") + + # As should this, which uses a different ast node + with self.assertRaisesRegex(ValueError, 'malformed node or string'): + ast.literal_eval("f'{3}'") + + def test_ast_compile_time_concat(self): + x = [''] + + expr = """x[0] = 'foo' f'{3}'""" + t = ast.parse(expr) + c = compile(t, '', 'exec') + exec(c) + self.assertEqual(x[0], 'foo3') + + def test_literal(self): + self.assertEqual(f'', '') + self.assertEqual(f'a', 'a') + self.assertEqual(f' ', ' ') + self.assertEqual(f'\N{GREEK CAPITAL LETTER DELTA}', + '\N{GREEK CAPITAL LETTER DELTA}') + self.assertEqual(f'\N{GREEK CAPITAL LETTER DELTA}', + '\u0394') + self.assertEqual(f'\N{True}', '\u22a8') + self.assertEqual(rf'\N{True}', r'\NTrue') + + def test_escape_order(self): + # note that hex(ord('{')) == 0x7b, so this + # string becomes f'a{4*10}b' + self.assertEqual(f'a\u007b4*10}b', 'a40b') + self.assertEqual(f'a\x7b4*10}b', 'a40b') + self.assertEqual(f'a\x7b4*10\N{RIGHT CURLY BRACKET}b', 'a40b') + self.assertEqual(f'{"a"!\N{LATIN SMALL LETTER R}}', "'a'") + self.assertEqual(f'{10\x3a02X}', '0A') + self.assertEqual(f'{10:02\N{LATIN CAPITAL LETTER X}}', '0A') + + self.assertAllRaise(SyntaxError, "f-string: single '}' is not allowed", + [r"""f'a{\u007b4*10}b'""", # mis-matched brackets + ]) + self.assertAllRaise(SyntaxError, 'unexpected character after line continuation character', + [r"""f'{"a"\!r}'""", + r"""f'{a\!r}'""", + ]) + + def test_unterminated_string(self): + self.assertAllRaise(SyntaxError, 'f-string: unterminated string', + [r"""f'{"x'""", + r"""f'{"x}'""", + r"""f'{("x'""", + r"""f'{("x}'""", + ]) + + def test_mismatched_parens(self): + self.assertAllRaise(SyntaxError, 'f-string: mismatched', + ["f'{((}'", + ]) + + def test_double_braces(self): + self.assertEqual(f'{{', '{') + self.assertEqual(f'a{{', 'a{') + self.assertEqual(f'{{b', '{b') + self.assertEqual(f'a{{b', 'a{b') + self.assertEqual(f'}}', '}') + self.assertEqual(f'a}}', 'a}') + self.assertEqual(f'}}b', '}b') + self.assertEqual(f'a}}b', 'a}b') + + self.assertEqual(f'{{{10}', '{10') + self.assertEqual(f'}}{10}', '}10') + self.assertEqual(f'}}{{{10}', '}{10') + self.assertEqual(f'}}a{{{10}', '}a{10') + + self.assertEqual(f'{10}{{', '10{') + self.assertEqual(f'{10}}}', '10}') + self.assertEqual(f'{10}}}{{', '10}{') + self.assertEqual(f'{10}}}a{{' '}', '10}a{}') + + # Inside of strings, don't interpret doubled brackets. + self.assertEqual(f'{"{{}}"}', '{{}}') + + self.assertAllRaise(TypeError, 'unhashable type', + ["f'{ {{}} }'", # dict in a set + ]) + + def test_compile_time_concat(self): + x = 'def' + self.assertEqual('abc' f'## {x}ghi', 'abc## defghi') + self.assertEqual('abc' f'{x}' 'ghi', 'abcdefghi') + self.assertEqual('abc' f'{x}' 'gh' f'i{x:4}', 'abcdefghidef ') + self.assertEqual('{x}' f'{x}', '{x}def') + self.assertEqual('{x' f'{x}', '{xdef') + self.assertEqual('{x}' f'{x}', '{x}def') + self.assertEqual('{{x}}' f'{x}', '{{x}}def') + self.assertEqual('{{x' f'{x}', '{{xdef') + self.assertEqual('x}}' f'{x}', 'x}}def') + self.assertEqual(f'{x}' 'x}}', 'defx}}') + self.assertEqual(f'{x}' '', 'def') + self.assertEqual('' f'{x}' '', 'def') + self.assertEqual('' f'{x}', 'def') + self.assertEqual(f'{x}' '2', 'def2') + self.assertEqual('1' f'{x}' '2', '1def2') + self.assertEqual('1' f'{x}', '1def') + self.assertEqual(f'{x}' f'-{x}', 'def-def') + self.assertEqual('' f'', '') + self.assertEqual('' f'' '', '') + self.assertEqual('' f'' '' f'', '') + self.assertEqual(f'', '') + self.assertEqual(f'' '', '') + self.assertEqual(f'' '' f'', '') + self.assertEqual(f'' '' f'' '', '') + + self.assertAllRaise(SyntaxError, "f-string: expecting '}'", + ["f'{3' f'}'", # can't concat to get a valid f-string + ]) + + def test_comments(self): + # These aren't comments, since they're in strings. + d = {'#': 'hash'} + self.assertEqual(f'{"#"}', '#') + self.assertEqual(f'{d["#"]}', 'hash') + + self.assertAllRaise(SyntaxError, "f-string cannot include '#'", + ["f'{1#}'", # error because the expression becomes "(1#)" + "f'{3(#)}'", + ]) + + def test_many_expressions(self): + # Create a string with many expressions in it. Note that + # because we have a space in here as a literal, we're actually + # going to use twice as many ast nodes: one for each literal + # plus one for each expression. + def build_fstr(n, extra=''): + return "f'" + ('{x} ' * n) + extra + "'" + + x = 'X' + width = 1 + + # Test around 256. + for i in range(250, 260): + self.assertEqual(eval(build_fstr(i)), (x+' ')*i) + + # Test concatenating 2 largs fstrings. + self.assertEqual(eval(build_fstr(255)*256), (x+' ')*(255*256)) + + s = build_fstr(253, '{x:{width}} ') + self.assertEqual(eval(s), (x+' ')*254) + + # Test lots of expressions and constants, concatenated. + s = "f'{1}' 'x' 'y'" * 1024 + self.assertEqual(eval(s), '1xy' * 1024) + + def test_format_specifier_expressions(self): + width = 10 + precision = 4 + value = decimal.Decimal('12.34567') + self.assertEqual(f'result: {value:{width}.{precision}}', 'result: 12.35') + self.assertEqual(f'result: {value:{width!r}.{precision}}', 'result: 12.35') + self.assertEqual(f'result: {value:{width:0}.{precision:1}}', 'result: 12.35') + self.assertEqual(f'result: {value:{1}{0:0}.{precision:1}}', 'result: 12.35') + self.assertEqual(f'result: {value:{ 1}{ 0:0}.{ precision:1}}', 'result: 12.35') + self.assertEqual(f'{10:#{1}0x}', ' 0xa') + self.assertEqual(f'{10:{"#"}1{0}{"x"}}', ' 0xa') + self.assertEqual(f'{-10:-{"#"}1{0}x}', ' -0xa') + self.assertEqual(f'{-10:{"-"}#{1}0{"x"}}', ' -0xa') + self.assertEqual(f'{10:#{3 != {4:5} and width}x}', ' 0xa') + + self.assertAllRaise(SyntaxError, "f-string: expecting '}'", + ["""f'{"s"!r{":10"}}'""", + + # This looks like a nested format spec. + ]) + + self.assertAllRaise(SyntaxError, "invalid syntax", + [# Invalid sytax inside a nested spec. + "f'{4:{/5}}'", + ]) + + self.assertAllRaise(SyntaxError, "f-string: expressions nested too deeply", + [# Can't nest format specifiers. + "f'result: {value:{width:{0}}.{precision:1}}'", + ]) + + self.assertAllRaise(SyntaxError, 'f-string: invalid conversion character', + [# No expansion inside conversion or for + # the : or ! itself. + """f'{"s"!{"r"}}'""", + ]) + + def test_side_effect_order(self): + class X: + def __init__(self): + self.i = 0 + def __format__(self, spec): + self.i += 1 + return str(self.i) + + x = X() + self.assertEqual(f'{x} {x}', '1 2') + + def test_missing_expression(self): + self.assertAllRaise(SyntaxError, 'f-string: empty expression not allowed', + ["f'{}'", + "f'{ }'" + "f' {} '", + "f'{!r}'", + "f'{ !r}'", + "f'{10:{ }}'", + "f' { } '", + r"f'{\n}'", + r"f'{\n \n}'", + + # Catch the empty expression before the + # invalid conversion. + "f'{!x}'", + "f'{ !xr}'", + "f'{!x:}'", + "f'{!x:a}'", + "f'{ !xr:}'", + "f'{ !xr:a}'", + + "f'{!}'", + "f'{:}'", + + # We find the empty expression before the + # missing closing brace. + "f'{!'", + "f'{!s:'", + "f'{:'", + "f'{:x'", + ]) + + def test_parens_in_expressions(self): + self.assertEqual(f'{3,}', '(3,)') + + # Add these because when an expression is evaluated, parens + # are added around it. But we shouldn't go from an invalid + # expression to a valid one. The added parens are just + # supposed to allow whitespace (including newlines). + self.assertAllRaise(SyntaxError, 'invalid syntax', + ["f'{,}'", + "f'{,}'", # this is (,), which is an error + ]) + + self.assertAllRaise(SyntaxError, "f-string: expecting '}'", + ["f'{3)+(4}'", + ]) + + self.assertAllRaise(SyntaxError, 'EOL while scanning string literal', + ["f'{\n}'", + ]) + + def test_newlines_in_expressions(self): + self.assertEqual(f'{0}', '0') + self.assertEqual(f'{0\n}', '0') + self.assertEqual(f'{0\r}', '0') + self.assertEqual(f'{\n0\n}', '0') + self.assertEqual(f'{\r0\r}', '0') + self.assertEqual(f'{\n0\r}', '0') + self.assertEqual(f'{\n0}', '0') + self.assertEqual(f'{3+\n4}', '7') + self.assertEqual(f'{3+\\\n4}', '7') + self.assertEqual(rf'''{3+ +4}''', '7') + self.assertEqual(f'''{3+\ +4}''', '7') + + self.assertAllRaise(SyntaxError, 'f-string: empty expression not allowed', + [r"f'{\n}'", + ]) + + def test_lambda(self): + x = 5 + self.assertEqual(f'{(lambda y:x*y)("8")!r}', "'88888'") + self.assertEqual(f'{(lambda y:x*y)("8")!r:10}', "'88888' ") + self.assertEqual(f'{(lambda y:x*y)("8"):10}', "88888 ") + + # lambda doesn't work without parens, because the colon + # makes the parser think it's a format_spec + self.assertAllRaise(SyntaxError, 'unexpected EOF while parsing', + ["f'{lambda x:x}'", + ]) + + def test_yield(self): + # Not terribly useful, but make sure the yield turns + # a function into a generator + def fn(y): + f'y:{yield y*2}' + + g = fn(4) + self.assertEqual(next(g), 8) + + def test_yield_send(self): + def fn(x): + yield f'x:{yield (lambda i: x * i)}' + + g = fn(10) + the_lambda = next(g) + self.assertEqual(the_lambda(4), 40) + self.assertEqual(g.send('string'), 'x:string') + + def test_expressions_with_triple_quoted_strings(self): + self.assertEqual(f"{'''x'''}", 'x') + self.assertEqual(f"{'''eric's'''}", "eric's") + self.assertEqual(f'{"""eric\'s"""}', "eric's") + self.assertEqual(f"{'''eric\"s'''}", 'eric"s') + self.assertEqual(f'{"""eric"s"""}', 'eric"s') + + # Test concatenation within an expression + self.assertEqual(f'{"x" """eric"s""" "y"}', 'xeric"sy') + self.assertEqual(f'{"x" """eric"s"""}', 'xeric"s') + self.assertEqual(f'{"""eric"s""" "y"}', 'eric"sy') + self.assertEqual(f'{"""x""" """eric"s""" "y"}', 'xeric"sy') + self.assertEqual(f'{"""x""" """eric"s""" """y"""}', 'xeric"sy') + self.assertEqual(f'{r"""x""" """eric"s""" """y"""}', 'xeric"sy') + + def test_multiple_vars(self): + x = 98 + y = 'abc' + self.assertEqual(f'{x}{y}', '98abc') + + self.assertEqual(f'X{x}{y}', 'X98abc') + self.assertEqual(f'{x}X{y}', '98Xabc') + self.assertEqual(f'{x}{y}X', '98abcX') + + self.assertEqual(f'X{x}Y{y}', 'X98Yabc') + self.assertEqual(f'X{x}{y}Y', 'X98abcY') + self.assertEqual(f'{x}X{y}Y', '98XabcY') + + self.assertEqual(f'X{x}Y{y}Z', 'X98YabcZ') + + def test_closure(self): + def outer(x): + def inner(): + return f'x:{x}' + return inner + + self.assertEqual(outer('987')(), 'x:987') + self.assertEqual(outer(7)(), 'x:7') + + def test_arguments(self): + y = 2 + def f(x, width): + return f'x={x*y:{width}}' + + self.assertEqual(f('foo', 10), 'x=foofoo ') + x = 'bar' + self.assertEqual(f(10, 10), 'x= 20') + + def test_locals(self): + value = 123 + self.assertEqual(f'v:{value}', 'v:123') + + def test_missing_variable(self): + with self.assertRaises(NameError): + f'v:{value}' + + def test_missing_format_spec(self): + class O: + def __format__(self, spec): + if not spec: + return '*' + return spec + + self.assertEqual(f'{O():x}', 'x') + self.assertEqual(f'{O()}', '*') + self.assertEqual(f'{O():}', '*') + + self.assertEqual(f'{3:}', '3') + self.assertEqual(f'{3!s:}', '3') + + def test_global(self): + self.assertEqual(f'g:{a_global}', 'g:global variable') + self.assertEqual(f'g:{a_global!r}', "g:'global variable'") + + a_local = 'local variable' + self.assertEqual(f'g:{a_global} l:{a_local}', + 'g:global variable l:local variable') + self.assertEqual(f'g:{a_global!r}', + "g:'global variable'") + self.assertEqual(f'g:{a_global} l:{a_local!r}', + "g:global variable l:'local variable'") + + self.assertIn("module 'unittest' from", f'{unittest}') + + def test_shadowed_global(self): + a_global = 'really a local' + self.assertEqual(f'g:{a_global}', 'g:really a local') + self.assertEqual(f'g:{a_global!r}', "g:'really a local'") + + a_local = 'local variable' + self.assertEqual(f'g:{a_global} l:{a_local}', + 'g:really a local l:local variable') + self.assertEqual(f'g:{a_global!r}', + "g:'really a local'") + self.assertEqual(f'g:{a_global} l:{a_local!r}', + "g:really a local l:'local variable'") + + def test_call(self): + def foo(x): + return 'x=' + str(x) + + self.assertEqual(f'{foo(10)}', 'x=10') + + def test_nested_fstrings(self): + y = 5 + self.assertEqual(f'{f"{0}"*3}', '000') + self.assertEqual(f'{f"{y}"*3}', '555') + self.assertEqual(f'{f"{\'x\'}"*3}', 'xxx') + + self.assertEqual(f"{r'x' f'{\"s\"}'}", 'xs') + self.assertEqual(f"{r'x'rf'{\"s\"}'}", 'xs') + + def test_invalid_string_prefixes(self): + self.assertAllRaise(SyntaxError, 'unexpected EOF while parsing', + ["fu''", + "uf''", + "Fu''", + "fU''", + "Uf''", + "uF''", + "ufr''", + "urf''", + "fur''", + "fru''", + "rfu''", + "ruf''", + "FUR''", + "Fur''", + ]) + + def test_leading_trailing_spaces(self): + self.assertEqual(f'{ 3}', '3') + self.assertEqual(f'{ 3}', '3') + self.assertEqual(f'{\t3}', '3') + self.assertEqual(f'{\t\t3}', '3') + self.assertEqual(f'{3 }', '3') + self.assertEqual(f'{3 }', '3') + self.assertEqual(f'{3\t}', '3') + self.assertEqual(f'{3\t\t}', '3') + + self.assertEqual(f'expr={ {x: y for x, y in [(1, 2), ]}}', + 'expr={1: 2}') + self.assertEqual(f'expr={ {x: y for x, y in [(1, 2), ]} }', + 'expr={1: 2}') + + def test_character_name(self): + self.assertEqual(f'{4}\N{GREEK CAPITAL LETTER DELTA}{3}', + '4\N{GREEK CAPITAL LETTER DELTA}3') + self.assertEqual(f'{{}}\N{GREEK CAPITAL LETTER DELTA}{3}', + '{}\N{GREEK CAPITAL LETTER DELTA}3') + + def test_not_equal(self): + # There's a special test for this because there's a special + # case in the f-string parser to look for != as not ending an + # expression. Normally it would, while looking for !s or !r. + + self.assertEqual(f'{3!=4}', 'True') + self.assertEqual(f'{3!=4:}', 'True') + self.assertEqual(f'{3!=4!s}', 'True') + self.assertEqual(f'{3!=4!s:.3}', 'Tru') + + def test_conversions(self): + self.assertEqual(f'{3.14:10.10}', ' 3.14') + self.assertEqual(f'{3.14!s:10.10}', '3.14 ') + self.assertEqual(f'{3.14!r:10.10}', '3.14 ') + self.assertEqual(f'{3.14!a:10.10}', '3.14 ') + + self.assertEqual(f'{"a"}', 'a') + self.assertEqual(f'{"a"!r}', "'a'") + self.assertEqual(f'{"a"!a}', "'a'") + + # Not a conversion. + self.assertEqual(f'{"a!r"}', "a!r") + + # Not a conversion, but show that ! is allowed in a format spec. + self.assertEqual(f'{3.14:!<10.10}', '3.14!!!!!!') + + self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"}', '\u0394') + self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"!r}', "'\u0394'") + self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"!a}', "'\\u0394'") + + self.assertAllRaise(SyntaxError, 'f-string: invalid conversion character', + ["f'{3!g}'", + "f'{3!A}'", + "f'{3!A}'", + "f'{3!A}'", + "f'{3!!}'", + "f'{3!:}'", + "f'{3!\N{GREEK CAPITAL LETTER DELTA}}'", + "f'{3! s}'", # no space before conversion char + "f'{x!\\x00:.<10}'", + ]) + + self.assertAllRaise(SyntaxError, "f-string: expecting '}'", + ["f'{x!s{y}}'", + "f'{3!ss}'", + "f'{3!ss:}'", + "f'{3!ss:s}'", + ]) + + def test_assignment(self): + self.assertAllRaise(SyntaxError, 'invalid syntax', + ["f'' = 3", + "f'{0}' = x", + "f'{x}' = x", + ]) + + def test_del(self): + self.assertAllRaise(SyntaxError, 'invalid syntax', + ["del f''", + "del '' f''", + ]) + + def test_mismatched_braces(self): + self.assertAllRaise(SyntaxError, "f-string: single '}' is not allowed", + ["f'{{}'", + "f'{{}}}'", + "f'}'", + "f'x}'", + "f'x}x'", + + # Can't have { or } in a format spec. + "f'{3:}>10}'", + r"f'{3:\\}>10}'", + "f'{3:}}>10}'", + ]) + + self.assertAllRaise(SyntaxError, "f-string: expecting '}'", + ["f'{3:{{>10}'", + "f'{3'", + "f'{3!'", + "f'{3:'", + "f'{3!s'", + "f'{3!s:'", + "f'{3!s:3'", + "f'x{'", + "f'x{x'", + "f'{3:s'", + "f'{{{'", + "f'{{}}{'", + "f'{'", + ]) + + self.assertAllRaise(SyntaxError, 'invalid syntax', + [r"f'{3:\\{>10}'", + ]) + + # But these are just normal strings. + self.assertEqual(f'{"{"}', '{') + self.assertEqual(f'{"}"}', '}') + self.assertEqual(f'{3:{"}"}>10}', '}}}}}}}}}3') + self.assertEqual(f'{2:{"{"}>10}', '{{{{{{{{{2') + + def test_if_conditional(self): + # There's special logic in compile.c to test if the + # conditional for an if (and while) are constants. Exercise + # that code. + + def test_fstring(x, expected): + flag = 0 + if f'{x}': + flag = 1 + else: + flag = 2 + self.assertEqual(flag, expected) + + def test_concat_empty(x, expected): + flag = 0 + if '' f'{x}': + flag = 1 + else: + flag = 2 + self.assertEqual(flag, expected) + + def test_concat_non_empty(x, expected): + flag = 0 + if ' ' f'{x}': + flag = 1 + else: + flag = 2 + self.assertEqual(flag, expected) + + test_fstring('', 2) + test_fstring(' ', 1) + + test_concat_empty('', 2) + test_concat_empty(' ', 1) + + test_concat_non_empty('', 1) + test_concat_non_empty(' ', 1) + + def test_empty_format_specifier(self): + x = 'test' + self.assertEqual(f'{x}', 'test') + self.assertEqual(f'{x:}', 'test') + self.assertEqual(f'{x!s:}', 'test') + self.assertEqual(f'{x!r:}', "'test'") + + def test_str_format_differences(self): + d = {'a': 'string', + 0: 'integer', + } + a = 0 + self.assertEqual(f'{d[0]}', 'integer') + self.assertEqual(f'{d["a"]}', 'string') + self.assertEqual(f'{d[a]}', 'integer') + self.assertEqual('{d[a]}'.format(d=d), 'string') + self.assertEqual('{d[0]}'.format(d=d), 'integer') + + def test_invalid_expressions(self): + self.assertAllRaise(SyntaxError, 'invalid syntax', + [r"f'{a[4)}'", + r"f'{a(4]}'", + ]) + + def test_loop(self): + for i in range(1000): + self.assertEqual(f'i:{i}', 'i:' + str(i)) + + def test_dict(self): + d = {'"': 'dquote', + "'": 'squote', + 'foo': 'bar', + } + self.assertEqual(f'{d["\'"]}', 'squote') + self.assertEqual(f"{d['\"']}", 'dquote') + + self.assertEqual(f'''{d["'"]}''', 'squote') + self.assertEqual(f"""{d['"']}""", 'dquote') + + self.assertEqual(f'{d["foo"]}', 'bar') + self.assertEqual(f"{d['foo']}", 'bar') + self.assertEqual(f'{d[\'foo\']}', 'bar') + self.assertEqual(f"{d[\"foo\"]}", 'bar') + + def test_escaped_quotes(self): + d = {'"': 'a', + "'": 'b'} + + self.assertEqual(fr"{d['\"']}", 'a') + self.assertEqual(fr'{d["\'"]}', 'b') + self.assertEqual(fr"{'\"'}", '"') + self.assertEqual(fr'{"\'"}', "'") + self.assertEqual(f'{"\\"3"}', '"3') + + self.assertAllRaise(SyntaxError, 'f-string: unterminated string', + [r'''f'{"""\\}' ''', # Backslash at end of expression + ]) + self.assertAllRaise(SyntaxError, 'unexpected character after line continuation', + [r"rf'{3\}'", + ]) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index ec3d783..8f8d71c 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -295,6 +295,10 @@ class GrammarTests(unittest.TestCase): pos2key2dict(1,2,k2=100,tokwarg1=100,tokwarg2=200) pos2key2dict(1,2,tokwarg1=100,tokwarg2=200, k2=100) + self.assertRaises(SyntaxError, eval, "def f(*): pass") + self.assertRaises(SyntaxError, eval, "def f(*,): pass") + self.assertRaises(SyntaxError, eval, "def f(*, **kwds): pass") + # keyword arguments after *arglist def f(*args, **kwargs): return args, kwargs @@ -352,6 +356,23 @@ class GrammarTests(unittest.TestCase): check_syntax_error(self, "f(*g(1=2))") check_syntax_error(self, "f(**g(1=2))") + # Check trailing commas are permitted in funcdef argument list + def f(a,): pass + def f(*args,): pass + def f(**kwds,): pass + def f(a, *args,): pass + def f(a, **kwds,): pass + def f(*args, b,): pass + def f(*, b,): pass + def f(*args, **kwds,): pass + def f(a, *args, b,): pass + def f(a, *, b,): pass + def f(a, *args, **kwds,): pass + def f(*args, b, **kwds,): pass + def f(*, b, **kwds,): pass + def f(a, *args, b, **kwds,): pass + def f(a, *, b, **kwds,): pass + def test_lambdef(self): ### lambdef: 'lambda' [varargslist] ':' test l1 = lambda : 0 @@ -370,6 +391,23 @@ class GrammarTests(unittest.TestCase): self.assertEqual(l6(1,2), 1+2+20) self.assertEqual(l6(1,2,k=10), 1+2+10) + # check that trailing commas are permitted + l10 = lambda a,: 0 + l11 = lambda *args,: 0 + l12 = lambda **kwds,: 0 + l13 = lambda a, *args,: 0 + l14 = lambda a, **kwds,: 0 + l15 = lambda *args, b,: 0 + l16 = lambda *, b,: 0 + l17 = lambda *args, **kwds,: 0 + l18 = lambda a, *args, b,: 0 + l19 = lambda a, *, b,: 0 + l20 = lambda a, *args, **kwds,: 0 + l21 = lambda *args, b, **kwds,: 0 + l22 = lambda *, b, **kwds,: 0 + l23 = lambda a, *args, b, **kwds,: 0 + l24 = lambda a, *, b, **kwds,: 0 + ### stmt: simple_stmt | compound_stmt # Tested below diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py index 955b2ad..db15b39 100644 --- a/Lib/test/test_inspect.py +++ b/Lib/test/test_inspect.py @@ -38,7 +38,7 @@ from test.test_import import _ready_to_import # ismodule, isclass, ismethod, isfunction, istraceback, isframe, iscode, # isbuiltin, isroutine, isgenerator, isgeneratorfunction, getmembers, # getdoc, getfile, getmodule, getsourcefile, getcomments, getsource, -# getclasstree, getargspec, getargvalues, formatargspec, formatargvalues, +# getclasstree, getargvalues, formatargspec, formatargvalues, # currentframe, stack, trace, isdatadescriptor # NOTE: There are some additional tests relating to interaction with @@ -628,18 +628,6 @@ class TestClassesAndFunctions(unittest.TestCase): got = inspect.getmro(D) self.assertEqual(expected, got) - def assertArgSpecEquals(self, routine, args_e, varargs_e=None, - varkw_e=None, defaults_e=None, formatted=None): - with self.assertWarns(DeprecationWarning): - args, varargs, varkw, defaults = inspect.getargspec(routine) - self.assertEqual(args, args_e) - self.assertEqual(varargs, varargs_e) - self.assertEqual(varkw, varkw_e) - self.assertEqual(defaults, defaults_e) - if formatted is not None: - self.assertEqual(inspect.formatargspec(args, varargs, varkw, defaults), - formatted) - def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None, varkw_e=None, defaults_e=None, kwonlyargs_e=[], kwonlydefaults_e=None, @@ -658,23 +646,6 @@ class TestClassesAndFunctions(unittest.TestCase): kwonlyargs, kwonlydefaults, ann), formatted) - def test_getargspec(self): - self.assertArgSpecEquals(mod.eggs, ['x', 'y'], formatted='(x, y)') - - self.assertArgSpecEquals(mod.spam, - ['a', 'b', 'c', 'd', 'e', 'f'], - 'g', 'h', (3, 4, 5), - '(a, b, c, d=3, e=4, f=5, *g, **h)') - - self.assertRaises(ValueError, self.assertArgSpecEquals, - mod2.keyworded, []) - - self.assertRaises(ValueError, self.assertArgSpecEquals, - mod2.annotated, []) - self.assertRaises(ValueError, self.assertArgSpecEquals, - mod2.keyword_only_arg, []) - - def test_getfullargspec(self): self.assertFullArgSpecEquals(mod2.keyworded, [], varargs_e='arg1', kwonlyargs_e=['arg2'], @@ -688,20 +659,19 @@ class TestClassesAndFunctions(unittest.TestCase): kwonlyargs_e=['arg'], formatted='(*, arg)') - def test_argspec_api_ignores_wrapped(self): + def test_fullargspec_api_ignores_wrapped(self): # Issue 20684: low level introspection API must ignore __wrapped__ @functools.wraps(mod.spam) def ham(x, y): pass # Basic check - self.assertArgSpecEquals(ham, ['x', 'y'], formatted='(x, y)') self.assertFullArgSpecEquals(ham, ['x', 'y'], formatted='(x, y)') self.assertFullArgSpecEquals(functools.partial(ham), ['x', 'y'], formatted='(x, y)') # Other variants def check_method(f): - self.assertArgSpecEquals(f, ['self', 'x', 'y'], - formatted='(self, x, y)') + self.assertFullArgSpecEquals(f, ['self', 'x', 'y'], + formatted='(self, x, y)') class C: @functools.wraps(mod.spam) def ham(self, x, y): @@ -779,11 +749,11 @@ class TestClassesAndFunctions(unittest.TestCase): with self.assertRaises(TypeError): inspect.getfullargspec(builtin) - def test_getargspec_method(self): + def test_getfullargspec_method(self): class A(object): def m(self): pass - self.assertArgSpecEquals(A.m, ['self']) + self.assertFullArgSpecEquals(A.m, ['self']) def test_classify_newstyle(self): class A(object): diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py index 5b3ba7e..9e55b2a 100644 --- a/Lib/test/test_itertools.py +++ b/Lib/test/test_itertools.py @@ -613,6 +613,56 @@ class TestBasicOps(unittest.TestCase): for proto in range(pickle.HIGHEST_PROTOCOL + 1): self.pickletest(proto, cycle('abc')) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + # test with partial consumed input iterable + it = iter('abcde') + c = cycle(it) + _ = [next(c) for i in range(2)] # consume 2 of 5 inputs + p = pickle.dumps(c, proto) + d = pickle.loads(p) # rebuild the cycle object + self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) + + # test with completely consumed input iterable + it = iter('abcde') + c = cycle(it) + _ = [next(c) for i in range(7)] # consume 7 of 5 inputs + p = pickle.dumps(c, proto) + d = pickle.loads(p) # rebuild the cycle object + self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) + + def test_cycle_setstate(self): + # Verify both modes for restoring state + + # Mode 0 is efficient. It uses an incompletely consumed input + # iterator to build a cycle object and then passes in state with + # a list of previously consumed values. There is no data + # overlap bewteen the two. + c = cycle('defg') + c.__setstate__((list('abc'), 0)) + self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) + + # Mode 1 is inefficient. It starts with a cycle object built + # from an iterator over the remaining elements in a partial + # cycle and then passes in state with all of the previously + # seen values (this overlaps values included in the iterator). + c = cycle('defg') + c.__setstate__((list('abcdefg'), 1)) + self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) + + # The first argument to setstate needs to be a tuple + with self.assertRaises(SystemError): + cycle('defg').__setstate__([list('abcdefg'), 0]) + + # The first argument in the setstate tuple must be a list + with self.assertRaises(TypeError): + c = cycle('defg') + c.__setstate__((dict.fromkeys('defg'), 0)) + take(20, c) + + # The first argument in the setstate tuple must be a list + with self.assertRaises(TypeError): + cycle('defg').__setstate__((list('abcdefg'), 'x')) + def test_groupby(self): # Check whether it accepts arguments correctly self.assertEqual([], list(groupby([]))) diff --git a/Lib/test/test_linecache.py b/Lib/test/test_linecache.py index 21ef738..240db7f 100644 --- a/Lib/test/test_linecache.py +++ b/Lib/test/test_linecache.py @@ -3,6 +3,8 @@ import linecache import unittest import os.path +import tempfile +import tokenize from test import support @@ -10,8 +12,6 @@ FILENAME = linecache.__file__ NONEXISTENT_FILENAME = FILENAME + '.missing' INVALID_NAME = '!@$)(!@#_1' EMPTY = '' -TESTS = 'inspect_fodder inspect_fodder2 mapping_tests' -TESTS = TESTS.split() TEST_PATH = os.path.dirname(__file__) MODULES = "linecache abc".split() MODULE_PATH = os.path.dirname(FILENAME) @@ -37,6 +37,65 @@ def f(): return 3''' # No ending newline +class TempFile: + + def setUp(self): + super().setUp() + with tempfile.NamedTemporaryFile(delete=False) as fp: + self.file_name = fp.name + fp.write(self.file_byte_string) + self.addCleanup(support.unlink, self.file_name) + + +class GetLineTestsGoodData(TempFile): + # file_list = ['list\n', 'of\n', 'good\n', 'strings\n'] + + def setUp(self): + self.file_byte_string = ''.join(self.file_list).encode('utf-8') + super().setUp() + + def test_getline(self): + with tokenize.open(self.file_name) as fp: + for index, line in enumerate(fp): + if not line.endswith('\n'): + line += '\n' + + cached_line = linecache.getline(self.file_name, index + 1) + self.assertEqual(line, cached_line) + + def test_getlines(self): + lines = linecache.getlines(self.file_name) + self.assertEqual(lines, self.file_list) + + +class GetLineTestsBadData(TempFile): + # file_byte_string = b'Bad data goes here' + + def test_getline(self): + self.assertRaises((SyntaxError, UnicodeDecodeError), + linecache.getline, self.file_name, 1) + + def test_getlines(self): + self.assertRaises((SyntaxError, UnicodeDecodeError), + linecache.getlines, self.file_name) + + +class EmptyFile(GetLineTestsGoodData, unittest.TestCase): + file_list = [] + + +class SingleEmptyLine(GetLineTestsGoodData, unittest.TestCase): + file_list = ['\n'] + + +class GoodUnicode(GetLineTestsGoodData, unittest.TestCase): + file_list = ['á\n', 'b\n', 'abcdef\n', 'ááááá\n'] + + +class BadUnicode(GetLineTestsBadData, unittest.TestCase): + file_byte_string = b'\x80abc' + + class LineCacheTests(unittest.TestCase): def test_getline(self): @@ -53,13 +112,6 @@ class LineCacheTests(unittest.TestCase): self.assertEqual(getline(EMPTY, 1), EMPTY) self.assertEqual(getline(INVALID_NAME, 1), EMPTY) - # Check whether lines correspond to those from file iteration - for entry in TESTS: - filename = os.path.join(TEST_PATH, entry) + '.py' - with open(filename) as file: - for index, line in enumerate(file): - self.assertEqual(line, getline(filename, index + 1)) - # Check module loading for entry in MODULES: filename = os.path.join(MODULE_PATH, entry) + '.py' @@ -80,12 +132,13 @@ class LineCacheTests(unittest.TestCase): def test_clearcache(self): cached = [] - for entry in TESTS: - filename = os.path.join(TEST_PATH, entry) + '.py' + for entry in MODULES: + filename = os.path.join(MODULE_PATH, entry) + '.py' cached.append(filename) linecache.getline(filename, 1) # Are all files cached? + self.assertNotEqual(cached, []) cached_empty = [fn for fn in cached if fn not in linecache.cache] self.assertEqual(cached_empty, []) diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py index da9c8ef..27501c2 100644 --- a/Lib/test/test_operator.py +++ b/Lib/test/test_operator.py @@ -120,63 +120,63 @@ class OperatorTestCase: operator = self.module self.assertRaises(TypeError, operator.add) self.assertRaises(TypeError, operator.add, None, None) - self.assertTrue(operator.add(3, 4) == 7) + self.assertEqual(operator.add(3, 4), 7) def test_bitwise_and(self): operator = self.module self.assertRaises(TypeError, operator.and_) self.assertRaises(TypeError, operator.and_, None, None) - self.assertTrue(operator.and_(0xf, 0xa) == 0xa) + self.assertEqual(operator.and_(0xf, 0xa), 0xa) def test_concat(self): operator = self.module self.assertRaises(TypeError, operator.concat) self.assertRaises(TypeError, operator.concat, None, None) - self.assertTrue(operator.concat('py', 'thon') == 'python') - self.assertTrue(operator.concat([1, 2], [3, 4]) == [1, 2, 3, 4]) - self.assertTrue(operator.concat(Seq1([5, 6]), Seq1([7])) == [5, 6, 7]) - self.assertTrue(operator.concat(Seq2([5, 6]), Seq2([7])) == [5, 6, 7]) + self.assertEqual(operator.concat('py', 'thon'), 'python') + self.assertEqual(operator.concat([1, 2], [3, 4]), [1, 2, 3, 4]) + self.assertEqual(operator.concat(Seq1([5, 6]), Seq1([7])), [5, 6, 7]) + self.assertEqual(operator.concat(Seq2([5, 6]), Seq2([7])), [5, 6, 7]) self.assertRaises(TypeError, operator.concat, 13, 29) def test_countOf(self): operator = self.module self.assertRaises(TypeError, operator.countOf) self.assertRaises(TypeError, operator.countOf, None, None) - self.assertTrue(operator.countOf([1, 2, 1, 3, 1, 4], 3) == 1) - self.assertTrue(operator.countOf([1, 2, 1, 3, 1, 4], 5) == 0) + self.assertEqual(operator.countOf([1, 2, 1, 3, 1, 4], 3), 1) + self.assertEqual(operator.countOf([1, 2, 1, 3, 1, 4], 5), 0) def test_delitem(self): operator = self.module a = [4, 3, 2, 1] self.assertRaises(TypeError, operator.delitem, a) self.assertRaises(TypeError, operator.delitem, a, None) - self.assertTrue(operator.delitem(a, 1) is None) - self.assertTrue(a == [4, 2, 1]) + self.assertIsNone(operator.delitem(a, 1)) + self.assertEqual(a, [4, 2, 1]) def test_floordiv(self): operator = self.module self.assertRaises(TypeError, operator.floordiv, 5) self.assertRaises(TypeError, operator.floordiv, None, None) - self.assertTrue(operator.floordiv(5, 2) == 2) + self.assertEqual(operator.floordiv(5, 2), 2) def test_truediv(self): operator = self.module self.assertRaises(TypeError, operator.truediv, 5) self.assertRaises(TypeError, operator.truediv, None, None) - self.assertTrue(operator.truediv(5, 2) == 2.5) + self.assertEqual(operator.truediv(5, 2), 2.5) def test_getitem(self): operator = self.module a = range(10) self.assertRaises(TypeError, operator.getitem) self.assertRaises(TypeError, operator.getitem, a, None) - self.assertTrue(operator.getitem(a, 2) == 2) + self.assertEqual(operator.getitem(a, 2), 2) def test_indexOf(self): operator = self.module self.assertRaises(TypeError, operator.indexOf) self.assertRaises(TypeError, operator.indexOf, None, None) - self.assertTrue(operator.indexOf([4, 3, 2, 1], 3) == 1) + self.assertEqual(operator.indexOf([4, 3, 2, 1], 3), 1) self.assertRaises(ValueError, operator.indexOf, [4, 3, 2, 1], 0) def test_invert(self): @@ -189,21 +189,21 @@ class OperatorTestCase: operator = self.module self.assertRaises(TypeError, operator.lshift) self.assertRaises(TypeError, operator.lshift, None, 42) - self.assertTrue(operator.lshift(5, 1) == 10) - self.assertTrue(operator.lshift(5, 0) == 5) + self.assertEqual(operator.lshift(5, 1), 10) + self.assertEqual(operator.lshift(5, 0), 5) self.assertRaises(ValueError, operator.lshift, 2, -1) def test_mod(self): operator = self.module self.assertRaises(TypeError, operator.mod) self.assertRaises(TypeError, operator.mod, None, 42) - self.assertTrue(operator.mod(5, 2) == 1) + self.assertEqual(operator.mod(5, 2), 1) def test_mul(self): operator = self.module self.assertRaises(TypeError, operator.mul) self.assertRaises(TypeError, operator.mul, None, None) - self.assertTrue(operator.mul(5, 2) == 10) + self.assertEqual(operator.mul(5, 2), 10) def test_matmul(self): operator = self.module @@ -227,7 +227,7 @@ class OperatorTestCase: operator = self.module self.assertRaises(TypeError, operator.or_) self.assertRaises(TypeError, operator.or_, None, None) - self.assertTrue(operator.or_(0xa, 0x5) == 0xf) + self.assertEqual(operator.or_(0xa, 0x5), 0xf) def test_pos(self): operator = self.module @@ -250,8 +250,8 @@ class OperatorTestCase: operator = self.module self.assertRaises(TypeError, operator.rshift) self.assertRaises(TypeError, operator.rshift, None, 42) - self.assertTrue(operator.rshift(5, 1) == 2) - self.assertTrue(operator.rshift(5, 0) == 5) + self.assertEqual(operator.rshift(5, 1), 2) + self.assertEqual(operator.rshift(5, 0), 5) self.assertRaises(ValueError, operator.rshift, 2, -1) def test_contains(self): @@ -266,15 +266,15 @@ class OperatorTestCase: a = list(range(3)) self.assertRaises(TypeError, operator.setitem, a) self.assertRaises(TypeError, operator.setitem, a, None, None) - self.assertTrue(operator.setitem(a, 0, 2) is None) - self.assertTrue(a == [2, 1, 2]) + self.assertIsNone(operator.setitem(a, 0, 2)) + self.assertEqual(a, [2, 1, 2]) self.assertRaises(IndexError, operator.setitem, a, 4, 2) def test_sub(self): operator = self.module self.assertRaises(TypeError, operator.sub) self.assertRaises(TypeError, operator.sub, None, None) - self.assertTrue(operator.sub(5, 2) == 3) + self.assertEqual(operator.sub(5, 2), 3) def test_truth(self): operator = self.module @@ -292,7 +292,7 @@ class OperatorTestCase: operator = self.module self.assertRaises(TypeError, operator.xor) self.assertRaises(TypeError, operator.xor, None, None) - self.assertTrue(operator.xor(0xb, 0xc) == 0x7) + self.assertEqual(operator.xor(0xb, 0xc), 0x7) def test_is(self): operator = self.module @@ -596,5 +596,38 @@ class CCOperatorPickleTestCase(OperatorPickleTestCase, unittest.TestCase): module2 = c_operator +class SubscriptTestCase: + def test_subscript(self): + subscript = self.module.subscript + self.assertIsNone(subscript[None]) + self.assertEqual(subscript[0], 0) + self.assertEqual(subscript[0:1:2], slice(0, 1, 2)) + self.assertEqual( + subscript[0, ..., :2, ...], + (0, Ellipsis, slice(2), Ellipsis), + ) + + def test_pickle(self): + from operator import subscript + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(proto=proto): + self.assertIs( + pickle.loads(pickle.dumps(subscript, proto)), + subscript, + ) + + def test_singleton(self): + with self.assertRaises(TypeError): + type(self.module.subscript)() + + def test_immutable(self): + with self.assertRaises(AttributeError): + self.module.subscript.attr = None + + +class PySubscriptTestCase(SubscriptTestCase, PyOperatorTestCase): + pass + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index bb717cc..d4b9e9c 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -226,15 +226,10 @@ class FileTests(unittest.TestCase): # Test attributes on return values from os.*stat* family. class StatAttributeTests(unittest.TestCase): def setUp(self): - os.mkdir(support.TESTFN) - self.fname = os.path.join(support.TESTFN, "f1") - f = open(self.fname, 'wb') - f.write(b"ABC") - f.close() - - def tearDown(self): - os.unlink(self.fname) - os.rmdir(support.TESTFN) + self.fname = support.TESTFN + self.addCleanup(support.unlink, self.fname) + with open(self.fname, 'wb') as fp: + fp.write(b"ABC") @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def check_stat_attributes(self, fname): @@ -426,7 +421,11 @@ class StatAttributeTests(unittest.TestCase): 0) # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) - result = os.stat(support.TESTFN) + dirname = support.TESTFN + "dir" + os.mkdir(dirname) + self.addCleanup(os.rmdir, dirname) + + result = os.stat(dirname) self.check_file_attributes(result) self.assertEqual( result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, @@ -1226,13 +1225,15 @@ class URandomTests(unittest.TestCase): self.assertNotEqual(data1, data2) -HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1) -HAVE_GETRANDOM = (sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) +# os.urandom() doesn't use a file descriptor when it is implemented with the +# getentropy() function, the getrandom() function or the getrandom() syscall +OS_URANDOM_DONT_USE_FD = ( + sysconfig.get_config_var('HAVE_GETENTROPY') == 1 + or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 + or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) -@unittest.skipIf(HAVE_GETENTROPY, - "getentropy() does not use a file descriptor") -@unittest.skipIf(HAVE_GETRANDOM, - "getrandom() does not use a file descriptor") +@unittest.skipIf(OS_URANDOM_DONT_USE_FD , + "os.random() does not use a file descriptor") class URandomFDTests(unittest.TestCase): @unittest.skipUnless(resource, "test requires the resource module") def test_urandom_failure(self): diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index ec5c31b..0533a03 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -811,6 +811,22 @@ class TestDescriptions(unittest.TestCase): self.assertEqual(self._get_summary_line(t.wrap), "wrap(text) method of textwrap.TextWrapper instance") + def test_field_order_for_named_tuples(self): + Person = namedtuple('Person', ['nickname', 'firstname', 'agegroup']) + s = pydoc.render_doc(Person) + self.assertLess(s.index('nickname'), s.index('firstname')) + self.assertLess(s.index('firstname'), s.index('agegroup')) + + class NonIterableFields: + _fields = None + + class NonHashableFields: + _fields = [[]] + + # Make sure these doesn't fail + pydoc.render_doc(NonIterableFields) + pydoc.render_doc(NonHashableFields) + @requires_docstrings def test_bound_builtin_method(self): s = StringIO() diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index a398a4f..54bbfe4 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -1,21 +1,36 @@ """ Tests of regrtest.py. + +Note: test_regrtest cannot be run twice in parallel. """ import argparse import faulthandler import getopt import os.path +import platform +import re +import subprocess +import sys +import textwrap import unittest -from test import regrtest, support +from test import libregrtest +from test import support + + +Py_DEBUG = hasattr(sys, 'getobjects') +ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') +ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) -class ParseArgsTestCase(unittest.TestCase): - """Test regrtest's argument parsing.""" +class ParseArgsTestCase(unittest.TestCase): + """ + Test regrtest's argument parsing, function _parse_args(). + """ def checkError(self, args, msg): with support.captured_stderr() as err, self.assertRaises(SystemExit): - regrtest._parse_args(args) + libregrtest._parse_args(args) self.assertIn(msg, err.getvalue()) def test_help(self): @@ -23,82 +38,82 @@ class ParseArgsTestCase(unittest.TestCase): with self.subTest(opt=opt): with support.captured_stdout() as out, \ self.assertRaises(SystemExit): - regrtest._parse_args([opt]) + libregrtest._parse_args([opt]) self.assertIn('Run Python regression tests.', out.getvalue()) @unittest.skipUnless(hasattr(faulthandler, 'dump_traceback_later'), "faulthandler.dump_traceback_later() required") def test_timeout(self): - ns = regrtest._parse_args(['--timeout', '4.2']) + ns = libregrtest._parse_args(['--timeout', '4.2']) self.assertEqual(ns.timeout, 4.2) self.checkError(['--timeout'], 'expected one argument') self.checkError(['--timeout', 'foo'], 'invalid float value') def test_wait(self): - ns = regrtest._parse_args(['--wait']) + ns = libregrtest._parse_args(['--wait']) self.assertTrue(ns.wait) def test_slaveargs(self): - ns = regrtest._parse_args(['--slaveargs', '[[], {}]']) + ns = libregrtest._parse_args(['--slaveargs', '[[], {}]']) self.assertEqual(ns.slaveargs, '[[], {}]') self.checkError(['--slaveargs'], 'expected one argument') def test_start(self): for opt in '-S', '--start': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, 'foo']) + ns = libregrtest._parse_args([opt, 'foo']) self.assertEqual(ns.start, 'foo') self.checkError([opt], 'expected one argument') def test_verbose(self): - ns = regrtest._parse_args(['-v']) + ns = libregrtest._parse_args(['-v']) self.assertEqual(ns.verbose, 1) - ns = regrtest._parse_args(['-vvv']) + ns = libregrtest._parse_args(['-vvv']) self.assertEqual(ns.verbose, 3) - ns = regrtest._parse_args(['--verbose']) + ns = libregrtest._parse_args(['--verbose']) self.assertEqual(ns.verbose, 1) - ns = regrtest._parse_args(['--verbose'] * 3) + ns = libregrtest._parse_args(['--verbose'] * 3) self.assertEqual(ns.verbose, 3) - ns = regrtest._parse_args([]) + ns = libregrtest._parse_args([]) self.assertEqual(ns.verbose, 0) def test_verbose2(self): for opt in '-w', '--verbose2': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.verbose2) def test_verbose3(self): for opt in '-W', '--verbose3': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.verbose3) def test_quiet(self): for opt in '-q', '--quiet': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) def test_slow(self): for opt in '-o', '--slow': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.print_slow) def test_header(self): - ns = regrtest._parse_args(['--header']) + ns = libregrtest._parse_args(['--header']) self.assertTrue(ns.header) def test_randomize(self): for opt in '-r', '--randomize': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.randomize) def test_randseed(self): - ns = regrtest._parse_args(['--randseed', '12345']) + ns = libregrtest._parse_args(['--randseed', '12345']) self.assertEqual(ns.random_seed, 12345) self.assertTrue(ns.randomize) self.checkError(['--randseed'], 'expected one argument') @@ -107,7 +122,7 @@ class ParseArgsTestCase(unittest.TestCase): def test_fromfile(self): for opt in '-f', '--fromfile': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, 'foo']) + ns = libregrtest._parse_args([opt, 'foo']) self.assertEqual(ns.fromfile, 'foo') self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo', '-s'], "don't go together") @@ -115,42 +130,42 @@ class ParseArgsTestCase(unittest.TestCase): def test_exclude(self): for opt in '-x', '--exclude': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.exclude) def test_single(self): for opt in '-s', '--single': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.single) self.checkError([opt, '-f', 'foo'], "don't go together") def test_match(self): for opt in '-m', '--match': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, 'pattern']) + ns = libregrtest._parse_args([opt, 'pattern']) self.assertEqual(ns.match_tests, 'pattern') self.checkError([opt], 'expected one argument') def test_failfast(self): for opt in '-G', '--failfast': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, '-v']) + ns = libregrtest._parse_args([opt, '-v']) self.assertTrue(ns.failfast) - ns = regrtest._parse_args([opt, '-W']) + ns = libregrtest._parse_args([opt, '-W']) self.assertTrue(ns.failfast) self.checkError([opt], '-G/--failfast needs either -v or -W') def test_use(self): for opt in '-u', '--use': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, 'gui,network']) + ns = libregrtest._parse_args([opt, 'gui,network']) self.assertEqual(ns.use_resources, ['gui', 'network']) - ns = regrtest._parse_args([opt, 'gui,none,network']) + ns = libregrtest._parse_args([opt, 'gui,none,network']) self.assertEqual(ns.use_resources, ['network']) - expected = list(regrtest.RESOURCE_NAMES) + expected = list(libregrtest.RESOURCE_NAMES) expected.remove('gui') - ns = regrtest._parse_args([opt, 'all,-gui']) + ns = libregrtest._parse_args([opt, 'all,-gui']) self.assertEqual(ns.use_resources, expected) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid resource') @@ -158,31 +173,31 @@ class ParseArgsTestCase(unittest.TestCase): def test_memlimit(self): for opt in '-M', '--memlimit': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, '4G']) + ns = libregrtest._parse_args([opt, '4G']) self.assertEqual(ns.memlimit, '4G') self.checkError([opt], 'expected one argument') def test_testdir(self): - ns = regrtest._parse_args(['--testdir', 'foo']) + ns = libregrtest._parse_args(['--testdir', 'foo']) self.assertEqual(ns.testdir, os.path.join(support.SAVEDCWD, 'foo')) self.checkError(['--testdir'], 'expected one argument') def test_runleaks(self): for opt in '-L', '--runleaks': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.runleaks) def test_huntrleaks(self): for opt in '-R', '--huntrleaks': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, ':']) + ns = libregrtest._parse_args([opt, ':']) self.assertEqual(ns.huntrleaks, (5, 4, 'reflog.txt')) - ns = regrtest._parse_args([opt, '6:']) + ns = libregrtest._parse_args([opt, '6:']) self.assertEqual(ns.huntrleaks, (6, 4, 'reflog.txt')) - ns = regrtest._parse_args([opt, ':3']) + ns = libregrtest._parse_args([opt, ':3']) self.assertEqual(ns.huntrleaks, (5, 3, 'reflog.txt')) - ns = regrtest._parse_args([opt, '6:3:leaks.log']) + ns = libregrtest._parse_args([opt, '6:3:leaks.log']) self.assertEqual(ns.huntrleaks, (6, 3, 'leaks.log')) self.checkError([opt], 'expected one argument') self.checkError([opt, '6'], @@ -193,7 +208,7 @@ class ParseArgsTestCase(unittest.TestCase): def test_multiprocess(self): for opt in '-j', '--multiprocess': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, '2']) + ns = libregrtest._parse_args([opt, '2']) self.assertEqual(ns.use_mp, 2) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid int value') @@ -204,13 +219,13 @@ class ParseArgsTestCase(unittest.TestCase): def test_coverage(self): for opt in '-T', '--coverage': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.trace) def test_coverdir(self): for opt in '-D', '--coverdir': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, 'foo']) + ns = libregrtest._parse_args([opt, 'foo']) self.assertEqual(ns.coverdir, os.path.join(support.SAVEDCWD, 'foo')) self.checkError([opt], 'expected one argument') @@ -218,13 +233,13 @@ class ParseArgsTestCase(unittest.TestCase): def test_nocoverdir(self): for opt in '-N', '--nocoverdir': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertIsNone(ns.coverdir) def test_threshold(self): for opt in '-t', '--threshold': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt, '1000']) + ns = libregrtest._parse_args([opt, '1000']) self.assertEqual(ns.threshold, 1000) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid int value') @@ -232,13 +247,13 @@ class ParseArgsTestCase(unittest.TestCase): def test_nowindows(self): for opt in '-n', '--nowindows': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.nowindows) def test_forever(self): for opt in '-F', '--forever': with self.subTest(opt=opt): - ns = regrtest._parse_args([opt]) + ns = libregrtest._parse_args([opt]) self.assertTrue(ns.forever) @@ -246,30 +261,330 @@ class ParseArgsTestCase(unittest.TestCase): self.checkError(['--xxx'], 'usage:') def test_long_option__partial(self): - ns = regrtest._parse_args(['--qui']) + ns = libregrtest._parse_args(['--qui']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) def test_two_options(self): - ns = regrtest._parse_args(['--quiet', '--exclude']) + ns = libregrtest._parse_args(['--quiet', '--exclude']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) self.assertTrue(ns.exclude) def test_option_with_empty_string_value(self): - ns = regrtest._parse_args(['--start', '']) + ns = libregrtest._parse_args(['--start', '']) self.assertEqual(ns.start, '') def test_arg(self): - ns = regrtest._parse_args(['foo']) + ns = libregrtest._parse_args(['foo']) self.assertEqual(ns.args, ['foo']) def test_option_and_arg(self): - ns = regrtest._parse_args(['--quiet', 'foo']) + ns = libregrtest._parse_args(['--quiet', 'foo']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) self.assertEqual(ns.args, ['foo']) +class BaseTestCase(unittest.TestCase): + TEST_UNIQUE_ID = 1 + TESTNAME_PREFIX = 'test_regrtest_' + TESTNAME_REGEX = r'test_[a-z0-9_]+' + + def setUp(self): + self.testdir = os.path.join(ROOT_DIR, 'Lib', 'test') + + # When test_regrtest is interrupted by CTRL+c, it can leave + # temporary test files + remove = [entry.path + for entry in os.scandir(self.testdir) + if (entry.name.startswith(self.TESTNAME_PREFIX) + and entry.name.endswith(".py"))] + for path in remove: + print("WARNING: test_regrtest: remove %s" % path) + support.unlink(path) + + def create_test(self, name=None, code=''): + if not name: + name = 'noop%s' % BaseTestCase.TEST_UNIQUE_ID + BaseTestCase.TEST_UNIQUE_ID += 1 + + # test_regrtest cannot be run twice in parallel because + # of setUp() and create_test() + name = self.TESTNAME_PREFIX + "%s_%s" % (os.getpid(), name) + path = os.path.join(self.testdir, name + '.py') + + self.addCleanup(support.unlink, path) + # Use 'x' mode to ensure that we do not override existing tests + with open(path, 'x', encoding='utf-8') as fp: + fp.write(code) + return name + + def regex_search(self, regex, output): + match = re.search(regex, output, re.MULTILINE) + if not match: + self.fail("%r not found in %r" % (regex, output)) + return match + + def check_line(self, output, regex): + regex = re.compile(r'^' + regex, re.MULTILINE) + self.assertRegex(output, regex) + + def parse_executed_tests(self, output): + parser = re.finditer(r'^\[[0-9]+/[0-9]+\] (%s)$' % self.TESTNAME_REGEX, + output, + re.MULTILINE) + return set(match.group(1) for match in parser) + + def check_executed_tests(self, output, tests, skipped=None): + if isinstance(tests, str): + tests = [tests] + executed = self.parse_executed_tests(output) + self.assertEqual(executed, set(tests), output) + ntest = len(tests) + if skipped: + if isinstance(skipped, str): + skipped = [skipped] + nskipped = len(skipped) + + plural = 's' if nskipped != 1 else '' + names = ' '.join(sorted(skipped)) + expected = (r'%s test%s skipped:\n %s$' + % (nskipped, plural, names)) + self.check_line(output, expected) + + ok = ntest - nskipped + if ok: + self.check_line(output, r'%s test OK\.$' % ok) + else: + self.check_line(output, r'All %s tests OK\.$' % ntest) + + def parse_random_seed(self, output): + match = self.regex_search(r'Using random seed ([0-9]+)', output) + randseed = int(match.group(1)) + self.assertTrue(0 <= randseed <= 10000000, randseed) + return randseed + + def run_command(self, args, input=None): + if not input: + input = '' + try: + return subprocess.run(args, + check=True, universal_newlines=True, + input=input, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except subprocess.CalledProcessError as exc: + self.fail("%s\n" + "\n" + "stdout:\n" + "%s\n" + "\n" + "stderr:\n" + "%s" + % (str(exc), exc.stdout, exc.stderr)) + + + def run_python(self, args, **kw): + args = [sys.executable, '-X', 'faulthandler', '-I', *args] + proc = self.run_command(args, **kw) + return proc.stdout + + +class ProgramsTestCase(BaseTestCase): + """ + Test various ways to run the Python test suite. Use options close + to options used on the buildbot. + """ + + NTEST = 4 + + def setUp(self): + super().setUp() + + # Create NTEST tests doing nothing + self.tests = [self.create_test() for index in range(self.NTEST)] + + self.python_args = ['-Wd', '-E', '-bb'] + self.regrtest_args = ['-uall', '-rwW', '--timeout', '3600', '-j4'] + if sys.platform == 'win32': + self.regrtest_args.append('-n') + + def check_output(self, output): + self.parse_random_seed(output) + self.check_executed_tests(output, self.tests) + + def run_tests(self, args): + stdout = self.run_python(args) + self.check_output(stdout) + + def test_script_regrtest(self): + # Lib/test/regrtest.py + script = os.path.join(ROOT_DIR, 'Lib', 'test', 'regrtest.py') + + args = [*self.python_args, script, *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_module_test(self): + # -m test + args = [*self.python_args, '-m', 'test', + *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_module_regrtest(self): + # -m test.regrtest + args = [*self.python_args, '-m', 'test.regrtest', + *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_module_autotest(self): + # -m test.autotest + args = [*self.python_args, '-m', 'test.autotest', + *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_module_from_test_autotest(self): + # from test import autotest + code = 'from test import autotest' + args = [*self.python_args, '-c', code, + *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_script_autotest(self): + # Lib/test/autotest.py + script = os.path.join(ROOT_DIR, 'Lib', 'test', 'autotest.py') + args = [*self.python_args, script, *self.regrtest_args, *self.tests] + self.run_tests(args) + + def test_tools_script_run_tests(self): + # Tools/scripts/run_tests.py + script = os.path.join(ROOT_DIR, 'Tools', 'scripts', 'run_tests.py') + self.run_tests([script, *self.tests]) + + def run_batch(self, *args): + proc = self.run_command(args) + self.check_output(proc.stdout) + + @unittest.skipUnless(sys.platform == 'win32', 'Windows only') + def test_tools_buildbot_test(self): + # Tools\buildbot\test.bat + script = os.path.join(ROOT_DIR, 'Tools', 'buildbot', 'test.bat') + test_args = [] + if platform.architecture()[0] == '64bit': + test_args.append('-x64') # 64-bit build + if not Py_DEBUG: + test_args.append('+d') # Release build, use python.exe + self.run_batch(script, *test_args, *self.tests) + + @unittest.skipUnless(sys.platform == 'win32', 'Windows only') + def test_pcbuild_rt(self): + # PCbuild\rt.bat + script = os.path.join(ROOT_DIR, r'PCbuild\rt.bat') + rt_args = ["-q"] # Quick, don't run tests twice + if platform.architecture()[0] == '64bit': + rt_args.append('-x64') # 64-bit build + if Py_DEBUG: + rt_args.append('-d') # Debug build, use python_d.exe + self.run_batch(script, *rt_args, *self.regrtest_args, *self.tests) + + +class ArgsTestCase(BaseTestCase): + """ + Test arguments of the Python test suite. + """ + + def run_tests(self, *args, input=None): + return self.run_python(['-m', 'test', *args], input=input) + + def test_resources(self): + # test -u command line option + tests = {} + for resource in ('audio', 'network'): + code = 'from test import support\nsupport.requires(%r)' % resource + tests[resource] = self.create_test(resource, code) + test_names = sorted(tests.values()) + + # -u all: 2 resources enabled + output = self.run_tests('-u', 'all', *test_names) + self.check_executed_tests(output, test_names) + + # -u audio: 1 resource enabled + output = self.run_tests('-uaudio', *test_names) + self.check_executed_tests(output, test_names, + skipped=tests['network']) + + # no option: 0 resources enabled + output = self.run_tests(*test_names) + self.check_executed_tests(output, test_names, + skipped=test_names) + + def test_random(self): + # test -r and --randseed command line option + code = textwrap.dedent(""" + import random + print("TESTRANDOM: %s" % random.randint(1, 1000)) + """) + test = self.create_test('random', code) + + # first run to get the output with the random seed + output = self.run_tests('-r', test) + randseed = self.parse_random_seed(output) + match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output) + test_random = int(match.group(1)) + + # try to reproduce with the random seed + output = self.run_tests('-r', '--randseed=%s' % randseed, test) + randseed2 = self.parse_random_seed(output) + self.assertEqual(randseed2, randseed) + + match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output) + test_random2 = int(match.group(1)) + self.assertEqual(test_random2, test_random) + + def test_fromfile(self): + # test --fromfile + tests = [self.create_test() for index in range(5)] + + # Write the list of files using a format similar to regrtest output: + # [1/2] test_1 + # [2/2] test_2 + filename = support.TESTFN + self.addCleanup(support.unlink, filename) + with open(filename, "w") as fp: + for index, name in enumerate(tests, 1): + print("[%s/%s] %s" % (index, len(tests), name), file=fp) + + output = self.run_tests('--fromfile', filename) + self.check_executed_tests(output, tests) + + def test_slow(self): + # test --slow + tests = [self.create_test() for index in range(3)] + output = self.run_tests("--slow", *tests) + self.check_executed_tests(output, tests) + regex = ('10 slowest tests:\n' + '(?:%s: [0-9]+\.[0-9]+s\n){%s}' + % (self.TESTNAME_REGEX, len(tests))) + self.check_line(output, regex) + + @unittest.skipIf(sys.platform == 'win32', + "FIXME: coverage doesn't work on Windows") + def test_coverage(self): + # test --coverage + test = self.create_test() + output = self.run_tests("--coverage", test) + executed = self.parse_executed_tests(output) + self.assertEqual(executed, {test}, output) + regex = ('lines +cov% +module +\(path\)\n' + '(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+') + self.check_line(output, regex) + + def test_wait(self): + # test --wait + test = self.create_test() + output = self.run_tests("--wait", test, input='key') + self.check_line(output, 'Press any key to continue') + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_rlcompleter.py b/Lib/test/test_rlcompleter.py index d37b620..2ff0788 100644 --- a/Lib/test/test_rlcompleter.py +++ b/Lib/test/test_rlcompleter.py @@ -5,6 +5,7 @@ import rlcompleter class CompleteMe: """ Trivial class used in testing rlcompleter.Completer. """ spam = 1 + _ham = 2 class TestRlcompleter(unittest.TestCase): @@ -51,11 +52,25 @@ class TestRlcompleter(unittest.TestCase): ['str.{}('.format(x) for x in dir(str) if x.startswith('s')]) self.assertEqual(self.stdcompleter.attr_matches('tuple.foospamegg'), []) + expected = sorted({'None.%s%s' % (x, '(' if x != '__doc__' else '') + for x in dir(None)}) + self.assertEqual(self.stdcompleter.attr_matches('None.'), expected) + self.assertEqual(self.stdcompleter.attr_matches('None._'), expected) + self.assertEqual(self.stdcompleter.attr_matches('None.__'), expected) # test with a customized namespace self.assertEqual(self.completer.attr_matches('CompleteMe.sp'), ['CompleteMe.spam']) self.assertEqual(self.completer.attr_matches('Completeme.egg'), []) + self.assertEqual(self.completer.attr_matches('CompleteMe.'), + ['CompleteMe.mro(', 'CompleteMe.spam']) + self.assertEqual(self.completer.attr_matches('CompleteMe._'), + ['CompleteMe._ham']) + matches = self.completer.attr_matches('CompleteMe.__') + for x in matches: + self.assertTrue(x.startswith('CompleteMe.__'), x) + self.assertIn('CompleteMe.__name__', matches) + self.assertIn('CompleteMe.__new__(', matches) CompleteMe.me = CompleteMe self.assertEqual(self.completer.attr_matches('CompleteMe.me.me.sp'), @@ -67,10 +82,15 @@ class TestRlcompleter(unittest.TestCase): def test_complete(self): completer = rlcompleter.Completer() self.assertEqual(completer.complete('', 0), '\t') - self.assertEqual(completer.complete('a', 0), 'and') - self.assertEqual(completer.complete('a', 1), 'as') - self.assertEqual(completer.complete('as', 2), 'assert') - self.assertEqual(completer.complete('an', 0), 'and') + self.assertEqual(completer.complete('a', 0), 'and ') + self.assertEqual(completer.complete('a', 1), 'as ') + self.assertEqual(completer.complete('as', 2), 'assert ') + self.assertEqual(completer.complete('an', 0), 'and ') + self.assertEqual(completer.complete('pa', 0), 'pass') + self.assertEqual(completer.complete('Fa', 0), 'False') + self.assertEqual(completer.complete('el', 0), 'elif ') + self.assertEqual(completer.complete('el', 1), 'else') + self.assertEqual(completer.complete('tr', 0), 'try:') if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py index 54de508..ade39fb 100644 --- a/Lib/test/test_set.py +++ b/Lib/test/test_set.py @@ -10,6 +10,8 @@ import sys import warnings import collections import collections.abc +import itertools +import string class PassThru(Exception): pass @@ -711,6 +713,28 @@ class TestFrozenSet(TestJointOps, unittest.TestCase): addhashvalue(hash(frozenset([e for e, m in elemmasks if m&i]))) self.assertEqual(len(hashvalues), 2**n) + def letter_range(n): + return string.ascii_letters[:n] + + def zf_range(n): + # https://en.wikipedia.org/wiki/Set-theoretic_definition_of_natural_numbers + nums = [frozenset()] + for i in range(n-1): + num = frozenset(nums) + nums.append(num) + return nums[:n] + + def powerset(s): + for i in range(len(s)+1): + yield from map(frozenset, itertools.combinations(s, i)) + + for n in range(18): + t = 2 ** n + mask = t - 1 + for nums in (range, letter_range, zf_range): + u = len({h & mask for h in map(hash, powerset(nums(n)))}) + self.assertGreater(4*u, t) + class FrozenSetSubclass(frozenset): pass diff --git a/Lib/test/test_symbol.py b/Lib/test/test_symbol.py new file mode 100644 index 0000000..2dcb9de --- /dev/null +++ b/Lib/test/test_symbol.py @@ -0,0 +1,55 @@ +import unittest +from test import support +import filecmp +import os +import sys +import subprocess + + +SYMBOL_FILE = support.findfile('symbol.py') +GRAMMAR_FILE = os.path.join(os.path.dirname(__file__), + '..', '..', 'Include', 'graminit.h') +TEST_PY_FILE = 'symbol_test.py' + + +class TestSymbolGeneration(unittest.TestCase): + + def _copy_file_without_generated_symbols(self, source_file, dest_file): + with open(source_file) as fp: + lines = fp.readlines() + with open(dest_file, 'w') as fp: + fp.writelines(lines[:lines.index("#--start constants--\n") + 1]) + fp.writelines(lines[lines.index("#--end constants--\n"):]) + + def _generate_symbols(self, grammar_file, target_symbol_py_file): + proc = subprocess.Popen([sys.executable, + SYMBOL_FILE, + grammar_file, + target_symbol_py_file], stderr=subprocess.PIPE) + stderr = proc.communicate()[1] + return proc.returncode, stderr + + def compare_files(self, file1, file2): + with open(file1) as fp: + lines1 = fp.readlines() + with open(file2) as fp: + lines2 = fp.readlines() + self.assertEqual(lines1, lines2) + + @unittest.skipIf(not os.path.exists(GRAMMAR_FILE), + 'test only works from source build directory') + def test_real_grammar_and_symbol_file(self): + output = support.TESTFN + self.addCleanup(support.unlink, output) + + self._copy_file_without_generated_symbols(SYMBOL_FILE, output) + + exitcode, stderr = self._generate_symbols(GRAMMAR_FILE, output) + self.assertEqual(b'', stderr) + self.assertEqual(0, exitcode) + + self.compare_files(SYMBOL_FILE, output) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py index de0cbc4..f883c45 100644 --- a/Lib/test/test_time.py +++ b/Lib/test/test_time.py @@ -1,6 +1,8 @@ from test import support +import decimal import enum import locale +import math import platform import sys import sysconfig @@ -21,17 +23,27 @@ SIZEOF_INT = sysconfig.get_config_var('SIZEOF_INT') or 4 TIME_MAXYEAR = (1 << 8 * SIZEOF_INT - 1) - 1 TIME_MINYEAR = -TIME_MAXYEAR - 1 +SEC_TO_US = 10 ** 6 US_TO_NS = 10 ** 3 MS_TO_NS = 10 ** 6 SEC_TO_NS = 10 ** 9 +NS_TO_SEC = 10 ** 9 class _PyTime(enum.IntEnum): # Round towards minus infinity (-inf) ROUND_FLOOR = 0 # Round towards infinity (+inf) ROUND_CEILING = 1 + # Round to nearest with ties going to nearest even integer + ROUND_HALF_EVEN = 2 -ALL_ROUNDING_METHODS = (_PyTime.ROUND_FLOOR, _PyTime.ROUND_CEILING) +# Rounding modes supported by PyTime +ROUNDING_MODES = ( + # (PyTime rounding method, decimal rounding method) + (_PyTime.ROUND_FLOOR, decimal.ROUND_FLOOR), + (_PyTime.ROUND_CEILING, decimal.ROUND_CEILING), + (_PyTime.ROUND_HALF_EVEN, decimal.ROUND_HALF_EVEN), +) class TimeTestCase(unittest.TestCase): @@ -607,79 +619,6 @@ class TestStrftime4dyear(_TestStrftimeYear, _Test4dYear, unittest.TestCase): class TestPytime(unittest.TestCase): - def setUp(self): - self.invalid_values = ( - -(2 ** 100), 2 ** 100, - -(2.0 ** 100.0), 2.0 ** 100.0, - ) - - @support.cpython_only - def test_time_t(self): - from _testcapi import pytime_object_to_time_t - for obj, time_t, rnd in ( - # Round towards minus infinity (-inf) - (0, 0, _PyTime.ROUND_FLOOR), - (-1, -1, _PyTime.ROUND_FLOOR), - (-1.0, -1, _PyTime.ROUND_FLOOR), - (-1.9, -2, _PyTime.ROUND_FLOOR), - (1.0, 1, _PyTime.ROUND_FLOOR), - (1.9, 1, _PyTime.ROUND_FLOOR), - # Round towards infinity (+inf) - (0, 0, _PyTime.ROUND_CEILING), - (-1, -1, _PyTime.ROUND_CEILING), - (-1.0, -1, _PyTime.ROUND_CEILING), - (-1.9, -1, _PyTime.ROUND_CEILING), - (1.0, 1, _PyTime.ROUND_CEILING), - (1.9, 2, _PyTime.ROUND_CEILING), - ): - self.assertEqual(pytime_object_to_time_t(obj, rnd), time_t) - - rnd = _PyTime.ROUND_FLOOR - for invalid in self.invalid_values: - self.assertRaises(OverflowError, - pytime_object_to_time_t, invalid, rnd) - - @support.cpython_only - def test_timespec(self): - from _testcapi import pytime_object_to_timespec - for obj, timespec, rnd in ( - # Round towards minus infinity (-inf) - (0, (0, 0), _PyTime.ROUND_FLOOR), - (-1, (-1, 0), _PyTime.ROUND_FLOOR), - (-1.0, (-1, 0), _PyTime.ROUND_FLOOR), - (1e-9, (0, 1), _PyTime.ROUND_FLOOR), - (1e-10, (0, 0), _PyTime.ROUND_FLOOR), - (-1e-9, (-1, 999999999), _PyTime.ROUND_FLOOR), - (-1e-10, (-1, 999999999), _PyTime.ROUND_FLOOR), - (-1.2, (-2, 800000000), _PyTime.ROUND_FLOOR), - (0.9999999999, (0, 999999999), _PyTime.ROUND_FLOOR), - (1.1234567890, (1, 123456789), _PyTime.ROUND_FLOOR), - (1.1234567899, (1, 123456789), _PyTime.ROUND_FLOOR), - (-1.1234567890, (-2, 876543211), _PyTime.ROUND_FLOOR), - (-1.1234567891, (-2, 876543210), _PyTime.ROUND_FLOOR), - # Round towards infinity (+inf) - (0, (0, 0), _PyTime.ROUND_CEILING), - (-1, (-1, 0), _PyTime.ROUND_CEILING), - (-1.0, (-1, 0), _PyTime.ROUND_CEILING), - (1e-9, (0, 1), _PyTime.ROUND_CEILING), - (1e-10, (0, 1), _PyTime.ROUND_CEILING), - (-1e-9, (-1, 999999999), _PyTime.ROUND_CEILING), - (-1e-10, (0, 0), _PyTime.ROUND_CEILING), - (-1.2, (-2, 800000000), _PyTime.ROUND_CEILING), - (0.9999999999, (1, 0), _PyTime.ROUND_CEILING), - (1.1234567890, (1, 123456790), _PyTime.ROUND_CEILING), - (1.1234567899, (1, 123456790), _PyTime.ROUND_CEILING), - (-1.1234567890, (-2, 876543211), _PyTime.ROUND_CEILING), - (-1.1234567891, (-2, 876543211), _PyTime.ROUND_CEILING), - ): - with self.subTest(obj=obj, round=rnd, timespec=timespec): - self.assertEqual(pytime_object_to_timespec(obj, rnd), timespec) - - rnd = _PyTime.ROUND_FLOOR - for invalid in self.invalid_values: - self.assertRaises(OverflowError, - pytime_object_to_timespec, invalid, rnd) - @unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support") def test_localtime_timezone(self): @@ -734,266 +673,291 @@ class TestPytime(unittest.TestCase): self.assertIs(lt.tm_zone, None) -@unittest.skipUnless(_testcapi is not None, - 'need the _testcapi module') -class TestPyTime_t(unittest.TestCase): +@unittest.skipIf(_testcapi is None, 'need the _testcapi module') +class CPyTimeTestCase: + """ + Base class to test the C _PyTime_t API. + """ + OVERFLOW_SECONDS = None + + def setUp(self): + from _testcapi import SIZEOF_TIME_T + bits = SIZEOF_TIME_T * 8 - 1 + self.time_t_min = -2 ** bits + self.time_t_max = 2 ** bits - 1 + + def time_t_filter(self, seconds): + return (self.time_t_min <= seconds <= self.time_t_max) + + def _rounding_values(self, use_float): + "Build timestamps used to test rounding." + + units = [1, US_TO_NS, MS_TO_NS, SEC_TO_NS] + if use_float: + # picoseconds are only tested to pytime_converter accepting floats + units.append(1e-3) + + values = ( + # small values + 1, 2, 5, 7, 123, 456, 1234, + # 10^k - 1 + 9, + 99, + 999, + 9999, + 99999, + 999999, + # test half even rounding near 0.5, 1.5, 2.5, 3.5, 4.5 + 499, 500, 501, + 1499, 1500, 1501, + 2500, + 3500, + 4500, + ) + + ns_timestamps = [0] + for unit in units: + for value in values: + ns = value * unit + ns_timestamps.extend((-ns, ns)) + for pow2 in (0, 5, 10, 15, 22, 23, 24, 30, 33): + ns = (2 ** pow2) * SEC_TO_NS + ns_timestamps.extend(( + -ns-1, -ns, -ns+1, + ns-1, ns, ns+1 + )) + for seconds in (_testcapi.INT_MIN, _testcapi.INT_MAX): + ns_timestamps.append(seconds * SEC_TO_NS) + if use_float: + # numbers with an extract representation in IEEE 754 (base 2) + for pow2 in (3, 7, 10, 15): + ns = 2.0 ** (-pow2) + ns_timestamps.extend((-ns, ns)) + + # seconds close to _PyTime_t type limit + ns = (2 ** 63 // SEC_TO_NS) * SEC_TO_NS + ns_timestamps.extend((-ns, ns)) + + return ns_timestamps + + def _check_rounding(self, pytime_converter, expected_func, + use_float, unit_to_sec, value_filter=None): + + def convert_values(ns_timestamps): + if use_float: + unit_to_ns = SEC_TO_NS / float(unit_to_sec) + values = [ns / unit_to_ns for ns in ns_timestamps] + else: + unit_to_ns = SEC_TO_NS // unit_to_sec + values = [ns // unit_to_ns for ns in ns_timestamps] + + if value_filter: + values = filter(value_filter, values) + + # remove duplicates and sort + return sorted(set(values)) + + # test rounding + ns_timestamps = self._rounding_values(use_float) + valid_values = convert_values(ns_timestamps) + for time_rnd, decimal_rnd in ROUNDING_MODES : + context = decimal.getcontext() + context.rounding = decimal_rnd + + for value in valid_values: + debug_info = {'value': value, 'rounding': decimal_rnd} + try: + result = pytime_converter(value, time_rnd) + expected = expected_func(value) + except Exception as exc: + self.fail("Error on timestamp conversion: %s" % debug_info) + self.assertEqual(result, + expected, + debug_info) + + # test overflow + ns = self.OVERFLOW_SECONDS * SEC_TO_NS + ns_timestamps = (-ns, ns) + overflow_values = convert_values(ns_timestamps) + for time_rnd, _ in ROUNDING_MODES : + for value in overflow_values: + debug_info = {'value': value, 'rounding': time_rnd} + with self.assertRaises(OverflowError, msg=debug_info): + pytime_converter(value, time_rnd) + + def check_int_rounding(self, pytime_converter, expected_func, + unit_to_sec=1, value_filter=None): + self._check_rounding(pytime_converter, expected_func, + False, unit_to_sec, value_filter) + + def check_float_rounding(self, pytime_converter, expected_func, + unit_to_sec=1, value_filter=None): + self._check_rounding(pytime_converter, expected_func, + True, unit_to_sec, value_filter) + + def decimal_round(self, x): + d = decimal.Decimal(x) + d = d.quantize(1) + return int(d) + + +class TestCPyTime(CPyTimeTestCase, unittest.TestCase): + """ + Test the C _PyTime_t API. + """ + # _PyTime_t is a 64-bit signed integer + OVERFLOW_SECONDS = math.ceil((2**63 + 1) / SEC_TO_NS) + def test_FromSeconds(self): from _testcapi import PyTime_FromSeconds - for seconds in (0, 3, -456, _testcapi.INT_MAX, _testcapi.INT_MIN): - with self.subTest(seconds=seconds): - self.assertEqual(PyTime_FromSeconds(seconds), - seconds * SEC_TO_NS) + + # PyTime_FromSeconds() expects a C int, reject values out of range + def c_int_filter(secs): + return (_testcapi.INT_MIN <= secs <= _testcapi.INT_MAX) + + self.check_int_rounding(lambda secs, rnd: PyTime_FromSeconds(secs), + lambda secs: secs * SEC_TO_NS, + value_filter=c_int_filter) def test_FromSecondsObject(self): from _testcapi import PyTime_FromSecondsObject - # Conversion giving the same result for all rounding methods - for rnd in ALL_ROUNDING_METHODS: - for obj, ts in ( - # integers - (0, 0), - (1, SEC_TO_NS), - (-3, -3 * SEC_TO_NS), - - # float: subseconds - (0.0, 0), - (1e-9, 1), - (1e-6, 10 ** 3), - (1e-3, 10 ** 6), - - # float: seconds - (2.0, 2 * SEC_TO_NS), - (123.0, 123 * SEC_TO_NS), - (-7.0, -7 * SEC_TO_NS), - - # nanosecond are kept for value <= 2^23 seconds - (2**22 - 1e-9, 4194303999999999), - (2**22, 4194304000000000), - (2**22 + 1e-9, 4194304000000001), - (2**23 - 1e-9, 8388607999999999), - (2**23, 8388608000000000), - - # start loosing precision for value > 2^23 seconds - (2**23 + 1e-9, 8388608000000002), - - # nanoseconds are lost for value > 2^23 seconds - (2**24 - 1e-9, 16777215999999998), - (2**24, 16777216000000000), - (2**24 + 1e-9, 16777216000000000), - (2**25 - 1e-9, 33554432000000000), - (2**25 , 33554432000000000), - (2**25 + 1e-9, 33554432000000000), - - # close to 2^63 nanoseconds (_PyTime_t limit) - (9223372036, 9223372036 * SEC_TO_NS), - (9223372036.0, 9223372036 * SEC_TO_NS), - (-9223372036, -9223372036 * SEC_TO_NS), - (-9223372036.0, -9223372036 * SEC_TO_NS), - ): - with self.subTest(obj=obj, round=rnd, timestamp=ts): - self.assertEqual(PyTime_FromSecondsObject(obj, rnd), ts) - - with self.subTest(round=rnd): - with self.assertRaises(OverflowError): - PyTime_FromSecondsObject(9223372037, rnd) - PyTime_FromSecondsObject(9223372037.0, rnd) - PyTime_FromSecondsObject(-9223372037, rnd) - PyTime_FromSecondsObject(-9223372037.0, rnd) - - # Conversion giving different results depending on the rounding method - FLOOR = _PyTime.ROUND_FLOOR - CEILING = _PyTime.ROUND_CEILING - for obj, ts, rnd in ( - # close to zero - ( 1e-10, 0, FLOOR), - ( 1e-10, 1, CEILING), - (-1e-10, -1, FLOOR), - (-1e-10, 0, CEILING), - - # test rounding of the last nanosecond - ( 1.1234567899, 1123456789, FLOOR), - ( 1.1234567899, 1123456790, CEILING), - (-1.1234567899, -1123456790, FLOOR), - (-1.1234567899, -1123456789, CEILING), - - # close to 1 second - ( 0.9999999999, 999999999, FLOOR), - ( 0.9999999999, 1000000000, CEILING), - (-0.9999999999, -1000000000, FLOOR), - (-0.9999999999, -999999999, CEILING), - ): - with self.subTest(obj=obj, round=rnd, timestamp=ts): - self.assertEqual(PyTime_FromSecondsObject(obj, rnd), ts) + self.check_int_rounding( + PyTime_FromSecondsObject, + lambda secs: secs * SEC_TO_NS) + + self.check_float_rounding( + PyTime_FromSecondsObject, + lambda ns: self.decimal_round(ns * SEC_TO_NS)) def test_AsSecondsDouble(self): from _testcapi import PyTime_AsSecondsDouble - for nanoseconds, seconds in ( - # near 1 nanosecond - ( 0, 0.0), - ( 1, 1e-9), - (-1, -1e-9), - - # near 1 second - (SEC_TO_NS + 1, 1.0 + 1e-9), - (SEC_TO_NS, 1.0), - (SEC_TO_NS - 1, 1.0 - 1e-9), - - # a few seconds - (123 * SEC_TO_NS, 123.0), - (-567 * SEC_TO_NS, -567.0), - - # nanosecond are kept for value <= 2^23 seconds - (4194303999999999, 2**22 - 1e-9), - (4194304000000000, 2**22), - (4194304000000001, 2**22 + 1e-9), - - # start loosing precision for value > 2^23 seconds - (8388608000000002, 2**23 + 1e-9), - - # nanoseconds are lost for value > 2^23 seconds - (16777215999999998, 2**24 - 1e-9), - (16777215999999999, 2**24 - 1e-9), - (16777216000000000, 2**24 ), - (16777216000000001, 2**24 ), - (16777216000000002, 2**24 + 2e-9), - - (33554432000000000, 2**25 ), - (33554432000000002, 2**25 ), - (33554432000000004, 2**25 + 4e-9), - - # close to 2^63 nanoseconds (_PyTime_t limit) - (9223372036 * SEC_TO_NS, 9223372036.0), - (-9223372036 * SEC_TO_NS, -9223372036.0), - ): - with self.subTest(nanoseconds=nanoseconds, seconds=seconds): - self.assertEqual(PyTime_AsSecondsDouble(nanoseconds), - seconds) - - def test_timeval(self): + def float_converter(ns): + if abs(ns) % SEC_TO_NS == 0: + return float(ns // SEC_TO_NS) + else: + return float(ns) / SEC_TO_NS + + self.check_int_rounding(lambda ns, rnd: PyTime_AsSecondsDouble(ns), + float_converter, + NS_TO_SEC) + + def create_decimal_converter(self, denominator): + denom = decimal.Decimal(denominator) + + def converter(value): + d = decimal.Decimal(value) / denom + return self.decimal_round(d) + + return converter + + def test_AsTimeval(self): from _testcapi import PyTime_AsTimeval - for rnd in ALL_ROUNDING_METHODS: - for ns, tv in ( - # microseconds - (0, (0, 0)), - (1000, (0, 1)), - (-1000, (-1, 999999)), - - # seconds - (2 * SEC_TO_NS, (2, 0)), - (-3 * SEC_TO_NS, (-3, 0)), - ): - with self.subTest(nanoseconds=ns, timeval=tv, round=rnd): - self.assertEqual(PyTime_AsTimeval(ns, rnd), tv) - - FLOOR = _PyTime.ROUND_FLOOR - CEILING = _PyTime.ROUND_CEILING - for ns, tv, rnd in ( - # nanoseconds - (1, (0, 0), FLOOR), - (1, (0, 1), CEILING), - (-1, (-1, 999999), FLOOR), - (-1, (0, 0), CEILING), - - # seconds + nanoseconds - (1234567001, (1, 234567), FLOOR), - (1234567001, (1, 234568), CEILING), - (-1234567001, (-2, 765432), FLOOR), - (-1234567001, (-2, 765433), CEILING), - ): - with self.subTest(nanoseconds=ns, timeval=tv, round=rnd): - self.assertEqual(PyTime_AsTimeval(ns, rnd), tv) + + us_converter = self.create_decimal_converter(US_TO_NS) + + def timeval_converter(ns): + us = us_converter(ns) + return divmod(us, SEC_TO_US) + + if sys.platform == 'win32': + from _testcapi import LONG_MIN, LONG_MAX + + # On Windows, timeval.tv_sec type is a C long + def seconds_filter(secs): + return LONG_MIN <= secs <= LONG_MAX + else: + seconds_filter = self.time_t_filter + + self.check_int_rounding(PyTime_AsTimeval, + timeval_converter, + NS_TO_SEC, + value_filter=seconds_filter) @unittest.skipUnless(hasattr(_testcapi, 'PyTime_AsTimespec'), 'need _testcapi.PyTime_AsTimespec') - def test_timespec(self): + def test_AsTimespec(self): from _testcapi import PyTime_AsTimespec - for ns, ts in ( - # nanoseconds - (0, (0, 0)), - (1, (0, 1)), - (-1, (-1, 999999999)), - - # seconds - (2 * SEC_TO_NS, (2, 0)), - (-3 * SEC_TO_NS, (-3, 0)), - - # seconds + nanoseconds - (1234567890, (1, 234567890)), - (-1234567890, (-2, 765432110)), - ): - with self.subTest(nanoseconds=ns, timespec=ts): - self.assertEqual(PyTime_AsTimespec(ns), ts) - - def test_milliseconds(self): + + def timespec_converter(ns): + return divmod(ns, SEC_TO_NS) + + self.check_int_rounding(lambda ns, rnd: PyTime_AsTimespec(ns), + timespec_converter, + NS_TO_SEC, + value_filter=self.time_t_filter) + + def test_AsMilliseconds(self): from _testcapi import PyTime_AsMilliseconds - for rnd in ALL_ROUNDING_METHODS: - for ns, tv in ( - # milliseconds - (1 * MS_TO_NS, 1), - (-2 * MS_TO_NS, -2), - - # seconds - (2 * SEC_TO_NS, 2000), - (-3 * SEC_TO_NS, -3000), - ): - with self.subTest(nanoseconds=ns, timeval=tv, round=rnd): - self.assertEqual(PyTime_AsMilliseconds(ns, rnd), tv) - - FLOOR = _PyTime.ROUND_FLOOR - CEILING = _PyTime.ROUND_CEILING - for ns, ms, rnd in ( - # nanoseconds - (1, 0, FLOOR), - (1, 1, CEILING), - (-1, -1, FLOOR), - (-1, 0, CEILING), - - # seconds + nanoseconds - (1234 * MS_TO_NS + 1, 1234, FLOOR), - (1234 * MS_TO_NS + 1, 1235, CEILING), - (-1234 * MS_TO_NS - 1, -1235, FLOOR), - (-1234 * MS_TO_NS - 1, -1234, CEILING), - ): - with self.subTest(nanoseconds=ns, milliseconds=ms, round=rnd): - self.assertEqual(PyTime_AsMilliseconds(ns, rnd), ms) - - def test_microseconds(self): + + self.check_int_rounding(PyTime_AsMilliseconds, + self.create_decimal_converter(MS_TO_NS), + NS_TO_SEC) + + def test_AsMicroseconds(self): from _testcapi import PyTime_AsMicroseconds - for rnd in ALL_ROUNDING_METHODS: - for ns, tv in ( - # microseconds - (1 * US_TO_NS, 1), - (-2 * US_TO_NS, -2), - - # milliseconds - (1 * MS_TO_NS, 1000), - (-2 * MS_TO_NS, -2000), - - # seconds - (2 * SEC_TO_NS, 2000000), - (-3 * SEC_TO_NS, -3000000), - ): - with self.subTest(nanoseconds=ns, timeval=tv, round=rnd): - self.assertEqual(PyTime_AsMicroseconds(ns, rnd), tv) - - FLOOR = _PyTime.ROUND_FLOOR - CEILING = _PyTime.ROUND_CEILING - for ns, ms, rnd in ( - # nanoseconds - (1, 0, FLOOR), - (1, 1, CEILING), - (-1, -1, FLOOR), - (-1, 0, CEILING), - - # seconds + nanoseconds - (1234 * US_TO_NS + 1, 1234, FLOOR), - (1234 * US_TO_NS + 1, 1235, CEILING), - (-1234 * US_TO_NS - 1, -1235, FLOOR), - (-1234 * US_TO_NS - 1, -1234, CEILING), - ): - with self.subTest(nanoseconds=ns, milliseconds=ms, round=rnd): - self.assertEqual(PyTime_AsMicroseconds(ns, rnd), ms) + + self.check_int_rounding(PyTime_AsMicroseconds, + self.create_decimal_converter(US_TO_NS), + NS_TO_SEC) + + +class TestOldPyTime(CPyTimeTestCase, unittest.TestCase): + """ + Test the old C _PyTime_t API: _PyTime_ObjectToXXX() functions. + """ + + # time_t is a 32-bit or 64-bit signed integer + OVERFLOW_SECONDS = 2 ** 64 + + def test_object_to_time_t(self): + from _testcapi import pytime_object_to_time_t + + self.check_int_rounding(pytime_object_to_time_t, + lambda secs: secs, + value_filter=self.time_t_filter) + + self.check_float_rounding(pytime_object_to_time_t, + self.decimal_round, + value_filter=self.time_t_filter) + + def create_converter(self, sec_to_unit): + def converter(secs): + floatpart, intpart = math.modf(secs) + intpart = int(intpart) + floatpart *= sec_to_unit + floatpart = self.decimal_round(floatpart) + if floatpart < 0: + floatpart += sec_to_unit + intpart -= 1 + elif floatpart >= sec_to_unit: + floatpart -= sec_to_unit + intpart += 1 + return (intpart, floatpart) + return converter + + def test_object_to_timeval(self): + from _testcapi import pytime_object_to_timeval + + self.check_int_rounding(pytime_object_to_timeval, + lambda secs: (secs, 0), + value_filter=self.time_t_filter) + + self.check_float_rounding(pytime_object_to_timeval, + self.create_converter(SEC_TO_US), + value_filter=self.time_t_filter) + + def test_object_to_timespec(self): + from _testcapi import pytime_object_to_timespec + + self.check_int_rounding(pytime_object_to_timespec, + lambda secs: (secs, 0), + value_filter=self.time_t_filter) + + self.check_float_rounding(pytime_object_to_timespec, + self.create_converter(SEC_TO_NS), + value_filter=self.time_t_filter) if __name__ == "__main__": diff --git a/Lib/test/test_tools/test_unparse.py b/Lib/test/test_tools/test_unparse.py index 976a6c5..4b47916 100644 --- a/Lib/test/test_tools/test_unparse.py +++ b/Lib/test/test_tools/test_unparse.py @@ -134,6 +134,15 @@ class ASTTestCase(unittest.TestCase): class UnparseTestCase(ASTTestCase): # Tests for specific bugs found in earlier versions of unparse + def test_fstrings(self): + # See issue 25180 + self.check_roundtrip(r"""f'{f"{0}"*3}'""") + self.check_roundtrip(r"""f'{f"{y}"*3}'""") + self.check_roundtrip(r"""f'{f"{\'x\'}"*3}'""") + + self.check_roundtrip(r'''f"{r'x' f'{\"s\"}'}"''') + self.check_roundtrip(r'''f"{r'x'rf'{\"s\"}'}"''') + def test_del_statement(self): self.check_roundtrip("del x, y, z") diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py index 0552f90..fcf5082 100644 --- a/Lib/test/test_urlparse.py +++ b/Lib/test/test_urlparse.py @@ -554,29 +554,27 @@ class UrlParseTestCase(unittest.TestCase): self.assertEqual(p.port, 80) self.assertEqual(p.geturl(), url) - # Verify an illegal port is returned as None + # Verify an illegal port raises ValueError url = b"HTTP://WWW.PYTHON.ORG:65536/doc/#frag" p = urllib.parse.urlsplit(url) - self.assertEqual(p.port, None) + with self.assertRaisesRegex(ValueError, "out of range"): + p.port def test_attributes_bad_port(self): - """Check handling of non-integer ports.""" - p = urllib.parse.urlsplit("http://www.example.net:foo") - self.assertEqual(p.netloc, "www.example.net:foo") - self.assertRaises(ValueError, lambda: p.port) - - p = urllib.parse.urlparse("http://www.example.net:foo") - self.assertEqual(p.netloc, "www.example.net:foo") - self.assertRaises(ValueError, lambda: p.port) - - # Once again, repeat ourselves to test bytes - p = urllib.parse.urlsplit(b"http://www.example.net:foo") - self.assertEqual(p.netloc, b"www.example.net:foo") - self.assertRaises(ValueError, lambda: p.port) - - p = urllib.parse.urlparse(b"http://www.example.net:foo") - self.assertEqual(p.netloc, b"www.example.net:foo") - self.assertRaises(ValueError, lambda: p.port) + """Check handling of invalid ports.""" + for bytes in (False, True): + for parse in (urllib.parse.urlsplit, urllib.parse.urlparse): + for port in ("foo", "1.5", "-1", "0x10"): + with self.subTest(bytes=bytes, parse=parse, port=port): + netloc = "www.example.net:" + port + url = "http://" + netloc + if bytes: + netloc = netloc.encode("ascii") + url = url.encode("ascii") + p = parse(url) + self.assertEqual(p.netloc, netloc) + with self.assertRaises(ValueError): + p.port def test_attributes_without_netloc(self): # This example is straight from RFC 3261. It looks like it diff --git a/Lib/test/test_warnings/data/import_warning.py b/Lib/test/test_warnings/data/import_warning.py index d6ea2ce..32daec1 100644 --- a/Lib/test/test_warnings/data/import_warning.py +++ b/Lib/test/test_warnings/data/import_warning.py @@ -1,3 +1,3 @@ import warnings -warnings.warn('module-level warning', DeprecationWarning, stacklevel=2)
\ No newline at end of file +warnings.warn('module-level warning', DeprecationWarning, stacklevel=2) diff --git a/Lib/test/test_zipimport.py b/Lib/test/test_zipimport.py index a97a778..4f19535 100644 --- a/Lib/test/test_zipimport.py +++ b/Lib/test/test_zipimport.py @@ -214,7 +214,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): packdir2 = packdir + TESTPACK2 + os.sep files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc), packdir2 + "__init__" + pyc_ext: (NOW, test_pyc), - packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)} + packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc), + "spam" + pyc_ext: (NOW, test_pyc)} z = ZipFile(TEMP_ZIP, "w") try: @@ -228,6 +229,14 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): zi = zipimport.zipimporter(TEMP_ZIP) self.assertEqual(zi.archive, TEMP_ZIP) self.assertEqual(zi.is_package(TESTPACK), True) + + find_mod = zi.find_module('spam') + self.assertIsNotNone(find_mod) + self.assertIsInstance(find_mod, zipimport.zipimporter) + self.assertFalse(find_mod.is_package('spam')) + load_mod = find_mod.load_module('spam') + self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__) + mod = zi.load_module(TESTPACK) self.assertEqual(zi.get_filename(TESTPACK), mod.__file__) @@ -287,6 +296,16 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): self.assertEqual( zi.is_package(TESTPACK2 + os.sep + TESTMOD), False) + pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2 + zi2 = zipimport.zipimporter(pkg_path) + find_mod_dotted = zi2.find_module(TESTMOD) + self.assertIsNotNone(find_mod_dotted) + self.assertIsInstance(find_mod_dotted, zipimport.zipimporter) + self.assertFalse(zi2.is_package(TESTMOD)) + load_mod = find_mod_dotted.load_module(TESTMOD) + self.assertEqual( + find_mod_dotted.get_filename(TESTMOD), load_mod.__file__) + mod_path = TESTPACK2 + os.sep + TESTMOD mod_name = module_path_to_dotted_name(mod_path) __import__(mod_name) |