diff options
Diffstat (limited to 'Lib/test')
28 files changed, 601 insertions, 72 deletions
diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index 5ed01f2..242a931 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -1,5 +1,5 @@ """ -Common tests shared by test_str, test_unicode, test_userstring and test_string. +Common tests shared by test_unicode, test_userstring and test_string. """ import unittest, string, sys, struct @@ -79,11 +79,9 @@ class BaseTest: def checkraises(self, exc, obj, methodname, *args): obj = self.fixtype(obj) args = self.fixtype(args) - self.assertRaises( - exc, - getattr(obj, methodname), - *args - ) + with self.assertRaises(exc) as cm: + getattr(obj, methodname)(*args) + self.assertNotEqual(str(cm.exception), '') # call obj.method(*args) without any checks def checkcall(self, obj, methodname, *args): @@ -1119,8 +1117,7 @@ class MixinStrUnicodeUserStringTest: def test_join(self): # join now works with any sequence type # moved here, because the argument order is - # different in string.join (see the test in - # test.test_string.StringTest.test_join) + # different in string.join self.checkequal('a b c d', ' ', 'join', ['a', 'b', 'c', 'd']) self.checkequal('abcd', '', 'join', ('a', 'b', 'c', 'd')) self.checkequal('bd', '', 'join', ('', 'b', '', 'd')) @@ -1140,6 +1137,7 @@ class MixinStrUnicodeUserStringTest: self.checkequal('a b c', ' ', 'join', BadSeq2()) self.checkraises(TypeError, ' ', 'join') + self.checkraises(TypeError, ' ', 'join', None) self.checkraises(TypeError, ' ', 'join', 7) self.checkraises(TypeError, ' ', 'join', [1, 2, bytes()]) try: diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index ca12101..294872a 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -2,6 +2,7 @@ import errno import logging +import math import socket import sys import time @@ -73,13 +74,6 @@ class BaseEventLoopTests(test_utils.TestCase): self.assertFalse(self.loop._scheduled) self.assertIn(h, self.loop._ready) - def test__add_callback_timer(self): - h = asyncio.TimerHandle(time.monotonic()+10, lambda: False, (), - self.loop) - - self.loop._add_callback(h) - self.assertIn(h, self.loop._scheduled) - def test__add_callback_cancelled_handle(self): h = asyncio.Handle(lambda: False, (), self.loop) h.cancel() @@ -283,6 +277,82 @@ class BaseEventLoopTests(test_utils.TestCase): self.assertTrue(processed) self.assertEqual([handle], list(self.loop._ready)) + def test__run_once_cancelled_event_cleanup(self): + self.loop._process_events = mock.Mock() + + self.assertTrue( + 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) + + def cb(): + pass + + # Set up one "blocking" event that will not be cancelled to + # ensure later cancelled events do not make it to the head + # of the queue and get cleaned. + not_cancelled_count = 1 + self.loop.call_later(3000, cb) + + # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) + # cancelled handles, ensure they aren't removed + + cancelled_count = 2 + for x in range(2): + h = self.loop.call_later(3600, cb) + h.cancel() + + # Add some cancelled events that will be at head and removed + cancelled_count += 2 + for x in range(2): + h = self.loop.call_later(100, cb) + h.cancel() + + # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low + self.assertLessEqual(cancelled_count + not_cancelled_count, + base_events._MIN_SCHEDULED_TIMER_HANDLES) + + self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) + + self.loop._run_once() + + cancelled_count -= 2 + + self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) + + self.assertEqual(len(self.loop._scheduled), + cancelled_count + not_cancelled_count) + + # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION + # so that deletion of cancelled events will occur on next _run_once + add_cancel_count = int(math.ceil( + base_events._MIN_SCHEDULED_TIMER_HANDLES * + base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 + + add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - + add_cancel_count, 0) + + # Add some events that will not be cancelled + not_cancelled_count += add_not_cancel_count + for x in range(add_not_cancel_count): + self.loop.call_later(3600, cb) + + # Add enough cancelled events + cancelled_count += add_cancel_count + for x in range(add_cancel_count): + h = self.loop.call_later(3600, cb) + h.cancel() + + # Ensure all handles are still scheduled + self.assertEqual(len(self.loop._scheduled), + cancelled_count + not_cancelled_count) + + self.loop._run_once() + + # Ensure cancelled events were removed + self.assertEqual(len(self.loop._scheduled), not_cancelled_count) + + # Ensure only uncancelled events remain scheduled + self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) + def test_run_until_complete_type_error(self): self.assertRaises(TypeError, self.loop.run_until_complete, 'blah') diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 7ac845a..a305e66 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1890,9 +1890,17 @@ class HandleTests(test_utils.TestCase): # cancelled handle h.cancel() - self.assertEqual(repr(h), - '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' - % (filename, lineno, create_filename, create_lineno)) + self.assertEqual( + repr(h), + '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' + % (filename, lineno, create_filename, create_lineno)) + + # double cancellation won't overwrite _repr + h.cancel() + self.assertEqual( + repr(h), + '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' + % (filename, lineno, create_filename, create_lineno)) def test_handle_source_traceback(self): loop = asyncio.get_event_loop_policy().new_event_loop() diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index e25aa4d..770f218 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1,5 +1,6 @@ """Tests for tasks.py.""" +import os import re import sys import types @@ -1768,25 +1769,31 @@ class GatherTestsBase: self.assertEqual(fut.result(), [3, 1, exc, exc2]) def test_env_var_debug(self): + aio_path = os.path.dirname(os.path.dirname(asyncio.__file__)) + code = '\n'.join(( 'import asyncio.coroutines', 'print(asyncio.coroutines._DEBUG)')) # Test with -E to not fail if the unit test was run with # PYTHONASYNCIODEBUG set to a non-empty string - sts, stdout, stderr = assert_python_ok('-E', '-c', code) + sts, stdout, stderr = assert_python_ok('-E', '-c', code, + PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') sts, stdout, stderr = assert_python_ok('-c', code, - PYTHONASYNCIODEBUG='') + PYTHONASYNCIODEBUG='', + PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') sts, stdout, stderr = assert_python_ok('-c', code, - PYTHONASYNCIODEBUG='1') + PYTHONASYNCIODEBUG='1', + PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'True') sts, stdout, stderr = assert_python_ok('-E', '-c', code, - PYTHONASYNCIODEBUG='1') + PYTHONASYNCIODEBUG='1', + PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py index 43b6c82..1a351a5 100644 --- a/Lib/test/test_bytes.py +++ b/Lib/test/test_bytes.py @@ -298,6 +298,7 @@ class BaseBytesTest: seq = [b"abc"] * 1000 expected = b"abc" + b".:abc" * 999 self.assertEqual(dot_join(seq), expected) + self.assertRaises(TypeError, self.type2test(b" ").join, None) # Error handling and cleanup when some item in the middle of the # sequence has the wrong type. with self.assertRaises(TypeError): diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 56193e8..4137d5a 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -2613,6 +2613,36 @@ Test the verbose output: >>> sys.argv = save_argv """ +def test_lineendings(): r""" +*nix systems use \n line endings, while Windows systems use \r\n. Python +handles this using universal newline mode for reading files. Let's make +sure doctest does so (issue 8473) by creating temporary test files using each +of the two line disciplines. One of the two will be the "wrong" one for the +platform the test is run on. + +Windows line endings first: + + >>> import tempfile, os + >>> fn = tempfile.mktemp() + >>> with open(fn, 'w') as f: + ... f.write('Test:\r\n\r\n >>> x = 1 + 1\r\n\r\nDone.\r\n') + 35 + >>> doctest.testfile(fn, False) + TestResults(failed=0, attempted=1) + >>> os.remove(fn) + +And now *nix line endings: + + >>> fn = tempfile.mktemp() + >>> with open(fn, 'w') as f: + ... f.write('Test:\n\n >>> x = 1 + 1\n\nDone.\n') + 30 + >>> doctest.testfile(fn, False) + TestResults(failed=0, attempted=1) + >>> os.remove(fn) + +""" + def test_testmod(): r""" Tests for the testmod function. More might be useful, but for now we're just testing the case raised by Issue 6195, where trying to doctest a C module would diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index 41ccd1e..8dcefe4 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -184,10 +184,10 @@ class FaultHandlerTests(unittest.TestCase): self.check_fatal_error(""" import faulthandler faulthandler.enable() - faulthandler._read_null(True) + faulthandler._sigsegv(True) """, 3, - '(?:Segmentation fault|Bus error|Illegal instruction)') + 'Segmentation fault') def test_enable_file(self): with temporary_filename() as filename: @@ -220,7 +220,7 @@ class FaultHandlerTests(unittest.TestCase): """ not_expected = 'Fatal Python error' stderr, exitcode = self.get_output(code) - stder = '\n'.join(stderr) + stderr = '\n'.join(stderr) self.assertTrue(not_expected not in stderr, "%r is present in %r" % (not_expected, stderr)) self.assertNotEqual(exitcode, 0) diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py index 024f438..80b9ec3 100644 --- a/Lib/test/test_imp.py +++ b/Lib/test/test_imp.py @@ -198,6 +198,7 @@ class ImportTests(unittest.TestCase): support.unlink(temp_mod_name + ext) support.unlink(init_file_name + ext) support.rmtree(test_package_name) + support.rmtree('__pycache__') def test_issue9319(self): path = os.path.dirname(__file__) diff --git a/Lib/test/test_import.py b/Lib/test/test_import.py index 781a159..b4842c5 100644 --- a/Lib/test/test_import.py +++ b/Lib/test/test_import.py @@ -1062,6 +1062,7 @@ class ImportTracebackTests(unittest.TestCase): # Issue #11619: The Python parser and the import machinery must not # encode filenames, especially on Windows pyname = script_helper.make_script('', TESTFN_UNENCODABLE, 'pass') + self.addCleanup(unlink, pyname) name = pyname[:-3] script_helper.assert_python_ok("-c", "mod = __import__(%a)" % name, __isolated=False) diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py index da0572d..66d3fab 100644 --- a/Lib/test/test_inspect.py +++ b/Lib/test/test_inspect.py @@ -432,10 +432,11 @@ class TestBuggyCases(GetSourceBase): def test_method_in_dynamic_class(self): self.assertSourceEqual(mod2.method_in_dynamic_class, 95, 97) - @unittest.skipIf( - not hasattr(unicodedata, '__file__') or - unicodedata.__file__[-4:] in (".pyc", ".pyo"), - "unicodedata is not an external binary module") + # This should not skip for CPython, but might on a repackaged python where + # unicodedata is not an external module, or on pypy. + @unittest.skipIf(not hasattr(unicodedata, '__file__') or + unicodedata.__file__.endswith('.py'), + "unicodedata is not an external binary module") def test_findsource_binary(self): self.assertRaises(OSError, inspect.getsource, unicodedata) self.assertRaises(OSError, inspect.findsource, unicodedata) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 32e908d..b5506b0 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1567,6 +1567,12 @@ class BufferedRWPairTest(unittest.TestCase): pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) self.assertTrue(pair.isatty()) + def test_weakref_clearing(self): + brw = self.tp(self.MockRawIO(), self.MockRawIO()) + ref = weakref.ref(brw) + brw = None + ref = None # Shouldn't segfault. + class CBufferedRWPairTest(BufferedRWPairTest): tp = io.BufferedRWPair diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py index ab990ae..e979753 100644 --- a/Lib/test/test_locale.py +++ b/Lib/test/test_locale.py @@ -425,7 +425,7 @@ class NormalizeTest(unittest.TestCase): def test_valencia_modifier(self): self.check('ca_ES.UTF-8@valencia', 'ca_ES.UTF-8@valencia') - self.check('ca_ES@valencia', 'ca_ES.ISO8859-1@valencia') + self.check('ca_ES@valencia', 'ca_ES.ISO8859-15@valencia') self.check('ca@valencia', 'ca_ES.ISO8859-1@valencia') def test_devanagari_modifier(self): diff --git a/Lib/test/test_macpath.py b/Lib/test/test_macpath.py index ae4e564..22f8491 100644 --- a/Lib/test/test_macpath.py +++ b/Lib/test/test_macpath.py @@ -49,16 +49,40 @@ class MacPathTestCase(unittest.TestCase): def test_join(self): join = macpath.join self.assertEqual(join('a', 'b'), ':a:b') + self.assertEqual(join(':a', 'b'), ':a:b') + self.assertEqual(join(':a:', 'b'), ':a:b') + self.assertEqual(join(':a::', 'b'), ':a::b') + self.assertEqual(join(':a', '::b'), ':a::b') + self.assertEqual(join('a', ':'), ':a:') + self.assertEqual(join('a:', ':'), 'a:') + self.assertEqual(join('a', ''), ':a:') + self.assertEqual(join('a:', ''), 'a:') + self.assertEqual(join('', ''), '') self.assertEqual(join('', 'a:b'), 'a:b') + self.assertEqual(join('', 'a', 'b'), ':a:b') self.assertEqual(join('a:b', 'c'), 'a:b:c') self.assertEqual(join('a:b', ':c'), 'a:b:c') self.assertEqual(join('a', ':b', ':c'), ':a:b:c') + self.assertEqual(join('a', 'b:'), 'b:') + self.assertEqual(join('a:', 'b:'), 'b:') self.assertEqual(join(b'a', b'b'), b':a:b') + self.assertEqual(join(b':a', b'b'), b':a:b') + self.assertEqual(join(b':a:', b'b'), b':a:b') + self.assertEqual(join(b':a::', b'b'), b':a::b') + self.assertEqual(join(b':a', b'::b'), b':a::b') + self.assertEqual(join(b'a', b':'), b':a:') + self.assertEqual(join(b'a:', b':'), b'a:') + self.assertEqual(join(b'a', b''), b':a:') + self.assertEqual(join(b'a:', b''), b'a:') + self.assertEqual(join(b'', b''), b'') self.assertEqual(join(b'', b'a:b'), b'a:b') + self.assertEqual(join(b'', b'a', b'b'), b':a:b') self.assertEqual(join(b'a:b', b'c'), b'a:b:c') self.assertEqual(join(b'a:b', b':c'), b'a:b:c') self.assertEqual(join(b'a', b':b', b':c'), b':a:b:c') + self.assertEqual(join(b'a', b'b:'), b'b:') + self.assertEqual(join(b'a:', b'b:'), b'b:') def test_splitext(self): splitext = macpath.splitext diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py index 895be02..edc9e75 100644 --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -916,6 +916,7 @@ class PdbTestCase(unittest.TestCase): with open(filename, 'w') as f: f.write(textwrap.dedent(script)) self.addCleanup(support.unlink, filename) + self.addCleanup(support.rmtree, '__pycache__') cmd = [sys.executable, '-m', 'pdb', filename] stdout = stderr = None with subprocess.Popen(cmd, stdout=subprocess.PIPE, diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 3fae2b1..72fdd16 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -1125,18 +1125,19 @@ class PosixTester(unittest.TestCase): """ Test functions that call path_error2(), providing two filenames in their exceptions. """ - for name in ("rename", "replace", "link", "symlink"): + for name in ("rename", "replace", "link"): function = getattr(os, name, None) + if function is None: + continue - if function: - for dst in ("noodly2", support.TESTFN): - try: - function('doesnotexistfilename', dst) - except OSError as e: - self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) - break - else: - self.fail("No valid path_error2() test for os." + name) + for dst in ("noodly2", support.TESTFN): + try: + function('doesnotexistfilename', dst) + except OSError as e: + self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) + break + else: + self.fail("No valid path_error2() test for os." + name) class PosixGroupsTester(unittest.TestCase): diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py index 011fba9..0584f19 100644 --- a/Lib/test/test_re.py +++ b/Lib/test/test_re.py @@ -766,8 +766,8 @@ class ReTests(unittest.TestCase): self.assertTrue(re.match((r"[\x%02xz]" % i).encode(), bytes([i]))) self.assertTrue(re.match(br"[\u]", b'u')) self.assertTrue(re.match(br"[\U]", b'U')) - self.assertRaises(re.error, re.match, br"[\911]", "") - self.assertRaises(re.error, re.match, br"[\x1z]", "") + self.assertRaises(re.error, re.match, br"[\911]", b"") + self.assertRaises(re.error, re.match, br"[\x1z]", b"") def test_bug_113254(self): self.assertEqual(re.match(r'(a)|(b)', 'b').start(1), -1) @@ -1203,16 +1203,33 @@ class ReTests(unittest.TestCase): self.assertEqual(m.group(2), "y") def test_debug_flag(self): + pat = r'(\.)(?:[ch]|py)(?(1)$|: )' with captured_stdout() as out: - re.compile('foo', re.DEBUG) - self.assertEqual(out.getvalue().splitlines(), - ['literal 102 ', 'literal 111 ', 'literal 111 ']) + re.compile(pat, re.DEBUG) + dump = '''\ +subpattern 1 + literal 46 +subpattern None + branch + in + literal 99 + literal 104 + or + literal 112 + literal 121 +subpattern None + groupref_exists 1 + at at_end + else + literal 58 + literal 32 +''' + self.assertEqual(out.getvalue(), dump) # Debug output is output again even a second time (bypassing # the cache -- issue #20426). with captured_stdout() as out: - re.compile('foo', re.DEBUG) - self.assertEqual(out.getvalue().splitlines(), - ['literal 102 ', 'literal 111 ', 'literal 111 ']) + re.compile(pat, re.DEBUG) + self.assertEqual(out.getvalue(), dump) def test_keyword_parameters(self): # Issue #20283: Accepting the string keyword parameter. diff --git a/Lib/test/test_source_encoding.py b/Lib/test/test_source_encoding.py index 0c41e50..39a7c56 100644 --- a/Lib/test/test_source_encoding.py +++ b/Lib/test/test_source_encoding.py @@ -1,7 +1,7 @@ # -*- coding: koi8-r -*- import unittest -from test.support import TESTFN, unlink, unload +from test.support import TESTFN, unlink, unload, rmtree import importlib import os import sys @@ -129,6 +129,7 @@ class SourceEncodingTest(unittest.TestCase): unlink(filename + "c") unlink(filename + "o") unload(TESTFN) + rmtree('__pycache__') def test_error_from_string(self): # See http://bugs.python.org/issue6289 diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index d1cf5b2..e71a400 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1016,6 +1016,29 @@ class ContextTests(unittest.TestCase): self.assertRaises(TypeError, ctx.load_default_certs, None) self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH') + @unittest.skipIf(sys.platform == "win32", "not-Windows specific") + def test_load_default_certs_env(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with support.EnvironmentVarGuard() as env: + env["SSL_CERT_DIR"] = CAPATH + env["SSL_CERT_FILE"] = CERTFILE + ctx.load_default_certs() + self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0}) + + @unittest.skipUnless(sys.platform == "win32", "Windows specific") + def test_load_default_certs_env_windows(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_default_certs() + stats = ctx.cert_store_stats() + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with support.EnvironmentVarGuard() as env: + env["SSL_CERT_DIR"] = CAPATH + env["SSL_CERT_FILE"] = CERTFILE + ctx.load_default_certs() + stats["x509"] += 1 + self.assertEqual(ctx.cert_store_stats(), stats) + def test_create_default_context(self): ctx = ssl.create_default_context() self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) diff --git a/Lib/test/test_statistics.py b/Lib/test/test_statistics.py index 6f5c9a6..f1da21e 100644 --- a/Lib/test/test_statistics.py +++ b/Lib/test/test_statistics.py @@ -991,14 +991,14 @@ class SumSpecialValues(NumericTestCase): result = statistics._sum([1, 2, inf, 3, -inf, 4]) self.assertTrue(math.isnan(result)) - def test_decimal_mismatched_infs_to_nan(self): + def test_decimal_extendedcontext_mismatched_infs_to_nan(self): # Test adding Decimal INFs with opposite sign returns NAN. inf = Decimal('inf') data = [1, 2, inf, 3, -inf, 4] with decimal.localcontext(decimal.ExtendedContext): self.assertTrue(math.isnan(statistics._sum(data))) - def test_decimal_mismatched_infs_to_nan(self): + def test_decimal_basiccontext_mismatched_infs_to_nan(self): # Test adding Decimal INFs with opposite sign raises InvalidOperation. inf = Decimal('inf') data = [1, 2, inf, 3, -inf, 4] diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index d0ab718..5381115 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1008,6 +1008,39 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None) self.assertEqual(p.wait(), 0) + def _test_bufsize_equal_one(self, line, expected, universal_newlines): + # subprocess may deadlock with bufsize=1, see issue #21332 + with subprocess.Popen([sys.executable, "-c", "import sys;" + "sys.stdout.write(sys.stdin.readline());" + "sys.stdout.flush()"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=1, + universal_newlines=universal_newlines) as p: + p.stdin.write(line) # expect that it flushes the line in text mode + os.close(p.stdin.fileno()) # close it without flushing the buffer + read_line = p.stdout.readline() + try: + p.stdin.close() + except OSError: + pass + p.stdin = None + self.assertEqual(p.returncode, 0) + self.assertEqual(read_line, expected) + + def test_bufsize_equal_one_text_mode(self): + # line is flushed in text mode with bufsize=1. + # we should get the full line in return + line = "line\n" + self._test_bufsize_equal_one(line, line, universal_newlines=True) + + def test_bufsize_equal_one_binary_mode(self): + # line is not flushed in binary mode with bufsize=1. + # we should get empty response + line = b'line' + os.linesep.encode() # assume ascii-based locale + self._test_bufsize_equal_one(line, b'', universal_newlines=False) + def test_leaking_fds_on_error(self): # see bug #5179: Popen leaks file descriptors to PIPEs if # the child fails to execute; this will eventually exhaust @@ -2183,6 +2216,39 @@ class POSIXProcessTestCase(BaseTestCase): self.assertNotIn(fd, remaining_fds) + @support.cpython_only + def test_fork_exec(self): + # Issue #22290: fork_exec() must not crash on memory allocation failure + # or other errors + import _posixsubprocess + gc_enabled = gc.isenabled() + try: + # Use a preexec function and enable the garbage collector + # to force fork_exec() to re-enable the garbage collector + # on error. + func = lambda: None + gc.enable() + + executable_list = "exec" # error: must be a sequence + + for args, exe_list, cwd, env_list in ( + (123, [b"exe"], None, [b"env"]), + ([b"arg"], 123, None, [b"env"]), + ([b"arg"], [b"exe"], 123, [b"env"]), + ([b"arg"], [b"exe"], None, 123), + ): + with self.assertRaises(TypeError): + _posixsubprocess.fork_exec( + args, exe_list, + True, [], cwd, env_list, + -1, -1, -1, -1, + 1, 2, 3, 4, + True, True, func) + finally: + if not gc_enabled: + gc.disable() + + @unittest.skipUnless(mswindows, "Windows specific tests") class Win32ProcessTestCase(BaseTestCase): diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 9bd0a01..03ce9d1 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -69,6 +69,7 @@ class TestSupport(unittest.TestCase): finally: del sys.path[0] support.unlink(mod_filename) + support.rmtree('__pycache__') def test_HOST(self): s = socket.socket() diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index ec975f8..2e10fdd 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -1211,6 +1211,30 @@ class TestTemporaryDirectory(BaseTestCase): self.assertNotIn("Exception ", err) self.assertIn("ResourceWarning: Implicitly cleaning up", err) + def test_exit_on_shutdown(self): + # Issue #22427 + with self.do_create() as dir: + code = """if True: + import sys + import tempfile + import warnings + + def generator(): + with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: + yield tmp + g = generator() + sys.stdout.buffer.write(next(g).encode()) + + warnings.filterwarnings("always", category=ResourceWarning) + """.format(dir=dir) + rc, out, err = script_helper.assert_python_ok("-c", code) + tmp_name = out.decode().strip() + self.assertFalse(os.path.exists(tmp_name), + "TemporaryDirectory %s exists after cleanup" % tmp_name) + err = err.decode('utf-8', 'backslashreplace') + self.assertNotIn("Exception ", err) + self.assertIn("ResourceWarning: Implicitly cleaning up", err) + def test_warnings_on_cleanup(self): # ResourceWarning will be triggered by __del__ with self.do_create() as dir: diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py index c7bfea0..192fa08 100644 --- a/Lib/test/test_threaded_import.py +++ b/Lib/test/test_threaded_import.py @@ -13,7 +13,8 @@ import time import shutil import unittest from test.support import ( - verbose, import_module, run_unittest, TESTFN, reap_threads, forget, unlink) + verbose, import_module, run_unittest, TESTFN, reap_threads, + forget, unlink, rmtree) threading = import_module('threading') def task(N, done, done_tasks, errors): @@ -222,6 +223,7 @@ class ThreadedImportTests(unittest.TestCase): f.write(code.encode('utf-8')) self.addCleanup(unlink, filename) self.addCleanup(forget, TESTFN) + self.addCleanup(rmtree, '__pycache__') importlib.invalidate_caches() __import__(TESTFN) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 7170f60..98f01ee 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -4,7 +4,7 @@ Tests for the threading module. import test.support from test.support import verbose, strip_python_stderr, import_module, cpython_only -from test.script_helper import assert_python_ok +from test.script_helper import assert_python_ok, assert_python_failure import random import re @@ -15,7 +15,6 @@ import time import unittest import weakref import os -from test.script_helper import assert_python_ok, assert_python_failure import subprocess from test import lock_tests @@ -962,6 +961,88 @@ class ThreadingExceptionTests(BaseTestCase): self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) self.assertEqual(data, expected_output) + def test_print_exception(self): + script = r"""if True: + import threading + import time + + running = False + def run(): + global running + running = True + while running: + time.sleep(0.01) + 1/0 + t = threading.Thread(target=run) + t.start() + while not running: + time.sleep(0.01) + running = False + t.join() + """ + rc, out, err = assert_python_ok("-c", script) + self.assertEqual(out, b'') + err = err.decode() + self.assertIn("Exception in thread", err) + self.assertIn("Traceback (most recent call last):", err) + self.assertIn("ZeroDivisionError", err) + self.assertNotIn("Unhandled exception", err) + + def test_print_exception_stderr_is_none_1(self): + script = r"""if True: + import sys + import threading + import time + + running = False + def run(): + global running + running = True + while running: + time.sleep(0.01) + 1/0 + t = threading.Thread(target=run) + t.start() + while not running: + time.sleep(0.01) + sys.stderr = None + running = False + t.join() + """ + rc, out, err = assert_python_ok("-c", script) + self.assertEqual(out, b'') + err = err.decode() + self.assertIn("Exception in thread", err) + self.assertIn("Traceback (most recent call last):", err) + self.assertIn("ZeroDivisionError", err) + self.assertNotIn("Unhandled exception", err) + + def test_print_exception_stderr_is_none_2(self): + script = r"""if True: + import sys + import threading + import time + + running = False + def run(): + global running + running = True + while running: + time.sleep(0.01) + 1/0 + sys.stderr = None + t = threading.Thread(target=run) + t.start() + while not running: + time.sleep(0.01) + running = False + t.join() + """ + rc, out, err = assert_python_ok("-c", script) + self.assertEqual(out, b'') + self.assertNotIn("Unhandled exception", err.decode()) + + class TimerTests(BaseTestCase): def setUp(self): diff --git a/Lib/test/test_userstring.py b/Lib/test/test_userstring.py index 34c629c..9bc8edd 100644 --- a/Lib/test/test_userstring.py +++ b/Lib/test/test_userstring.py @@ -28,14 +28,12 @@ class UserStringTest( realresult ) - def checkraises(self, exc, object, methodname, *args): - object = self.fixtype(object) + def checkraises(self, exc, obj, methodname, *args): + obj = self.fixtype(obj) # we don't fix the arguments, because UserString can't cope with it - self.assertRaises( - exc, - getattr(object, methodname), - *args - ) + with self.assertRaises(exc) as cm: + getattr(obj, methodname)(*args) + self.assertNotEqual(str(cm.exception), '') def checkcall(self, object, methodname, *args): object = self.fixtype(object) diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 0a59120..3e7347c 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -1298,6 +1298,36 @@ class MappingTestCase(TestBase): dict.clear() self.assertEqual(len(dict), 0) + def check_weak_del_and_len_while_iterating(self, dict, testcontext): + # Check that len() works when both iterating and removing keys + # explicitly through various means (.pop(), .clear()...), while + # implicit mutation is deferred because an iterator is alive. + # (each call to testcontext() should schedule one item for removal + # for this test to work properly) + o = Object(123456) + with testcontext(): + n = len(dict) + dict.popitem() + self.assertEqual(len(dict), n - 1) + dict[o] = o + self.assertEqual(len(dict), n) + with testcontext(): + self.assertEqual(len(dict), n - 1) + dict.pop(next(dict.keys())) + self.assertEqual(len(dict), n - 2) + with testcontext(): + self.assertEqual(len(dict), n - 3) + del dict[next(dict.keys())] + self.assertEqual(len(dict), n - 4) + with testcontext(): + self.assertEqual(len(dict), n - 5) + dict.popitem() + self.assertEqual(len(dict), n - 6) + with testcontext(): + dict.clear() + self.assertEqual(len(dict), 0) + self.assertEqual(len(dict), 0) + def test_weak_keys_destroy_while_iterating(self): # Issue #7105: iterators shouldn't crash when a key is implicitly removed dict, objects = self.make_weak_keyed_dict() @@ -1319,6 +1349,10 @@ class MappingTestCase(TestBase): it = None # should commit all removals gc.collect() self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + # Issue #21173: len() fragile when keys are both implicitly and + # explicitly removed. + dict, objects = self.make_weak_keyed_dict() + self.check_weak_del_and_len_while_iterating(dict, testcontext) def test_weak_values_destroy_while_iterating(self): # Issue #7105: iterators shouldn't crash when a key is implicitly removed @@ -1342,6 +1376,8 @@ class MappingTestCase(TestBase): it = None # should commit all removals gc.collect() self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + dict, objects = self.make_weak_valued_dict() + self.check_weak_del_and_len_while_iterating(dict, testcontext) def test_make_weak_keyed_dict_from_dict(self): o = Object(3) diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 8dae2eb..a17eaaa 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -462,7 +462,9 @@ class AbstractTestZip64InSmallFiles: def setUp(self): self._limit = zipfile.ZIP64_LIMIT - zipfile.ZIP64_LIMIT = 5 + self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT + zipfile.ZIP64_LIMIT = 1000 + zipfile.ZIP_FILECOUNT_LIMIT = 9 # Make a source file with some lines with open(TESTFN, "wb") as fp: @@ -529,8 +531,67 @@ class AbstractTestZip64InSmallFiles: for f in get_files(self): self.zip_test(f, self.compression) + def test_too_many_files(self): + # This test checks that more than 64k files can be added to an archive, + # and that the resulting archive can be read properly by ZipFile + zipf = zipfile.ZipFile(TESTFN, "w", self.compression, + allowZip64=True) + zipf.debug = 100 + numfiles = 15 + for i in range(numfiles): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) + self.assertEqual(len(zipf2.namelist()), numfiles) + for i in range(numfiles): + content = zipf2.read("foo%08d" % i).decode('ascii') + self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + + def test_too_many_files_append(self): + zipf = zipfile.ZipFile(TESTFN, "w", self.compression, + allowZip64=False) + zipf.debug = 100 + numfiles = 9 + for i in range(numfiles): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf = zipfile.ZipFile(TESTFN, "a", self.compression, + allowZip64=False) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf = zipfile.ZipFile(TESTFN, "a", self.compression, + allowZip64=True) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + numfiles2 = 15 + for i in range(numfiles, numfiles2): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles2) + zipf.close() + + zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) + self.assertEqual(len(zipf2.namelist()), numfiles2) + for i in range(numfiles2): + content = zipf2.read("foo%08d" % i).decode('ascii') + self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + def tearDown(self): zipfile.ZIP64_LIMIT = self._limit + zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit unlink(TESTFN) unlink(TESTFN2) @@ -1635,11 +1696,48 @@ class TestWithDirectory(unittest.TestCase): os.mkdir(os.path.join(TESTFN2, "a")) self.test_extract_dir() - def test_store_dir(self): + def test_write_dir(self): + dirpath = os.path.join(TESTFN2, "x") + os.mkdir(dirpath) + mode = os.stat(dirpath).st_mode & 0xFFFF + with zipfile.ZipFile(TESTFN, "w") as zipf: + zipf.write(dirpath) + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("/x/")) + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + zipf.write(dirpath, "y") + zinfo = zipf.filelist[1] + self.assertTrue(zinfo.filename, "y/") + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + with zipfile.ZipFile(TESTFN, "r") as zipf: + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("/x/")) + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + zinfo = zipf.filelist[1] + self.assertTrue(zinfo.filename, "y/") + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + target = os.path.join(TESTFN2, "target") + os.mkdir(target) + zipf.extractall(target) + self.assertTrue(os.path.isdir(os.path.join(target, "y"))) + self.assertEqual(len(os.listdir(target)), 2) + + def test_writestr_dir(self): os.mkdir(os.path.join(TESTFN2, "x")) - zipf = zipfile.ZipFile(TESTFN, "w") - zipf.write(os.path.join(TESTFN2, "x"), "x") - self.assertTrue(zipf.filelist[0].filename.endswith("x/")) + with zipfile.ZipFile(TESTFN, "w") as zipf: + zipf.writestr("x/", b'') + zinfo = zipf.filelist[0] + self.assertEqual(zinfo.filename, "x/") + self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) + with zipfile.ZipFile(TESTFN, "r") as zipf: + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("x/")) + self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) + target = os.path.join(TESTFN2, "target") + os.mkdir(target) + zipf.extractall(target) + self.assertTrue(os.path.isdir(os.path.join(target, "x"))) + self.assertEqual(os.listdir(target), ["x"]) def tearDown(self): rmtree(TESTFN2) diff --git a/Lib/test/test_zipfile64.py b/Lib/test/test_zipfile64.py index 498d464..7dea8a3 100644 --- a/Lib/test/test_zipfile64.py +++ b/Lib/test/test_zipfile64.py @@ -18,7 +18,7 @@ import sys from io import StringIO from tempfile import TemporaryFile -from test.support import TESTFN, run_unittest, requires_zlib +from test.support import TESTFN, requires_zlib TESTFN2 = TESTFN + "2" @@ -92,7 +92,7 @@ class OtherTests(unittest.TestCase): def testMoreThan64kFiles(self): # This test checks that more than 64k files can be added to an archive, # and that the resulting archive can be read properly by ZipFile - zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) + zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) zipf.debug = 100 numfiles = (1 << 16) * 3//2 for i in range(numfiles): @@ -105,14 +105,47 @@ class OtherTests(unittest.TestCase): for i in range(numfiles): content = zipf2.read("foo%08d" % i).decode('ascii') self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + + def testMoreThan64kFilesAppend(self): + zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) + zipf.debug = 100 + numfiles = (1 << 16) - 1 + for i in range(numfiles): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) zipf.close() + zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + numfiles2 = (1 << 16) * 3//2 + for i in range(numfiles, numfiles2): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles2) + zipf.close() + + zipf2 = zipfile.ZipFile(TESTFN, mode="r") + self.assertEqual(len(zipf2.namelist()), numfiles2) + for i in range(numfiles2): + content = zipf2.read("foo%08d" % i).decode('ascii') + self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + def tearDown(self): support.unlink(TESTFN) support.unlink(TESTFN2) -def test_main(): - run_unittest(TestsWithSourceFile, OtherTests) - if __name__ == "__main__": - test_main() + unittest.main() |
