diff options
Diffstat (limited to 'Lib/test')
50 files changed, 991 insertions, 878 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 8f3b689..c342d43 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -1621,20 +1621,6 @@ _expectations = ( test_ossaudiodev test_socketserver """), - ('os2emx', - """ - test_audioop - test_curses - test_epoll - test_kqueue - test_largefile - test_mmap - test_openpty - test_ossaudiodev - test_pty - test_resource - test_signal - """), ('freebsd', """ test_devpoll diff --git a/Lib/test/support.py b/Lib/test/support.py index c5640e0..ec4b47d 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -603,6 +603,49 @@ else: # module name. TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) +# FS_NONASCII: non-ASCII character encodable by os.fsencode(), +# or None if there is no such character. +FS_NONASCII = None +for character in ( + # First try printable and common characters to have a readable filename. + # For each character, the encoding list are just example of encodings able + # to encode the character (the list is not exhaustive). + + # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 + '\u00E6', + # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 + '\u0130', + # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 + '\u0141', + # U+03C6 (Greek Small Letter Phi): cp1253 + '\u03C6', + # U+041A (Cyrillic Capital Letter Ka): cp1251 + '\u041A', + # U+05D0 (Hebrew Letter Alef): Encodable to cp424 + '\u05D0', + # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic + '\u060C', + # U+062A (Arabic Letter Teh): cp720 + '\u062A', + # U+0E01 (Thai Character Ko Kai): cp874 + '\u0E01', + + # Then try more "special" characters. "special" because they may be + # interpreted or displayed differently depending on the exact locale + # encoding and the font. + + # U+00A0 (No-Break Space) + '\u00A0', + # U+20AC (Euro Sign) + '\u20AC', +): + try: + os.fsdecode(os.fsencode(character)) + except UnicodeError: + pass + else: + FS_NONASCII = character + break # TESTFN_UNICODE is a non-ascii filename TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" @@ -647,6 +690,26 @@ elif sys.platform != 'darwin': # the byte 0xff. Skip some unicode filename tests. pass +# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be +# decoded from the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename. +TESTFN_UNDECODABLE = None +# b'\xff' is not decodable by os.fsdecode() with code page 932. Windows +# accepts it to create a file or a directory, or don't accept to enter to +# such directory (when the bytes name is used). So test b'\xe7' first: it is +# not decodable from cp932. +for name in (b'\xe7w\xf0', b'abc\xff'): + try: + os.fsdecode(name) + except UnicodeDecodeError: + TESTFN_UNDECODABLE = name + break + +if FS_NONASCII: + TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII +else: + TESTFN_NONASCII = None + # Save the initial cwd SAVEDCWD = os.getcwd() diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py index fe6e939..8ae90e4 100644 --- a/Lib/test/test_bytes.py +++ b/Lib/test/test_bytes.py @@ -288,8 +288,22 @@ class BaseBytesTest(unittest.TestCase): self.assertEqual(self.type2test(b"").join(lst), b"abc") self.assertEqual(self.type2test(b"").join(tuple(lst)), b"abc") self.assertEqual(self.type2test(b"").join(iter(lst)), b"abc") - self.assertEqual(self.type2test(b".").join([b"ab", b"cd"]), b"ab.cd") - # XXX more... + dot_join = self.type2test(b".:").join + self.assertEqual(dot_join([b"ab", b"cd"]), b"ab.:cd") + self.assertEqual(dot_join([memoryview(b"ab"), b"cd"]), b"ab.:cd") + self.assertEqual(dot_join([b"ab", memoryview(b"cd")]), b"ab.:cd") + self.assertEqual(dot_join([bytearray(b"ab"), b"cd"]), b"ab.:cd") + self.assertEqual(dot_join([b"ab", bytearray(b"cd")]), b"ab.:cd") + # Stress it with many items + seq = [b"abc"] * 1000 + expected = b"abc" + b".:abc" * 999 + self.assertEqual(dot_join(seq), expected) + # Error handling and cleanup when some item in the middle of the + # sequence has the wrong type. + with self.assertRaises(TypeError): + dot_join([bytearray(b"ab"), "cd", b"ef"]) + with self.assertRaises(TypeError): + dot_join([memoryview(b"ab"), "cd", b"ef"]) def test_count(self): b = self.type2test(b'mississippi') @@ -1267,6 +1281,11 @@ class BytearrayPEP3137Test(unittest.TestCase, self.assertEqual(val, newval) self.assertTrue(val is not newval, expr+' returned val on a mutable object') + sep = self.marshal(b'') + newval = sep.join([val]) + self.assertEqual(val, newval) + self.assertIsNot(val, newval) + class FixedStringTest(test.string_tests.BaseTest): diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index f4e81db..24a1813 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 from test import support -from test.support import TESTFN, bigmemtest, _4G +from test.support import bigmemtest, _4G import unittest from io import BytesIO @@ -18,10 +18,10 @@ except ImportError: bz2 = support.import_module('bz2') from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor -has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx") class BaseTest(unittest.TestCase): "Base for other testcases." + TEXT_LINES = [ b'root:x:0:0:root:/root:/bin/bash\n', b'bin:x:1:1:bin:/bin:\n', @@ -49,13 +49,17 @@ class BaseTest(unittest.TestCase): DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`' def setUp(self): - self.filename = TESTFN + self.filename = support.TESTFN def tearDown(self): if os.path.isfile(self.filename): os.unlink(self.filename) - if has_cmdline_bunzip2: + if sys.platform == "win32": + # bunzip2 isn't available to run on Windows. + def decompress(self, data): + return bz2.decompress(data) + else: def decompress(self, data): pop = subprocess.Popen("bunzip2", shell=True, stdin=subprocess.PIPE, @@ -69,31 +73,21 @@ class BaseTest(unittest.TestCase): ret = bz2.decompress(data) return ret - else: - # bunzip2 isn't available to run on Windows. - def decompress(self, data): - return bz2.decompress(data) class BZ2FileTest(BaseTest): - "Test BZ2File type miscellaneous methods." + "Test the BZ2File class." def createTempFile(self, streams=1): with open(self.filename, "wb") as f: f.write(self.DATA * streams) def testBadArgs(self): - with self.assertRaises(TypeError): - BZ2File(123.456) - with self.assertRaises(ValueError): - BZ2File("/dev/null", "z") - with self.assertRaises(ValueError): - BZ2File("/dev/null", "rx") - with self.assertRaises(ValueError): - BZ2File("/dev/null", "rbt") - with self.assertRaises(ValueError): - BZ2File("/dev/null", compresslevel=0) - with self.assertRaises(ValueError): - BZ2File("/dev/null", compresslevel=10) + self.assertRaises(TypeError, BZ2File, 123.456) + self.assertRaises(ValueError, BZ2File, "/dev/null", "z") + self.assertRaises(ValueError, BZ2File, "/dev/null", "rx") + self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt") + self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0) + self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10) def testRead(self): self.createTempFile() @@ -214,9 +208,8 @@ class BZ2FileTest(BaseTest): self.createTempFile() bz2f = BZ2File(self.filename) bz2f.close() - self.assertRaises(ValueError, bz2f.__next__) - # This call will deadlock if the above .__next__ call failed to - # release the lock. + self.assertRaises(ValueError, next, bz2f) + # This call will deadlock if the above call failed to release the lock. self.assertRaises(ValueError, bz2f.readlines) def testWrite(self): @@ -379,7 +372,7 @@ class BZ2FileTest(BaseTest): bz2f.close() self.assertRaises(ValueError, bz2f.seekable) - bz2f = BZ2File(BytesIO(), mode="w") + bz2f = BZ2File(BytesIO(), "w") try: self.assertFalse(bz2f.seekable()) finally: @@ -405,7 +398,7 @@ class BZ2FileTest(BaseTest): bz2f.close() self.assertRaises(ValueError, bz2f.readable) - bz2f = BZ2File(BytesIO(), mode="w") + bz2f = BZ2File(BytesIO(), "w") try: self.assertFalse(bz2f.readable()) finally: @@ -422,7 +415,7 @@ class BZ2FileTest(BaseTest): bz2f.close() self.assertRaises(ValueError, bz2f.writable) - bz2f = BZ2File(BytesIO(), mode="w") + bz2f = BZ2File(BytesIO(), "w") try: self.assertTrue(bz2f.writable()) finally: @@ -476,7 +469,7 @@ class BZ2FileTest(BaseTest): # Issue #7205: Using a BZ2File from several threads shouldn't deadlock. data = b"1" * 2**20 nthreads = 10 - with bz2.BZ2File(self.filename, 'wb') as f: + with BZ2File(self.filename, 'wb') as f: def comp(): for i in range(5): f.write(data) @@ -487,28 +480,27 @@ class BZ2FileTest(BaseTest): t.join() def testWithoutThreading(self): - bz2 = support.import_fresh_module("bz2", blocked=("threading",)) - with bz2.BZ2File(self.filename, "wb") as f: + module = support.import_fresh_module("bz2", blocked=("threading",)) + with module.BZ2File(self.filename, "wb") as f: f.write(b"abc") - with bz2.BZ2File(self.filename, "rb") as f: + with module.BZ2File(self.filename, "rb") as f: self.assertEqual(f.read(), b"abc") def testMixedIterationAndReads(self): self.createTempFile() linelen = len(self.TEXT_LINES[0]) halflen = linelen // 2 - with bz2.BZ2File(self.filename) as bz2f: + with BZ2File(self.filename) as bz2f: bz2f.read(halflen) self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:]) self.assertEqual(bz2f.read(), self.TEXT[linelen:]) - with bz2.BZ2File(self.filename) as bz2f: + with BZ2File(self.filename) as bz2f: bz2f.readline() self.assertEqual(next(bz2f), self.TEXT_LINES[1]) self.assertEqual(bz2f.readline(), self.TEXT_LINES[2]) - with bz2.BZ2File(self.filename) as bz2f: + with BZ2File(self.filename) as bz2f: bz2f.readlines() - with self.assertRaises(StopIteration): - next(bz2f) + self.assertRaises(StopIteration, next, bz2f) self.assertEqual(bz2f.readlines(), []) def testMultiStreamOrdering(self): @@ -576,6 +568,7 @@ class BZ2FileTest(BaseTest): bz2f.seek(-150, 1) self.assertEqual(bz2f.read(), self.TEXT[500-150:]) + class BZ2CompressorTest(BaseTest): def testCompress(self): bz2c = BZ2Compressor() @@ -688,97 +681,102 @@ class CompressDecompressTest(BaseTest): class OpenTest(BaseTest): + "Test the open function." + + def open(self, *args, **kwargs): + return bz2.open(*args, **kwargs) + def test_binary_modes(self): - with bz2.open(self.filename, "wb") as f: + with self.open(self.filename, "wb") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()) + file_data = self.decompress(f.read()) self.assertEqual(file_data, self.TEXT) - with bz2.open(self.filename, "rb") as f: + with self.open(self.filename, "rb") as f: self.assertEqual(f.read(), self.TEXT) - with bz2.open(self.filename, "ab") as f: + with self.open(self.filename, "ab") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()) + file_data = self.decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_implicit_binary_modes(self): # Test implicit binary modes (no "b" or "t" in mode string). - with bz2.open(self.filename, "w") as f: + with self.open(self.filename, "w") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()) + file_data = self.decompress(f.read()) self.assertEqual(file_data, self.TEXT) - with bz2.open(self.filename, "r") as f: + with self.open(self.filename, "r") as f: self.assertEqual(f.read(), self.TEXT) - with bz2.open(self.filename, "a") as f: + with self.open(self.filename, "a") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()) + file_data = self.decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_text_modes(self): text = self.TEXT.decode("ascii") text_native_eol = text.replace("\n", os.linesep) - with bz2.open(self.filename, "wt") as f: + with self.open(self.filename, "wt") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()).decode("ascii") + file_data = self.decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol) - with bz2.open(self.filename, "rt") as f: + with self.open(self.filename, "rt") as f: self.assertEqual(f.read(), text) - with bz2.open(self.filename, "at") as f: + with self.open(self.filename, "at") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()).decode("ascii") + file_data = self.decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol * 2) def test_fileobj(self): - with bz2.open(BytesIO(self.DATA), "r") as f: + with self.open(BytesIO(self.DATA), "r") as f: self.assertEqual(f.read(), self.TEXT) - with bz2.open(BytesIO(self.DATA), "rb") as f: + with self.open(BytesIO(self.DATA), "rb") as f: self.assertEqual(f.read(), self.TEXT) text = self.TEXT.decode("ascii") - with bz2.open(BytesIO(self.DATA), "rt") as f: + with self.open(BytesIO(self.DATA), "rt") as f: self.assertEqual(f.read(), text) def test_bad_params(self): # Test invalid parameter combinations. - with self.assertRaises(ValueError): - bz2.open(self.filename, "wbt") - with self.assertRaises(ValueError): - bz2.open(self.filename, "rb", encoding="utf-8") - with self.assertRaises(ValueError): - bz2.open(self.filename, "rb", errors="ignore") - with self.assertRaises(ValueError): - bz2.open(self.filename, "rb", newline="\n") + self.assertRaises(ValueError, + self.open, self.filename, "wbt") + self.assertRaises(ValueError, + self.open, self.filename, "rb", encoding="utf-8") + self.assertRaises(ValueError, + self.open, self.filename, "rb", errors="ignore") + self.assertRaises(ValueError, + self.open, self.filename, "rb", newline="\n") def test_encoding(self): # Test non-default encoding. text = self.TEXT.decode("ascii") text_native_eol = text.replace("\n", os.linesep) - with bz2.open(self.filename, "wt", encoding="utf-16-le") as f: + with self.open(self.filename, "wt", encoding="utf-16-le") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = bz2.decompress(f.read()).decode("utf-16-le") + file_data = self.decompress(f.read()).decode("utf-16-le") self.assertEqual(file_data, text_native_eol) - with bz2.open(self.filename, "rt", encoding="utf-16-le") as f: + with self.open(self.filename, "rt", encoding="utf-16-le") as f: self.assertEqual(f.read(), text) def test_encoding_error_handler(self): # Test with non-default encoding error handler. - with bz2.open(self.filename, "wb") as f: + with self.open(self.filename, "wb") as f: f.write(b"foo\xffbar") - with bz2.open(self.filename, "rt", encoding="ascii", errors="ignore") \ + with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \ as f: self.assertEqual(f.read(), "foobar") def test_newline(self): # Test with explicit newline (universal newline mode disabled). text = self.TEXT.decode("ascii") - with bz2.open(self.filename, "wt", newline="\n") as f: + with self.open(self.filename, "wt", newline="\n") as f: f.write(text) - with bz2.open(self.filename, "rt", newline="\r") as f: + with self.open(self.filename, "rt", newline="\r") as f: self.assertEqual(f.readlines(), [text]) diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index 2b0c6e2..7a10288 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -93,15 +93,15 @@ class CmdLineTest(unittest.TestCase): # All good if execution is successful assert_python_ok('-c', 'pass') - @unittest.skipIf(sys.getfilesystemencoding() == 'ascii', - 'need a filesystem encoding different than ASCII') + @unittest.skipUnless(test.support.FS_NONASCII, 'need support.FS_NONASCII') def test_non_ascii(self): # Test handling of non-ascii data if test.support.verbose: import locale print('locale encoding = %s, filesystem encoding = %s' % (locale.getpreferredencoding(), sys.getfilesystemencoding())) - command = "assert(ord('\xe9') == 0xe9)" + command = ("assert(ord(%r) == %s)" + % (test.support.FS_NONASCII, ord(test.support.FS_NONASCII))) assert_python_ok('-c', command) # On Windows, pass bytes to subprocess doesn't test how Python decodes the @@ -216,6 +216,23 @@ class CmdLineTest(unittest.TestCase): self.assertIn(path1.encode('ascii'), out) self.assertIn(path2.encode('ascii'), out) + def test_empty_PYTHONPATH_issue16309(self): + """On Posix, it is documented that setting PATH to the + empty string is equivalent to not setting PATH at all, + which is an exception to the rule that in a string like + "/bin::/usr/bin" the empty string in the middle gets + interpreted as '.'""" + code = """if 1: + import sys + path = ":".join(sys.path) + path = path.encode("ascii", "backslashreplace") + sys.stdout.buffer.write(path)""" + rc1, out1, err1 = assert_python_ok('-c', code, PYTHONPATH="") + rc2, out2, err2 = assert_python_ok('-c', code) + # regarding to Posix specification, outputs should be equal + # for empty and unset PYTHONPATH + self.assertEquals(out1, out2) + def test_displayhook_unencodable(self): for encoding in ('ascii', 'latin-1', 'utf-8'): env = os.environ.copy() @@ -293,7 +310,7 @@ class CmdLineTest(unittest.TestCase): rc, out, err = assert_python_ok('-c', code) self.assertEqual(b'', out) self.assertRegex(err.decode('ascii', 'ignore'), - 'Exception OSError: .* ignored') + 'Exception ignored in.*\nOSError: .*') def test_closed_stdout(self): # Issue #13444: if stdout has been explicitly closed, we should diff --git a/Lib/test/test_cmd_line_script.py b/Lib/test/test_cmd_line_script.py index 6e097e3..77152b3 100644 --- a/Lib/test/test_cmd_line_script.py +++ b/Lib/test/test_cmd_line_script.py @@ -363,14 +363,19 @@ class CmdLineTest(unittest.TestCase): self.assertTrue(text[1].startswith(' File ')) self.assertTrue(text[3].startswith('NameError')) - def test_non_utf8(self): + @unittest.skipUnless(support.TESTFN_NONASCII, 'need support.TESTFN_NONASCII') + def test_non_ascii(self): # Issue #16218 - with temp_dir() as script_dir: - script_name = _make_test_script(script_dir, - '\udcf1\udcea\udcf0\udce8\udcef\udcf2') - self._check_script(script_name, script_name, script_name, - script_dir, None, - importlib.machinery.SourceFileLoader) + source = 'print(ascii(__file__))\n' + script_name = _make_test_script(os.curdir, support.TESTFN_NONASCII, source) + self.addCleanup(support.unlink, script_name) + rc, stdout, stderr = assert_python_ok(script_name) + self.assertEqual( + ascii(script_name), + stdout.rstrip().decode('ascii'), + 'stdout=%r stderr=%r' % (stdout, stderr)) + self.assertEqual(0, rc) + def test_main(): support.run_unittest(CmdLineTest) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 6ae450d..4ad3309 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -15,6 +15,7 @@ import sys import threading import time import unittest +import weakref from concurrent import futures from concurrent.futures._base import ( @@ -52,6 +53,11 @@ def sleep_and_print(t, msg): sys.stdout.flush() +class MyObject(object): + def my_method(self): + pass + + class ExecutorMixin: worker_count = 5 @@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase): self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() + @test.support.cpython_only + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=5.0) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self): diff --git a/Lib/test/test_enumerate.py b/Lib/test/test_enumerate.py index 2e904cf..a2d18d0 100644 --- a/Lib/test/test_enumerate.py +++ b/Lib/test/test_enumerate.py @@ -1,4 +1,5 @@ import unittest +import operator import sys import pickle @@ -168,15 +169,12 @@ class TestReversed(unittest.TestCase, PickleTest): x = range(1) self.assertEqual(type(reversed(x)), type(iter(x))) - @support.cpython_only def test_len(self): - # This is an implementation detail, not an interface requirement - from test.test_iterlen import len for s in ('hello', tuple('hello'), list('hello'), range(5)): - self.assertEqual(len(reversed(s)), len(s)) + self.assertEqual(operator.length_hint(reversed(s)), len(s)) r = reversed(s) list(r) - self.assertEqual(len(r), 0) + self.assertEqual(operator.length_hint(r), 0) class SeqWithWeirdLen: called = False def __len__(self): @@ -187,7 +185,7 @@ class TestReversed(unittest.TestCase, PickleTest): def __getitem__(self, index): return index r = reversed(SeqWithWeirdLen()) - self.assertRaises(ZeroDivisionError, len, r) + self.assertRaises(ZeroDivisionError, operator.length_hint, r) def test_gc(self): diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py index 413f4dd..8e5a767 100644 --- a/Lib/test/test_exceptions.py +++ b/Lib/test/test_exceptions.py @@ -8,7 +8,7 @@ import weakref import errno from test.support import (TESTFN, unlink, run_unittest, captured_output, - gc_collect, cpython_only, no_tracing) + check_warnings, gc_collect, cpython_only, no_tracing) class NaiveException(Exception): def __init__(self, x): @@ -939,9 +939,10 @@ class ImportErrorTests(unittest.TestCase): def test_non_str_argument(self): # Issue #15778 - arg = b'abc' - exc = ImportError(arg) - self.assertEqual(str(arg), str(exc)) + with check_warnings(('', BytesWarning), quiet=True): + arg = b'abc' + exc = ImportError(arg) + self.assertEqual(str(arg), str(exc)) def test_main(): diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py index 29af99c..6d9ee0b 100644 --- a/Lib/test/test_fcntl.py +++ b/Lib/test/test_fcntl.py @@ -1,7 +1,4 @@ """Test program for the fcntl C module. - -OS/2+EMX doesn't support the file locking operations. - """ import os import struct @@ -35,8 +32,6 @@ def get_lockdata(): fcntl.F_WRLCK, 0) elif sys.platform in ['aix3', 'aix4', 'hp-uxB', 'unixware7']: lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0) - elif sys.platform in ['os2emx']: - lockdata = None else: lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0) if lockdata: @@ -62,18 +57,16 @@ class TestFcntl(unittest.TestCase): rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) if verbose: print('Status from fcntl with O_NONBLOCK: ', rv) - if sys.platform not in ['os2emx']: - rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata) - if verbose: - print('String from fcntl with F_SETLKW: ', repr(rv)) + rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata) + if verbose: + print('String from fcntl with F_SETLKW: ', repr(rv)) self.f.close() def test_fcntl_file_descriptor(self): # again, but pass the file rather than numeric descriptor self.f = open(TESTFN, 'wb') rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK) - if sys.platform not in ['os2emx']: - rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata) + rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata) self.f.close() def test_fcntl_64_bit(self): diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index b6e2540..e6b0d20 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -307,6 +307,22 @@ class FormatTest(unittest.TestCase): finally: locale.setlocale(locale.LC_ALL, oldloc) + @support.cpython_only + def test_optimisations(self): + text = "abcde" # 5 characters + + self.assertIs("%s" % text, text) + self.assertIs("%.5s" % text, text) + self.assertIs("%.10s" % text, text) + self.assertIs("%1s" % text, text) + self.assertIs("%5s" % text, text) + + self.assertIs("{0}".format(text), text) + self.assertIs("{0:s}".format(text), text) + self.assertIs("{0:.5s}".format(text), text) + self.assertIs("{0:.10s}".format(text), text) + self.assertIs("{0:1s}".format(text), text) + self.assertIs("{0:5s}".format(text), text) def test_main(): diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index c59b72e..85dbc97 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -610,6 +610,32 @@ class GCTests(unittest.TestCase): stderr = run_command(code % "gc.DEBUG_SAVEALL") self.assertNotIn(b"uncollectable objects at shutdown", stderr) + def test_get_stats(self): + stats = gc.get_stats() + self.assertEqual(len(stats), 3) + for st in stats: + self.assertIsInstance(st, dict) + self.assertEqual(set(st), + {"collected", "collections", "uncollectable"}) + self.assertGreaterEqual(st["collected"], 0) + self.assertGreaterEqual(st["collections"], 0) + self.assertGreaterEqual(st["uncollectable"], 0) + # Check that collection counts are incremented correctly + if gc.isenabled(): + self.addCleanup(gc.enable) + gc.disable() + old = gc.get_stats() + gc.collect(0) + new = gc.get_stats() + self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) + self.assertEqual(new[1]["collections"], old[1]["collections"]) + self.assertEqual(new[2]["collections"], old[2]["collections"]) + gc.collect(2) + new = gc.get_stats() + self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) + self.assertEqual(new[1]["collections"], old[1]["collections"]) + self.assertEqual(new[2]["collections"], old[2]["collections"] + 1) + class GCCallbackTests(unittest.TestCase): def setUp(self): diff --git a/Lib/test/test_generators.py b/Lib/test/test_generators.py index 06f67c2..f5c1a7d 100644 --- a/Lib/test/test_generators.py +++ b/Lib/test/test_generators.py @@ -1728,9 +1728,7 @@ Our ill-behaved code should be invoked during GC: >>> g = f() >>> next(g) >>> del g ->>> sys.stderr.getvalue().startswith( -... "Exception RuntimeError: 'generator ignored GeneratorExit' in " -... ) +>>> "RuntimeError: generator ignored GeneratorExit" in sys.stderr.getvalue() True >>> sys.stderr = old @@ -1840,22 +1838,23 @@ to test. ... sys.stderr = io.StringIO() ... class Leaker: ... def __del__(self): -... raise RuntimeError +... def invoke(message): +... raise RuntimeError(message) +... invoke("test") ... ... l = Leaker() ... del l ... err = sys.stderr.getvalue().strip() -... err.startswith( -... "Exception RuntimeError: RuntimeError() in <" -... ) -... err.endswith("> ignored") -... len(err.splitlines()) +... "Exception ignored in" in err +... "RuntimeError: test" in err +... "Traceback" in err +... "in invoke" in err ... finally: ... sys.stderr = old True True -1 - +True +True These refleak tests should perhaps be in a testfile of their own, diff --git a/Lib/test/test_genericpath.py b/Lib/test/test_genericpath.py index 3eadd58..9d1d2bc 100644 --- a/Lib/test/test_genericpath.py +++ b/Lib/test/test_genericpath.py @@ -308,19 +308,28 @@ class CommonTest(GenericTest): for path in ('', 'fuu', 'f\xf9\xf9', '/fuu', 'U:\\'): self.assertIsInstance(abspath(path), str) - @unittest.skipIf(sys.platform == 'darwin', - "Mac OS X denies the creation of a directory with an invalid utf8 name") def test_nonascii_abspath(self): - name = b'\xe7w\xf0' - if sys.platform == 'win32': - try: - os.fsdecode(name) - except UnicodeDecodeError: - self.skipTest("the filename %a is not decodable " - "from the ANSI code page %s" - % (name, sys.getfilesystemencoding())) - - # Test non-ASCII, non-UTF8 bytes in the path. + # Test non-ASCII in the path + if sys.platform in ('win32', 'darwin'): + if support.TESTFN_NONASCII: + name = support.TESTFN_NONASCII + else: + # Mac OS X denies the creation of a directory with an invalid + # UTF-8 name. Windows allows to create a directory with an + # arbitrary bytes name, but fails to enter this directory + # (when the bytes name is used). + self.skipTest("need support.TESTFN_NONASCII") + else: + if support.TESTFN_UNDECODABLE: + name = support.TESTFN_UNDECODABLE + elif support.TESTFN_NONASCII: + name = support.TESTFN_NONASCII + else: + # On UNIX, the surrogateescape error handler is used to + # decode paths, so any byte is allowed, it does not depend + # on the locale + name = b'a\xffb\xe7w\xf0' + with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) with support.temp_cwd(name): diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py index 32f85e9..54201a1 100644 --- a/Lib/test/test_hashlib.py +++ b/Lib/test/test_hashlib.py @@ -36,7 +36,10 @@ def hexstr(s): class HashLibTestCase(unittest.TestCase): supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1', 'sha224', 'SHA224', 'sha256', 'SHA256', - 'sha384', 'SHA384', 'sha512', 'SHA512' ) + 'sha384', 'SHA384', 'sha512', 'SHA512', + 'sha3_224', 'sha3_256', 'sha3_384', + 'sha3_512', 'SHA3_224', 'SHA3_256', + 'SHA3_384', 'SHA3_512' ) # Issue #14693: fallback modules are always compiled under POSIX _warn_on_extension_import = os.name == 'posix' or COMPILED_WITH_PYDEBUG @@ -93,6 +96,12 @@ class HashLibTestCase(unittest.TestCase): if _sha512: self.constructors_to_test['sha384'].add(_sha512.sha384) self.constructors_to_test['sha512'].add(_sha512.sha512) + _sha3 = self._conditional_import_module('_sha3') + if _sha3: + self.constructors_to_test['sha3_224'].add(_sha3.sha3_224) + self.constructors_to_test['sha3_256'].add(_sha3.sha3_256) + self.constructors_to_test['sha3_384'].add(_sha3.sha3_384) + self.constructors_to_test['sha3_512'].add(_sha3.sha3_512) super(HashLibTestCase, self).__init__(*args, **kwargs) @@ -158,6 +167,7 @@ class HashLibTestCase(unittest.TestCase): self.assertEqual(m1.digest(), m2.digest()) def check(self, name, data, digest): + digest = digest.lower() constructors = self.constructors_to_test[name] # 2 is for hashlib.name(...) and hashlib.new(name, ...) self.assertGreaterEqual(len(constructors), 2) @@ -183,6 +193,10 @@ class HashLibTestCase(unittest.TestCase): self.check_no_unicode('sha256') self.check_no_unicode('sha384') self.check_no_unicode('sha512') + self.check_no_unicode('sha3_224') + self.check_no_unicode('sha3_256') + self.check_no_unicode('sha3_384') + self.check_no_unicode('sha3_512') def test_case_md5_0(self): self.check('md5', b'', 'd41d8cd98f00b204e9800998ecf8427e') @@ -318,11 +332,122 @@ class HashLibTestCase(unittest.TestCase): "e718483d0ce769644e2e42c7bc15b4638e1f98b13b2044285632a803afa973eb"+ "de0ff244877ea60a4cb0432ce577c31beb009c5c2c49aa2e4eadb217ad8cc09b") + # SHA-3 family + def test_case_sha3_224_0(self): + self.check('sha3_224', b"", + "F71837502BA8E10837BDD8D365ADB85591895602FC552B48B7390ABD") + + def test_case_sha3_224_1(self): + self.check('sha3_224', bytes.fromhex("CC"), + "A9CAB59EB40A10B246290F2D6086E32E3689FAF1D26B470C899F2802") + + def test_case_sha3_224_2(self): + self.check('sha3_224', bytes.fromhex("41FB"), + "615BA367AFDC35AAC397BC7EB5D58D106A734B24986D5D978FEFD62C") + + def test_case_sha3_224_3(self): + self.check('sha3_224', bytes.fromhex( + "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+ + "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+ + "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+ + "E7E0846DCBB4CE"), + "62B10F1B6236EBC2DA72957742A8D4E48E213B5F8934604BFD4D2C3A") + + @bigmemtest(size=_4G + 5, memuse=1) + def test_case_sha3_224_huge(self, size): + if size == _4G + 5: + try: + self.check('sha3_224', b'A'*size, + '58ef60057c9dddb6a87477e9ace5a26f0d9db01881cf9b10a9f8c224') + except OverflowError: + pass # 32-bit arch + + + def test_case_sha3_256_0(self): + self.check('sha3_256', b"", + "C5D2460186F7233C927E7DB2DCC703C0E500B653CA82273B7BFAD8045D85A470") + + def test_case_sha3_256_1(self): + self.check('sha3_256', bytes.fromhex("CC"), + "EEAD6DBFC7340A56CAEDC044696A168870549A6A7F6F56961E84A54BD9970B8A") + + def test_case_sha3_256_2(self): + self.check('sha3_256', bytes.fromhex("41FB"), + "A8EACEDA4D47B3281A795AD9E1EA2122B407BAF9AABCB9E18B5717B7873537D2") + + def test_case_sha3_256_3(self): + self.check('sha3_256', bytes.fromhex( + "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+ + "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+ + "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+ + "E7E0846DCBB4CE"), + "CE87A5173BFFD92399221658F801D45C294D9006EE9F3F9D419C8D427748DC41") + + + def test_case_sha3_384_0(self): + self.check('sha3_384', b"", + "2C23146A63A29ACF99E73B88F8C24EAA7DC60AA771780CCC006AFBFA8FE2479B"+ + "2DD2B21362337441AC12B515911957FF") + + def test_case_sha3_384_1(self): + self.check('sha3_384', bytes.fromhex("CC"), + "1B84E62A46E5A201861754AF5DC95C4A1A69CAF4A796AE405680161E29572641"+ + "F5FA1E8641D7958336EE7B11C58F73E9") + + def test_case_sha3_384_2(self): + self.check('sha3_384', bytes.fromhex("41FB"), + "495CCE2714CD72C8C53C3363D22C58B55960FE26BE0BF3BBC7A3316DD563AD1D"+ + "B8410E75EEFEA655E39D4670EC0B1792") + + def test_case_sha3_384_3(self): + self.check('sha3_384', bytes.fromhex( + "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+ + "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+ + "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+ + "E7E0846DCBB4CE"), + "135114508DD63E279E709C26F7817C0482766CDE49132E3EDF2EEDD8996F4E35"+ + "96D184100B384868249F1D8B8FDAA2C9") + + + def test_case_sha3_512_0(self): + self.check('sha3_512', b"", + "0EAB42DE4C3CEB9235FC91ACFFE746B29C29A8C366B7C60E4E67C466F36A4304"+ + "C00FA9CAF9D87976BA469BCBE06713B435F091EF2769FB160CDAB33D3670680E") + + def test_case_sha3_512_1(self): + self.check('sha3_512', bytes.fromhex("CC"), + "8630C13CBD066EA74BBE7FE468FEC1DEE10EDC1254FB4C1B7C5FD69B646E4416"+ + "0B8CE01D05A0908CA790DFB080F4B513BC3B6225ECE7A810371441A5AC666EB9") + + def test_case_sha3_512_2(self): + self.check('sha3_512', bytes.fromhex("41FB"), + "551DA6236F8B96FCE9F97F1190E901324F0B45E06DBBB5CDB8355D6ED1DC34B3"+ + "F0EAE7DCB68622FF232FA3CECE0D4616CDEB3931F93803662A28DF1CD535B731") + + def test_case_sha3_512_3(self): + self.check('sha3_512', bytes.fromhex( + "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+ + "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+ + "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+ + "E7E0846DCBB4CE"), + "527D28E341E6B14F4684ADB4B824C496C6482E51149565D3D17226828884306B"+ + "51D6148A72622C2B75F5D3510B799D8BDC03EAEDE453676A6EC8FE03A1AD0EAB") + + def test_gil(self): # Check things work fine with an input larger than the size required # for multithreaded operation (which is hardwired to 2048). gil_minsize = 2048 + for name in self.supported_hash_names: + m = hashlib.new(name) + m.update(b'1') + m.update(b'#' * gil_minsize) + m.update(b'1') + + m = hashlib.new(name, b'x' * gil_minsize) + m.update(b'1') + m = hashlib.md5() m.update(b'1') m.update(b'#' * gil_minsize) diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 75133c9..c5db620 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -92,6 +92,9 @@ class BaseHTTPServerTestCase(BaseTestCase): def do_KEYERROR(self): self.send_error(999) + def do_NOTFOUND(self): + self.send_error(404) + def do_CUSTOM(self): self.send_response(999) self.send_header('Content-Type', 'text/html') @@ -211,6 +214,14 @@ class BaseHTTPServerTestCase(BaseTestCase): self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) + def test_error_content_length(self): + # Issue #16088: standard error responses should have a content-length + self.con.request('NOTFOUND', '/') + res = self.con.getresponse() + self.assertEqual(res.status, 404) + data = res.read() + self.assertEqual(int(res.getheader('Content-Length')), len(data)) + class SimpleHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): diff --git a/Lib/test/test_importlib/import_/test_fromlist.py b/Lib/test/test_importlib/import_/test_fromlist.py index 27b2270..c16c337 100644 --- a/Lib/test/test_importlib/import_/test_fromlist.py +++ b/Lib/test/test_importlib/import_/test_fromlist.py @@ -80,7 +80,7 @@ class HandlingFromlist(unittest.TestCase): with util.import_state(meta_path=[importer]): with self.assertRaises(ImportError) as exc: import_util.import_('pkg', fromlist=['mod']) - self.assertEquals('i_do_not_exist', exc.exception.name) + self.assertEqual('i_do_not_exist', exc.exception.name) def test_empty_string(self): with util.mock_modules('pkg.__init__', 'pkg.mod') as importer: diff --git a/Lib/test/test_importlib/source/test_abc_loader.py b/Lib/test/test_importlib/source/test_abc_loader.py index 0d912b6..78a8faa 100644 --- a/Lib/test/test_importlib/source/test_abc_loader.py +++ b/Lib/test/test_importlib/source/test_abc_loader.py @@ -70,483 +70,9 @@ class SourceLoaderMock(SourceOnlyLoaderMock): return path == self.bytecode_path -class PyLoaderMock(abc.PyLoader): - - # Globals that should be defined for all modules. - source = (b"_ = '::'.join([__name__, __file__, __package__, " - b"repr(__loader__)])") - - def __init__(self, data): - """Take a dict of 'module_name: path' pairings. - - Paths should have no file extension, allowing packages to be denoted by - ending in '__init__'. - - """ - self.module_paths = data - self.path_to_module = {val:key for key,val in data.items()} - - def get_data(self, path): - if path not in self.path_to_module: - raise IOError - return self.source - - def is_package(self, name): - filename = os.path.basename(self.get_filename(name)) - return os.path.splitext(filename)[0] == '__init__' - - def source_path(self, name): - try: - return self.module_paths[name] - except KeyError: - raise ImportError - - def get_filename(self, name): - """Silence deprecation warning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - path = super().get_filename(name) - assert len(w) == 1 - assert issubclass(w[0].category, DeprecationWarning) - return path - - def module_repr(self): - return '<module>' - - -class PyLoaderCompatMock(PyLoaderMock): - - """Mock that matches what is suggested to have a loader that is compatible - from Python 3.1 onwards.""" - - def get_filename(self, fullname): - try: - return self.module_paths[fullname] - except KeyError: - raise ImportError - - def source_path(self, fullname): - try: - return self.get_filename(fullname) - except ImportError: - return None - - -class PyPycLoaderMock(abc.PyPycLoader, PyLoaderMock): - - default_mtime = 1 - - def __init__(self, source, bc={}): - """Initialize mock. - - 'bc' is a dict keyed on a module's name. The value is dict with - possible keys of 'path', 'mtime', 'magic', and 'bc'. Except for 'path', - each of those keys control if any part of created bytecode is to - deviate from default values. - - """ - super().__init__(source) - self.module_bytecode = {} - self.path_to_bytecode = {} - self.bytecode_to_path = {} - for name, data in bc.items(): - self.path_to_bytecode[data['path']] = name - self.bytecode_to_path[name] = data['path'] - magic = data.get('magic', imp.get_magic()) - mtime = importlib._w_long(data.get('mtime', self.default_mtime)) - source_size = importlib._w_long(len(self.source) & 0xFFFFFFFF) - if 'bc' in data: - bc = data['bc'] - else: - bc = self.compile_bc(name) - self.module_bytecode[name] = magic + mtime + source_size + bc - - def compile_bc(self, name): - source_path = self.module_paths.get(name, '<test>') or '<test>' - code = compile(self.source, source_path, 'exec') - return marshal.dumps(code) - - def source_mtime(self, name): - if name in self.module_paths: - return self.default_mtime - elif name in self.module_bytecode: - return None - else: - raise ImportError - - def bytecode_path(self, name): - try: - return self.bytecode_to_path[name] - except KeyError: - if name in self.module_paths: - return None - else: - raise ImportError - - def write_bytecode(self, name, bytecode): - self.module_bytecode[name] = bytecode - return True - - def get_data(self, path): - if path in self.path_to_module: - return super().get_data(path) - elif path in self.path_to_bytecode: - name = self.path_to_bytecode[path] - return self.module_bytecode[name] - else: - raise IOError - - def is_package(self, name): - try: - return super().is_package(name) - except TypeError: - return '__init__' in self.bytecode_to_path[name] - - def get_code(self, name): - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - code_object = super().get_code(name) - assert len(w) == 1 - assert issubclass(w[0].category, DeprecationWarning) - return code_object - -class PyLoaderTests(testing_abc.LoaderTests): - - """Tests for importlib.abc.PyLoader.""" - - mocker = PyLoaderMock - - def eq_attrs(self, ob, **kwargs): - for attr, val in kwargs.items(): - found = getattr(ob, attr) - self.assertEqual(found, val, - "{} attribute: {} != {}".format(attr, found, val)) - - def test_module(self): - name = '<module>' - path = os.path.join('', 'path', 'to', 'module') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertIn(name, sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, __package__='', - __loader__=mock) - self.assertTrue(not hasattr(module, '__path__')) - return mock, name - - def test_package(self): - name = '<pkg>' - path = os.path.join('path', 'to', name, '__init__') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertIn(name, sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, - __path__=[os.path.dirname(path)], __package__=name, - __loader__=mock) - return mock, name - - def test_lacking_parent(self): - name = 'pkg.mod' - path = os.path.join('path', 'to', 'pkg', 'mod') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertIn(name, sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, __package__='pkg', - __loader__=mock) - self.assertFalse(hasattr(module, '__path__')) - return mock, name - - def test_module_reuse(self): - name = 'mod' - path = os.path.join('path', 'to', 'mod') - module = imp.new_module(name) - mock = self.mocker({name: path}) - with util.uncache(name): - sys.modules[name] = module - loaded_module = mock.load_module(name) - self.assertIs(loaded_module, module) - self.assertIs(sys.modules[name], module) - return mock, name - - def test_state_after_failure(self): - name = "mod" - module = imp.new_module(name) - module.blah = None - mock = self.mocker({name: os.path.join('path', 'to', 'mod')}) - mock.source = b"1/0" - with util.uncache(name): - sys.modules[name] = module - with self.assertRaises(ZeroDivisionError): - mock.load_module(name) - self.assertIs(sys.modules[name], module) - self.assertTrue(hasattr(module, 'blah')) - return mock - - def test_unloadable(self): - name = "mod" - mock = self.mocker({name: os.path.join('path', 'to', 'mod')}) - mock.source = b"1/0" - with util.uncache(name): - with self.assertRaises(ZeroDivisionError): - mock.load_module(name) - self.assertNotIn(name, sys.modules) - return mock - - -class PyLoaderCompatTests(PyLoaderTests): - - """Test that the suggested code to make a loader that is compatible from - Python 3.1 forward works.""" - - mocker = PyLoaderCompatMock - - -class PyLoaderInterfaceTests(unittest.TestCase): - - """Tests for importlib.abc.PyLoader to make sure that when source_path() - doesn't return a path everything works as expected.""" - - def test_no_source_path(self): - # No source path should lead to ImportError. - name = 'mod' - mock = PyLoaderMock({}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_source_path_is_None(self): - name = 'mod' - mock = PyLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_get_filename_with_source_path(self): - # get_filename() should return what source_path() returns. - name = 'mod' - path = os.path.join('path', 'to', 'source') - mock = PyLoaderMock({name: path}) - with util.uncache(name): - self.assertEqual(mock.get_filename(name), path) - - def test_get_filename_no_source_path(self): - # get_filename() should raise ImportError if source_path returns None. - name = 'mod' - mock = PyLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError): - mock.get_filename(name) - - -class PyPycLoaderTests(PyLoaderTests): - - """Tests for importlib.abc.PyPycLoader.""" - - mocker = PyPycLoaderMock - - @source_util.writes_bytecode_files - def verify_bytecode(self, mock, name): - assert name in mock.module_paths - self.assertIn(name, mock.module_bytecode) - magic = mock.module_bytecode[name][:4] - self.assertEqual(magic, imp.get_magic()) - mtime = importlib._r_long(mock.module_bytecode[name][4:8]) - self.assertEqual(mtime, 1) - source_size = mock.module_bytecode[name][8:12] - self.assertEqual(len(mock.source) & 0xFFFFFFFF, - importlib._r_long(source_size)) - bc = mock.module_bytecode[name][12:] - self.assertEqual(bc, mock.compile_bc(name)) - - def test_module(self): - mock, name = super().test_module() - self.verify_bytecode(mock, name) - - def test_package(self): - mock, name = super().test_package() - self.verify_bytecode(mock, name) - - def test_lacking_parent(self): - mock, name = super().test_lacking_parent() - self.verify_bytecode(mock, name) - - def test_module_reuse(self): - mock, name = super().test_module_reuse() - self.verify_bytecode(mock, name) - - def test_state_after_failure(self): - super().test_state_after_failure() - - def test_unloadable(self): - super().test_unloadable() - - -class PyPycLoaderInterfaceTests(unittest.TestCase): - - """Test for the interface of importlib.abc.PyPycLoader.""" - - def get_filename_check(self, src_path, bc_path, expect): - name = 'mod' - mock = PyPycLoaderMock({name: src_path}, {name: {'path': bc_path}}) - with util.uncache(name): - assert mock.source_path(name) == src_path - assert mock.bytecode_path(name) == bc_path - self.assertEqual(mock.get_filename(name), expect) - - def test_filename_with_source_bc(self): - # When source and bytecode paths present, return the source path. - self.get_filename_check('source_path', 'bc_path', 'source_path') - - def test_filename_with_source_no_bc(self): - # With source but no bc, return source path. - self.get_filename_check('source_path', None, 'source_path') - - def test_filename_with_no_source_bc(self): - # With not source but bc, return the bc path. - self.get_filename_check(None, 'bc_path', 'bc_path') - - def test_filename_with_no_source_or_bc(self): - # With no source or bc, raise ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: None}, {name: {'path': None}}) - with util.uncache(name), self.assertRaises(ImportError): - mock.get_filename(name) - - -class SkipWritingBytecodeTests(unittest.TestCase): - - """Test that bytecode is properly handled based on - sys.dont_write_bytecode.""" - - @source_util.writes_bytecode_files - def run_test(self, dont_write_bytecode): - name = 'mod' - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}) - sys.dont_write_bytecode = dont_write_bytecode - with util.uncache(name): - mock.load_module(name) - self.assertIsNot(name in mock.module_bytecode, dont_write_bytecode) - - def test_no_bytecode_written(self): - self.run_test(True) - - def test_bytecode_written(self): - self.run_test(False) - - -class RegeneratedBytecodeTests(unittest.TestCase): - - """Test that bytecode is regenerated as expected.""" - - @source_util.writes_bytecode_files - def test_different_magic(self): - # A different magic number should lead to new bytecode. - name = 'mod' - bad_magic = b'\x00\x00\x00\x00' - assert bad_magic != imp.get_magic() - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}, - {name: {'path': os.path.join('path', 'to', - 'mod.bytecode'), - 'magic': bad_magic}}) - with util.uncache(name): - mock.load_module(name) - self.assertIn(name, mock.module_bytecode) - magic = mock.module_bytecode[name][:4] - self.assertEqual(magic, imp.get_magic()) - - @source_util.writes_bytecode_files - def test_old_mtime(self): - # Bytecode with an older mtime should be regenerated. - name = 'mod' - old_mtime = PyPycLoaderMock.default_mtime - 1 - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}, - {name: {'path': 'path/to/mod.bytecode', 'mtime': old_mtime}}) - with util.uncache(name): - mock.load_module(name) - self.assertIn(name, mock.module_bytecode) - mtime = importlib._r_long(mock.module_bytecode[name][4:8]) - self.assertEqual(mtime, PyPycLoaderMock.default_mtime) - - -class BadBytecodeFailureTests(unittest.TestCase): - - """Test import failures when there is no source and parts of the bytecode - is bad.""" - - def test_bad_magic(self): - # A bad magic number should lead to an ImportError. - name = 'mod' - bad_magic = b'\x00\x00\x00\x00' - bc = {name: - {'path': os.path.join('path', 'to', 'mod'), - 'magic': bad_magic}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(ImportError) as cm: - mock.load_module(name) - self.assertEqual(cm.exception.name, name) - - def test_no_bytecode(self): - # Missing code object bytecode should lead to an EOFError. - name = 'mod' - bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b''}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(EOFError): - mock.load_module(name) - - def test_bad_bytecode(self): - # Malformed code object bytecode should lead to a ValueError. - name = 'mod' - bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b'1234'}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(ValueError): - mock.load_module(name) - - def raise_ImportError(*args, **kwargs): raise ImportError -class MissingPathsTests(unittest.TestCase): - - """Test what happens when a source or bytecode path does not exist (either - from *_path returning None or raising ImportError).""" - - def test_source_path_None(self): - # Bytecode should be used when source_path returns None, along with - # __file__ being set to the bytecode path. - name = 'mod' - bytecode_path = 'path/to/mod' - mock = PyPycLoaderMock({name: None}, {name: {'path': bytecode_path}}) - with util.uncache(name): - module = mock.load_module(name) - self.assertEqual(module.__file__, bytecode_path) - - # Testing for bytecode_path returning None handled by all tests where no - # bytecode initially exists. - - def test_all_paths_None(self): - # If all *_path methods return None, raise ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError) as cm: - mock.load_module(name) - self.assertEqual(cm.exception.name, name) - - def test_source_path_ImportError(self): - # An ImportError from source_path should trigger an ImportError. - name = 'mod' - mock = PyPycLoaderMock({}, {name: {'path': os.path.join('path', 'to', - 'mod')}}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_bytecode_path_ImportError(self): - # An ImportError from bytecode_path should trigger an ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}) - bad_meth = types.MethodType(raise_ImportError, mock) - mock.bytecode_path = bad_meth - with util.uncache(name), self.assertRaises(ImportError) as cm: - mock.load_module(name) - class SourceLoaderTestHarness(unittest.TestCase): @@ -801,6 +327,7 @@ class AbstractMethodImplTests(unittest.TestCase): class Loader(abc.Loader): def load_module(self, fullname): super().load_module(fullname) + def module_repr(self, module): super().module_repr(module) @@ -825,20 +352,6 @@ class AbstractMethodImplTests(unittest.TestCase): class SourceLoader(ResourceLoader, ExecutionLoader, abc.SourceLoader): pass - class PyLoader(ResourceLoader, InspectLoader, abc.PyLoader): - def source_path(self, _): - super().source_path(_) - - class PyPycLoader(PyLoader, abc.PyPycLoader): - def bytecode_path(self, _): - super().bytecode_path(_) - - def source_mtime(self, _): - super().source_mtime(_) - - def write_bytecode(self, _, _2): - super().write_bytecode(_, _2) - def raises_NotImplementedError(self, ins, *args): for method_name in args: method = getattr(ins, method_name) @@ -877,24 +390,12 @@ class AbstractMethodImplTests(unittest.TestCase): # Required abstractmethods. self.raises_NotImplementedError(ins, 'get_filename', 'get_data') # Optional abstractmethods. - self.raises_NotImplementedError(ins,'path_stats', 'set_data') - - def test_PyLoader(self): - self.raises_NotImplementedError(self.PyLoader(), 'source_path', - 'get_data', 'is_package') - - def test_PyPycLoader(self): - self.raises_NotImplementedError(self.PyPycLoader(), 'source_path', - 'source_mtime', 'bytecode_path', - 'write_bytecode') + self.raises_NotImplementedError(ins, 'path_stats', 'set_data') def test_main(): from test.support import run_unittest - run_unittest(PyLoaderTests, PyLoaderCompatTests, - PyLoaderInterfaceTests, - PyPycLoaderTests, PyPycLoaderInterfaceTests, - SkipWritingBytecodeTests, RegeneratedBytecodeTests, + run_unittest(SkipWritingBytecodeTests, RegeneratedBytecodeTests, BadBytecodeFailureTests, MissingPathsTests, SourceOnlyLoaderTests, SourceLoaderBytecodeTests, diff --git a/Lib/test/test_importlib/test_abc.py b/Lib/test/test_importlib/test_abc.py index c620c37..a8d8c2e 100644 --- a/Lib/test/test_importlib/test_abc.py +++ b/Lib/test/test_importlib/test_abc.py @@ -43,11 +43,6 @@ class PathEntryFinder(InheritanceTests, unittest.TestCase): subclasses = [machinery.FileFinder] -class Loader(InheritanceTests, unittest.TestCase): - - subclasses = [abc.PyLoader] - - class ResourceLoader(InheritanceTests, unittest.TestCase): superclasses = [abc.Loader] @@ -56,14 +51,13 @@ class ResourceLoader(InheritanceTests, unittest.TestCase): class InspectLoader(InheritanceTests, unittest.TestCase): superclasses = [abc.Loader] - subclasses = [abc.PyLoader, machinery.BuiltinImporter, + subclasses = [machinery.BuiltinImporter, machinery.FrozenImporter, machinery.ExtensionFileLoader] class ExecutionLoader(InheritanceTests, unittest.TestCase): superclasses = [abc.InspectLoader] - subclasses = [abc.PyLoader] class FileLoader(InheritanceTests, unittest.TestCase): @@ -78,16 +72,6 @@ class SourceLoader(InheritanceTests, unittest.TestCase): subclasses = [machinery.SourceFileLoader] -class PyLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.Loader, abc.ResourceLoader, abc.ExecutionLoader] - - -class PyPycLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.PyLoader] - - def test_main(): from test.support import run_unittest classes = [] diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 555ad74..a412469 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -815,6 +815,14 @@ class SizeofTest: bufio = self.tp(rawio, buffer_size=bufsize2) self.assertEqual(sys.getsizeof(bufio), size + bufsize2) + @support.cpython_only + def test_buffer_freeing(self) : + bufsize = 4096 + rawio = self.MockRawIO() + bufio = self.tp(rawio, buffer_size=bufsize) + size = sys.getsizeof(bufio) - bufsize + bufio.close() + self.assertEqual(sys.getsizeof(bufio), size) class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): read_mode = "rb" diff --git a/Lib/test/test_iterlen.py b/Lib/test/test_iterlen.py index 7469a31..57f7101 100644 --- a/Lib/test/test_iterlen.py +++ b/Lib/test/test_iterlen.py @@ -45,31 +45,21 @@ import unittest from test import support from itertools import repeat from collections import deque -from builtins import len as _len +from operator import length_hint n = 10 -def len(obj): - try: - return _len(obj) - except TypeError: - try: - # note: this is an internal undocumented API, - # don't rely on it in your own programs - return obj.__length_hint__() - except AttributeError: - raise TypeError class TestInvariantWithoutMutations(unittest.TestCase): def test_invariant(self): it = self.it for i in reversed(range(1, n+1)): - self.assertEqual(len(it), i) + self.assertEqual(length_hint(it), i) next(it) - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) self.assertRaises(StopIteration, next, it) - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) class TestTemporarilyImmutable(TestInvariantWithoutMutations): @@ -78,12 +68,12 @@ class TestTemporarilyImmutable(TestInvariantWithoutMutations): # length immutability during iteration it = self.it - self.assertEqual(len(it), n) + self.assertEqual(length_hint(it), n) next(it) - self.assertEqual(len(it), n-1) + self.assertEqual(length_hint(it), n-1) self.mutate() self.assertRaises(RuntimeError, next, it) - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) ## ------- Concrete Type Tests ------- @@ -92,10 +82,6 @@ class TestRepeat(TestInvariantWithoutMutations): def setUp(self): self.it = repeat(None, n) - def test_no_len_for_infinite_repeat(self): - # The repeat() object can also be infinite - self.assertRaises(TypeError, len, repeat(None)) - class TestXrange(TestInvariantWithoutMutations): def setUp(self): @@ -167,14 +153,15 @@ class TestList(TestInvariantWithoutMutations): it = iter(d) next(it) next(it) - self.assertEqual(len(it), n-2) + self.assertEqual(length_hint(it), n - 2) d.append(n) - self.assertEqual(len(it), n-1) # grow with append + self.assertEqual(length_hint(it), n - 1) # grow with append d[1:] = [] - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) self.assertEqual(list(it), []) d.extend(range(20)) - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) + class TestListReversed(TestInvariantWithoutMutations): @@ -186,32 +173,41 @@ class TestListReversed(TestInvariantWithoutMutations): it = reversed(d) next(it) next(it) - self.assertEqual(len(it), n-2) + self.assertEqual(length_hint(it), n - 2) d.append(n) - self.assertEqual(len(it), n-2) # ignore append + self.assertEqual(length_hint(it), n - 2) # ignore append d[1:] = [] - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) self.assertEqual(list(it), []) # confirm invariant d.extend(range(20)) - self.assertEqual(len(it), 0) + self.assertEqual(length_hint(it), 0) ## -- Check to make sure exceptions are not suppressed by __length_hint__() class BadLen(object): - def __iter__(self): return iter(range(10)) + def __iter__(self): + return iter(range(10)) + def __len__(self): raise RuntimeError('hello') + class BadLengthHint(object): - def __iter__(self): return iter(range(10)) + def __iter__(self): + return iter(range(10)) + def __length_hint__(self): raise RuntimeError('hello') + class NoneLengthHint(object): - def __iter__(self): return iter(range(10)) + def __iter__(self): + return iter(range(10)) + def __length_hint__(self): - return None + return NotImplemented + class TestLengthHintExceptions(unittest.TestCase): diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py index 90e85a9..ece7f99 100644 --- a/Lib/test/test_itertools.py +++ b/Lib/test/test_itertools.py @@ -1723,9 +1723,8 @@ class TestVariousIteratorArgs(unittest.TestCase): class LengthTransparency(unittest.TestCase): def test_repeat(self): - from test.test_iterlen import len - self.assertEqual(len(repeat(None, 50)), 50) - self.assertRaises(TypeError, len, repeat(None)) + self.assertEqual(operator.length_hint(repeat(None, 50)), 50) + self.assertEqual(operator.length_hint(repeat(None), 12), 12) class RegressionTests(unittest.TestCase): diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index cb908fb..9251453 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -26,6 +26,7 @@ import logging.handlers import logging.config import codecs +import configparser import datetime import pickle import io @@ -81,7 +82,7 @@ class BaseTest(unittest.TestCase): """Base class for logging tests.""" log_format = "%(name)s -> %(levelname)s: %(message)s" - expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" message_num = 0 def setUp(self): @@ -150,14 +151,17 @@ class BaseTest(unittest.TestCase): finally: logging._releaseLock() - def assert_log_lines(self, expected_values, stream=None): + def assert_log_lines(self, expected_values, stream=None, pat=None): """Match the collected log lines against the regular expression self.expected_log_pat, and compare the extracted group values to the expected_values list of tuples.""" stream = stream or self.stream - pat = re.compile(self.expected_log_pat) + pat = re.compile(pat or self.expected_log_pat) try: - stream.reset() + if hasattr(stream, 'reset'): + stream.reset() + elif hasattr(stream, 'seek'): + stream.seek(0) actual_lines = stream.readlines() except AttributeError: # StringIO.StringIO lacks a reset() method. @@ -435,7 +439,7 @@ class CustomLevelsAndFiltersTest(BaseTest): """Test various filtering possibilities with custom logging levels.""" # Skip the logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -996,7 +1000,7 @@ class MemoryHandlerTest(BaseTest): """Tests for the MemoryHandler.""" # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -1049,7 +1053,7 @@ class ConfigFileTest(BaseTest): """Reading logging config from a .ini-style config file.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = """ @@ -1276,6 +1280,24 @@ class ConfigFileTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + def test_config0_using_cp_ok(self): + # A simple config file which overrides the default settings. + with captured_stdout() as output: + file = io.StringIO(textwrap.dedent(self.config0)) + cp = configparser.ConfigParser() + cp.read_file(file) + logging.config.fileConfig(cp) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assert_log_lines([ + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + def test_config1_ok(self, config=config1): # A config file defining a sub-parser as well. with captured_stdout() as output: @@ -1419,16 +1441,19 @@ class SocketHandlerTest(BaseTest): self.assertEqual(self.log_output, "spam\neggs\n") def test_noserver(self): + # Avoid timing-related failures due to SocketHandler's own hard-wired + # one-second timeout on socket.create_connection() (issue #16264). + self.sock_hdlr.retryStart = 2.5 # Kill the server self.server.stop(2.0) - #The logging call should try to connect, which should fail + # The logging call should try to connect, which should fail try: raise RuntimeError('Deliberate mistake') except RuntimeError: self.root_logger.exception('Never sent') self.root_logger.error('Never sent, either') now = time.time() - self.assertTrue(self.sock_hdlr.retryTime > now) + self.assertGreater(self.sock_hdlr.retryTime, now) time.sleep(self.sock_hdlr.retryTime - now + 0.001) self.root_logger.error('Nor this') @@ -1754,7 +1779,7 @@ class WarningsTest(BaseTest): logger.removeHandler(h) s = stream.getvalue() h.close() - self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) + self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) #See if an explicit file uses the original implementation a_file = io.StringIO() @@ -1792,7 +1817,7 @@ class ConfigDictTest(BaseTest): """Reading logging config from a dictionary.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = { @@ -2601,10 +2626,10 @@ class ConfigDictTest(BaseTest): self.assertRaises(Exception, self.apply_config, self.config13) @unittest.skipUnless(threading, 'listen() needs threading to work') - def setup_via_listener(self, text): + def setup_via_listener(self, text, verify=None): text = text.encode("utf-8") # Ask for a randomly assigned port (by using port 0) - t = logging.config.listen(0) + t = logging.config.listen(0, verify) t.start() t.ready.wait() # Now get the port allocated @@ -2664,6 +2689,69 @@ class ConfigDictTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_listen_verify(self): + + def verify_fail(stuff): + return None + + def verify_reverse(stuff): + return stuff[::-1] + + logger = logging.getLogger("compiler.parser") + to_send = textwrap.dedent(ConfigFileTest.config1) + # First, specify a verification function that will fail. + # We expect to see no output, since our configuration + # never took effect. + with captured_stdout() as output: + self.setup_via_listener(to_send, verify_fail) + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([], stream=output) + # Original logger output has the stuff we logged. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform no verification. Our configuration + # should take effect. + + with captured_stdout() as output: + self.setup_via_listener(to_send) # no verify callable specified + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ('ERROR', '4'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform verification which transforms the bytes. + + with captured_stdout() as output: + self.setup_via_listener(to_send[::-1], verify_reverse) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '5'), + ('ERROR', '6'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + def test_baseconfig(self): d = { 'atuple': (1, 2, 3), @@ -2716,14 +2804,14 @@ class ChildLoggerTest(BaseTest): l2 = logging.getLogger('def.ghi') c1 = r.getChild('xyz') c2 = r.getChild('uvw.xyz') - self.assertTrue(c1 is logging.getLogger('xyz')) - self.assertTrue(c2 is logging.getLogger('uvw.xyz')) + self.assertIs(c1, logging.getLogger('xyz')) + self.assertIs(c2, logging.getLogger('uvw.xyz')) c1 = l1.getChild('def') c2 = c1.getChild('ghi') c3 = l1.getChild('def.ghi') - self.assertTrue(c1 is logging.getLogger('abc.def')) - self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) - self.assertTrue(c2 is c3) + self.assertIs(c1, logging.getLogger('abc.def')) + self.assertIs(c2, logging.getLogger('abc.def.ghi')) + self.assertIs(c2, c3) class DerivedLogRecord(logging.LogRecord): @@ -2766,7 +2854,7 @@ class LogRecordFactoryTest(BaseTest): class QueueHandlerTest(BaseTest): # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -3804,7 +3892,7 @@ class NTEventLogHandlerTest(BaseTest): h.handle(r) h.close() # Now see if the event is recorded - self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh)) + self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ win32evtlog.EVENTLOG_SEQUENTIAL_READ found = False diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 6b1e933..d1cc92e 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -596,7 +596,7 @@ class TestMaildir(TestMailbox, unittest.TestCase): def setUp(self): TestMailbox.setUp(self) - if os.name in ('nt', 'os2') or sys.platform == 'cygwin': + if (os.name == 'nt') or (sys.platform == 'cygwin'): self._box.colon = '!' def assertMailboxEmpty(self): diff --git a/Lib/test/test_mimetypes.py b/Lib/test/test_mimetypes.py index 91da289..593fdb0 100644 --- a/Lib/test/test_mimetypes.py +++ b/Lib/test/test_mimetypes.py @@ -22,6 +22,8 @@ class MimeTypesTestCase(unittest.TestCase): eq(self.db.guess_type("foo.tgz"), ("application/x-tar", "gzip")) eq(self.db.guess_type("foo.tar.gz"), ("application/x-tar", "gzip")) eq(self.db.guess_type("foo.tar.Z"), ("application/x-tar", "compress")) + eq(self.db.guess_type("foo.tar.bz2"), ("application/x-tar", "bzip2")) + eq(self.db.guess_type("foo.tar.xz"), ("application/x-tar", "xz")) def test_data_urls(self): eq = self.assertEqual diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index b2a964c..9c7a202 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -19,6 +19,7 @@ import socket import random import logging import struct +import operator import test.support import test.script_helper @@ -1617,6 +1618,18 @@ def mul(x, y): class _TestPool(BaseTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.pool = cls.Pool(4) + + @classmethod + def tearDownClass(cls): + cls.pool.terminate() + cls.pool.join() + cls.pool = None + super().tearDownClass() + def test_apply(self): papply = self.pool.apply self.assertEqual(papply(sqr, (5,)), sqr(5)) @@ -1708,15 +1721,6 @@ class _TestPool(BaseTestCase): p.join() def test_terminate(self): - if self.TYPE == 'manager': - # On Unix a forked process increfs each shared object to - # which its parent process held a reference. If the - # forked process gets terminated then there is likely to - # be a reference leak. So to prevent - # _TestZZZNumberOfObjects from failing we skip this test - # when using a manager. - return - result = self.pool.map_async( time.sleep, [0.1 for i in range(10000)], chunksize=1 ) @@ -1838,35 +1842,6 @@ class _TestPoolWorkerLifetime(BaseTestCase): for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) - -# -# Test that manager has expected number of shared objects left -# - -class _TestZZZNumberOfObjects(BaseTestCase): - # Because test cases are sorted alphabetically, this one will get - # run after all the other tests for the manager. It tests that - # there have been no "reference leaks" for the manager's shared - # objects. Note the comment in _TestPool.test_terminate(). - - # If some other test using ManagerMixin.manager fails, then the - # raised exception may keep alive a frame which holds a reference - # to a managed object. This will cause test_number_of_objects to - # also fail. - ALLOWED_TYPES = ('manager',) - - def test_number_of_objects(self): - EXPECTED_NUMBER = 1 # the pool object is still alive - multiprocessing.active_children() # discard dead process objs - gc.collect() # do garbage collection - refs = self.manager._number_of_objects() - debug_info = self.manager._debug_info() - if refs != EXPECTED_NUMBER: - print(self.manager._debug_info()) - print(debug_info) - - self.assertEqual(refs, EXPECTED_NUMBER) - # # Test of creating a customized manager class # @@ -2903,15 +2878,6 @@ class TestInvalidHandle(unittest.TestCase): # Functions used to create test cases from the base ones in this module # -def get_attributes(Source, names): - d = {} - for name in names: - obj = getattr(Source, name) - if type(obj) == type(get_attributes): - obj = staticmethod(obj) - d[name] = obj - return d - def create_test_cases(Mixin, type): result = {} glob = globals() @@ -2924,10 +2890,10 @@ def create_test_cases(Mixin, type): assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) if type in base.ALLOWED_TYPES: newname = 'With' + Type + name[1:] - class Temp(base, unittest.TestCase, Mixin): + class Temp(base, Mixin, unittest.TestCase): pass result[newname] = Temp - Temp.__name__ = newname + Temp.__name__ = Temp.__qualname__ = newname Temp.__module__ = Mixin.__module__ return result @@ -2938,12 +2904,24 @@ def create_test_cases(Mixin, type): class ProcessesMixin(object): TYPE = 'processes' Process = multiprocessing.Process - locals().update(get_attributes(multiprocessing, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue', - 'RawArray', 'current_process', 'active_children', 'Pipe', - 'connection', 'JoinableQueue', 'Pool' - ))) + connection = multiprocessing.connection + current_process = staticmethod(multiprocessing.current_process) + active_children = staticmethod(multiprocessing.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.Pipe) + Queue = staticmethod(multiprocessing.Queue) + JoinableQueue = staticmethod(multiprocessing.JoinableQueue) + Lock = staticmethod(multiprocessing.Lock) + RLock = staticmethod(multiprocessing.RLock) + Semaphore = staticmethod(multiprocessing.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) + Condition = staticmethod(multiprocessing.Condition) + Event = staticmethod(multiprocessing.Event) + Barrier = staticmethod(multiprocessing.Barrier) + Value = staticmethod(multiprocessing.Value) + Array = staticmethod(multiprocessing.Array) + RawValue = staticmethod(multiprocessing.RawValue) + RawArray = staticmethod(multiprocessing.RawArray) testcases_processes = create_test_cases(ProcessesMixin, type='processes') globals().update(testcases_processes) @@ -2952,12 +2930,42 @@ globals().update(testcases_processes) class ManagerMixin(object): TYPE = 'manager' Process = multiprocessing.Process - manager = object.__new__(multiprocessing.managers.SyncManager) - locals().update(get_attributes(manager, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict', - 'Namespace', 'JoinableQueue', 'Pool' - ))) + Queue = property(operator.attrgetter('manager.Queue')) + JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) + Lock = property(operator.attrgetter('manager.Lock')) + RLock = property(operator.attrgetter('manager.RLock')) + Semaphore = property(operator.attrgetter('manager.Semaphore')) + BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) + Condition = property(operator.attrgetter('manager.Condition')) + Event = property(operator.attrgetter('manager.Event')) + Barrier = property(operator.attrgetter('manager.Barrier')) + Value = property(operator.attrgetter('manager.Value')) + Array = property(operator.attrgetter('manager.Array')) + list = property(operator.attrgetter('manager.list')) + dict = property(operator.attrgetter('manager.dict')) + Namespace = property(operator.attrgetter('manager.Namespace')) + + @classmethod + def Pool(cls, *args, **kwds): + return cls.manager.Pool(*args, **kwds) + + @classmethod + def setUpClass(cls): + cls.manager = multiprocessing.Manager() + + @classmethod + def tearDownClass(cls): + multiprocessing.active_children() # discard dead process objs + gc.collect() # do garbage collection + if cls.manager._number_of_objects() != 0: + # This is not really an error since some tests do not + # ensure that all processes which hold a reference to a + # managed object have been joined. + print('Shared objects which still exist at manager shutdown:') + print(cls.manager._debug_info()) + cls.manager.shutdown() + cls.manager.join() + cls.manager = None testcases_manager = create_test_cases(ManagerMixin, type='manager') globals().update(testcases_manager) @@ -2966,16 +2974,27 @@ globals().update(testcases_manager) class ThreadsMixin(object): TYPE = 'threads' Process = multiprocessing.dummy.Process - locals().update(get_attributes(multiprocessing.dummy, ( - 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process', - 'active_children', 'Pipe', 'connection', 'dict', 'list', - 'Namespace', 'JoinableQueue', 'Pool' - ))) + connection = multiprocessing.dummy.connection + current_process = staticmethod(multiprocessing.dummy.current_process) + active_children = staticmethod(multiprocessing.dummy.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.dummy.Pipe) + Queue = staticmethod(multiprocessing.dummy.Queue) + JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) + Lock = staticmethod(multiprocessing.dummy.Lock) + RLock = staticmethod(multiprocessing.dummy.RLock) + Semaphore = staticmethod(multiprocessing.dummy.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) + Condition = staticmethod(multiprocessing.dummy.Condition) + Event = staticmethod(multiprocessing.dummy.Event) + Barrier = staticmethod(multiprocessing.dummy.Barrier) + Value = staticmethod(multiprocessing.dummy.Value) + Array = staticmethod(multiprocessing.dummy.Array) testcases_threads = create_test_cases(ThreadsMixin, type='threads') globals().update(testcases_threads) + class OtherTest(unittest.TestCase): # TODO: add more tests for deliver/answer challenge. def test_deliver_challenge_auth_failure(self): @@ -3407,12 +3426,6 @@ def test_main(run=None): multiprocessing.get_logger().setLevel(LOG_LEVEL) - ProcessesMixin.pool = multiprocessing.Pool(4) - ThreadsMixin.pool = multiprocessing.dummy.Pool(4) - ManagerMixin.manager.__init__() - ManagerMixin.manager.start() - ManagerMixin.pool = ManagerMixin.manager.Pool(4) - testcases = ( sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + @@ -3422,18 +3435,7 @@ def test_main(run=None): loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) - try: - run(suite) - finally: - ThreadsMixin.pool.terminate() - ProcessesMixin.pool.terminate() - ManagerMixin.pool.terminate() - ManagerMixin.pool.join() - ManagerMixin.manager.shutdown() - ManagerMixin.manager.join() - ThreadsMixin.pool.join() - ProcessesMixin.pool.join() - del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool + run(suite) def main(): test_main(unittest.TextTestRunner(verbosity=2).run) diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py index fa608b9..b445ee0 100644 --- a/Lib/test/test_operator.py +++ b/Lib/test/test_operator.py @@ -410,6 +410,31 @@ class OperatorTestCase(unittest.TestCase): self.assertEqual(operator.__ixor__ (c, 5), "ixor") self.assertEqual(operator.__iconcat__ (c, c), "iadd") + def test_length_hint(self): + class X(object): + def __init__(self, value): + self.value = value + + def __length_hint__(self): + if type(self.value) is type: + raise self.value + else: + return self.value + + self.assertEqual(operator.length_hint([], 2), 0) + self.assertEqual(operator.length_hint(iter([1, 2, 3])), 3) + + self.assertEqual(operator.length_hint(X(2)), 2) + self.assertEqual(operator.length_hint(X(NotImplemented), 4), 4) + self.assertEqual(operator.length_hint(X(TypeError), 12), 12) + with self.assertRaises(TypeError): + operator.length_hint(X("abc")) + with self.assertRaises(ValueError): + operator.length_hint(X(-2)) + with self.assertRaises(LookupError): + operator.length_hint(X(LookupError)) + + def test_main(verbose=None): import sys test_classes = ( diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 7d6b377..09d3e75 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -1243,6 +1243,8 @@ if sys.platform != 'win32': def setUp(self): if support.TESTFN_UNENCODABLE: self.dir = support.TESTFN_UNENCODABLE + elif support.TESTFN_NONASCII: + self.dir = support.TESTFN_NONASCII else: self.dir = support.TESTFN self.bdir = os.fsencode(self.dir) @@ -1257,6 +1259,8 @@ if sys.platform != 'win32': add_filename(support.TESTFN_UNICODE) if support.TESTFN_UNENCODABLE: add_filename(support.TESTFN_UNENCODABLE) + if support.TESTFN_NONASCII: + add_filename(support.TESTFN_NONASCII) if not bytesfn: self.skipTest("couldn't create any non-ascii filename") @@ -2046,6 +2050,104 @@ class TermsizeTests(unittest.TestCase): self.assertEqual(expected, actual) +class OSErrorTests(unittest.TestCase): + def setUp(self): + class Str(str): + pass + + self.bytes_filenames = [] + self.unicode_filenames = [] + if support.TESTFN_UNENCODABLE is not None: + decoded = support.TESTFN_UNENCODABLE + else: + decoded = support.TESTFN + self.unicode_filenames.append(decoded) + self.unicode_filenames.append(Str(decoded)) + if support.TESTFN_UNDECODABLE is not None: + encoded = support.TESTFN_UNDECODABLE + else: + encoded = os.fsencode(support.TESTFN) + self.bytes_filenames.append(encoded) + self.bytes_filenames.append(memoryview(encoded)) + + self.filenames = self.bytes_filenames + self.unicode_filenames + + def test_oserror_filename(self): + funcs = [ + (self.filenames, os.chdir,), + (self.filenames, os.chmod, 0o777), + (self.filenames, os.lstat,), + (self.filenames, os.open, os.O_RDONLY), + (self.filenames, os.rmdir,), + (self.filenames, os.stat,), + (self.filenames, os.unlink,), + ] + if sys.platform == "win32": + funcs.extend(( + (self.bytes_filenames, os.rename, b"dst"), + (self.bytes_filenames, os.replace, b"dst"), + (self.unicode_filenames, os.rename, "dst"), + (self.unicode_filenames, os.replace, "dst"), + # Issue #16414: Don't test undecodable names with listdir() + # because of a Windows bug. + # + # With the ANSI code page 932, os.listdir(b'\xe7') return an + # empty list (instead of failing), whereas os.listdir(b'\xff') + # raises a FileNotFoundError. It looks like a Windows bug: + # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7') + # fails with ERROR_FILE_NOT_FOUND (2), instead of + # ERROR_PATH_NOT_FOUND (3). + (self.unicode_filenames, os.listdir,), + )) + else: + funcs.extend(( + (self.filenames, os.listdir,), + (self.filenames, os.rename, "dst"), + (self.filenames, os.replace, "dst"), + )) + if hasattr(os, "chown"): + funcs.append((self.filenames, os.chown, 0, 0)) + if hasattr(os, "lchown"): + funcs.append((self.filenames, os.lchown, 0, 0)) + if hasattr(os, "truncate"): + funcs.append((self.filenames, os.truncate, 0)) + if hasattr(os, "chflags"): + funcs.extend(( + (self.filenames, os.chflags, 0), + (self.filenames, os.lchflags, 0), + )) + if hasattr(os, "chroot"): + funcs.append((self.filenames, os.chroot,)) + if hasattr(os, "link"): + if sys.platform == "win32": + funcs.append((self.bytes_filenames, os.link, b"dst")) + funcs.append((self.unicode_filenames, os.link, "dst")) + else: + funcs.append((self.filenames, os.link, "dst")) + if hasattr(os, "listxattr"): + funcs.extend(( + (self.filenames, os.listxattr,), + (self.filenames, os.getxattr, "user.test"), + (self.filenames, os.setxattr, "user.test", b'user'), + (self.filenames, os.removexattr, "user.test"), + )) + if hasattr(os, "lchmod"): + funcs.append((self.filenames, os.lchmod, 0o777)) + if hasattr(os, "readlink"): + if sys.platform == "win32": + funcs.append((self.unicode_filenames, os.readlink,)) + else: + funcs.append((self.filenames, os.readlink,)) + + for filenames, func, *func_args in funcs: + for name in filenames: + try: + func(name, *func_args) + except OSError as err: + self.assertIs(err.filename, name) + else: + self.fail("No exception thrown by {}".format(func)) + @support.reap_threads def test_main(): support.run_unittest( @@ -2074,6 +2176,7 @@ def test_main(): ExtendedAttributeTests, Win32DeprecatedBytesAPI, TermsizeTests, + OSErrorTests, ) if __name__ == "__main__": diff --git a/Lib/test/test_pep277.py b/Lib/test/test_pep277.py index 4b16cbb..9bae6dc 100644 --- a/Lib/test/test_pep277.py +++ b/Lib/test/test_pep277.py @@ -99,10 +99,6 @@ class UnicodeFileTests(unittest.TestCase): with self.assertRaises(expected_exception) as c: fn(filename) exc_filename = c.exception.filename - # listdir may append a wildcard to the filename - if fn is os.listdir and sys.platform == 'win32': - exc_filename, _, wildcard = exc_filename.rpartition(os.sep) - self.assertEqual(wildcard, '*.*') if check_filename: self.assertEqual(exc_filename, filename, "Function '%s(%a) failed " "with bad filename in the exception: %a" % diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py index b5931ba..a9aec70 100644 --- a/Lib/test/test_random.py +++ b/Lib/test/test_random.py @@ -46,6 +46,39 @@ class TestBasicOps(unittest.TestCase): self.assertRaises(TypeError, self.gen.seed, 1, 2, 3, 4) self.assertRaises(TypeError, type(self.gen), []) + def test_shuffle(self): + shuffle = self.gen.shuffle + lst = [] + shuffle(lst) + self.assertEqual(lst, []) + lst = [37] + shuffle(lst) + self.assertEqual(lst, [37]) + seqs = [list(range(n)) for n in range(10)] + shuffled_seqs = [list(range(n)) for n in range(10)] + for shuffled_seq in shuffled_seqs: + shuffle(shuffled_seq) + for (seq, shuffled_seq) in zip(seqs, shuffled_seqs): + self.assertEqual(len(seq), len(shuffled_seq)) + self.assertEqual(set(seq), set(shuffled_seq)) + + # The above tests all would pass if the shuffle was a + # no-op. The following non-deterministic test covers that. It + # asserts that the shuffled sequence of 1000 distinct elements + # must be different from the original one. Although there is + # mathematically a non-zero probability that this could + # actually happen in a genuinely random shuffle, it is + # completely negligible, given that the number of possible + # permutations of 1000 objects is 1000! (factorial of 1000), + # which is considerably larger than the number of atoms in the + # universe... + lst = list(range(1000)) + shuffled_lst = list(range(1000)) + shuffle(shuffled_lst) + self.assertTrue(lst != shuffled_lst) + shuffle(lst) + self.assertTrue(lst != shuffled_lst) + def test_choice(self): choice = self.gen.choice with self.assertRaises(IndexError): diff --git a/Lib/test/test_range.py b/Lib/test/test_range.py index 2a13bfe..f088387 100644 --- a/Lib/test/test_range.py +++ b/Lib/test/test_range.py @@ -313,7 +313,7 @@ class RangeTest(unittest.TestCase): self.assertRaises(TypeError, range, IN()) # Test use of user-defined classes in slice indices. - self.assertEqual(list(range(10)[:I(5)]), list(range(5))) + self.assertEqual(range(10)[:I(5)], range(5)) with self.assertRaises(RuntimeError): range(0, 10)[:IX()] diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py index ddb9a0f..496709c 100644 --- a/Lib/test/test_select.py +++ b/Lib/test/test_select.py @@ -5,7 +5,7 @@ import sys import unittest from test import support -@unittest.skipIf(sys.platform[:3] in ('win', 'os2', 'riscos'), +@unittest.skipIf((sys.platform[:3]=='win') or (sys.platform=='riscos'), "can't easily test on this system") class SelectTestCase(unittest.TestCase): diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py index 8e9e587..da62723 100644 --- a/Lib/test/test_set.py +++ b/Lib/test/test_set.py @@ -848,8 +848,6 @@ class TestBasicOps(unittest.TestCase): for v in self.set: self.assertIn(v, self.values) setiter = iter(self.set) - # note: __length_hint__ is an internal undocumented API, - # don't rely on it in your own programs self.assertEqual(setiter.__length_hint__(), len(self.set)) def test_pickling(self): diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py index 13c1265..bd51d86 100644 --- a/Lib/test/test_shelve.py +++ b/Lib/test/test_shelve.py @@ -148,6 +148,19 @@ class TestCase(unittest.TestCase): p2 = d[encodedkey] self.assertNotEqual(p1, p2) # Write creates new object in store + def test_with(self): + d1 = {} + with shelve.Shelf(d1, protocol=2, writeback=False) as s: + s['key1'] = [1,2,3,4] + self.assertEqual(s['key1'], [1,2,3,4]) + self.assertEqual(len(s), 1) + self.assertRaises(ValueError, len, s) + try: + s['key1'] + except ValueError: + pass + else: + self.fail('Closed shelf should not find a key') from test import mapping_tests diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index eb0e9a4..c3997dc 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -18,7 +18,8 @@ from shutil import (_make_tarball, _make_zipfile, make_archive, register_archive_format, unregister_archive_format, get_archive_formats, Error, unpack_archive, register_unpack_format, RegistryError, - unregister_unpack_format, get_unpack_formats) + unregister_unpack_format, get_unpack_formats, + SameFileError) import tarfile import warnings @@ -688,7 +689,7 @@ class TestShutil(unittest.TestCase): with open(src, 'w') as f: f.write('cheddar') os.link(src, dst) - self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) with open(src, 'r') as f: self.assertEqual(f.read(), 'cheddar') os.remove(dst) @@ -708,7 +709,7 @@ class TestShutil(unittest.TestCase): # to TESTFN/TESTFN/cheese, while it should point at # TESTFN/cheese. os.symlink('cheese', dst) - self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) with open(src, 'r') as f: self.assertEqual(f.read(), 'cheddar') os.remove(dst) @@ -1215,6 +1216,16 @@ class TestShutil(unittest.TestCase): self.assertTrue(os.path.exists(rv)) self.assertEqual(read_file(src_file), read_file(dst_file)) + def test_copyfile_same_file(self): + # copyfile() should raise SameFileError if the source and destination + # are the same. + src_dir = self.mkdtemp() + src_file = os.path.join(src_dir, 'foo') + write_file(src_file, 'foo') + self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) + # But Error should work too, to stay backward compatible. + self.assertRaises(Error, shutil.copyfile, src_file, src_file) + def test_copytree_return_value(self): # copytree returns its destination path. src_dir = self.mkdtemp() diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 6be259b..1b73634 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -15,7 +15,7 @@ try: except ImportError: threading = None -if sys.platform in ('os2', 'riscos'): +if sys.platform == 'riscos': raise unittest.SkipTest("Can't test signal on %s" % sys.platform) diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py index 29286c7..5090239 100644 --- a/Lib/test/test_site.py +++ b/Lib/test/test_site.py @@ -222,7 +222,7 @@ class HelperFunctionsTests(unittest.TestCase): site.PREFIXES = ['xoxo'] dirs = site.getsitepackages() - if sys.platform in ('os2emx', 'riscos'): + if sys.platform == 'riscos': self.assertEqual(len(dirs), 1) wanted = os.path.join('xoxo', 'Lib', 'site-packages') self.assertEqual(dirs[0], wanted) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 353f8ff..58fc5d03 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -27,7 +27,7 @@ TEST_STR = b"hello world\n" HOST = test.support.HOST HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") -HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" +HAVE_FORKING = hasattr(os, "fork") def signal_alarm(n): """Call signal.alarm when it exists (i.e. not on Windows).""" @@ -93,21 +93,7 @@ class SocketServerTest(unittest.TestCase): # XXX: We need a way to tell AF_UNIX to pick its own name # like AF_INET provides port==0. dir = None - if os.name == 'os2': - dir = '\socket' fn = tempfile.mktemp(prefix='unix_socket.', dir=dir) - if os.name == 'os2': - # AF_UNIX socket names on OS/2 require a specific prefix - # which can't include a drive letter and must also use - # backslashes as directory separators - if fn[1] == ':': - fn = fn[2:] - if fn[0] in (os.sep, os.altsep): - fn = fn[1:] - if os.sep == '/': - fn = fn.replace(os.sep, os.altsep) - else: - fn = fn.replace(os.altsep, os.sep) self.test_files.append(fn) return fn diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py index fcccdf7..ea1325a 100644 --- a/Lib/test/test_sundry.py +++ b/Lib/test/test_sundry.py @@ -13,7 +13,6 @@ class TestUntestedModules(unittest.TestCase): import distutils.bcppcompiler import distutils.ccompiler import distutils.cygwinccompiler - import distutils.emxccompiler import distutils.filelist if sys.platform.startswith('win'): import distutils.msvccompiler @@ -48,7 +47,6 @@ class TestUntestedModules(unittest.TestCase): import macurl2path import mailcap import nturl2path - import os2emxpath import pstats import py_compile import sndhdr diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index e5ec85c..a1074c3 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -482,7 +482,7 @@ class SysModuleTest(unittest.TestCase): def test_thread_info(self): info = sys.thread_info self.assertEqual(len(info), 3) - self.assertIn(info.name, ('nt', 'os2', 'pthread', 'solaris', None)) + self.assertIn(info.name, ('nt', 'pthread', 'solaris', None)) self.assertIn(info.lock, ('semaphore', 'mutex+cond', None)) def test_43581(self): diff --git a/Lib/test/test_sysconfig.py b/Lib/test/test_sysconfig.py index 9219360..5293649 100644 --- a/Lib/test/test_sysconfig.py +++ b/Lib/test/test_sysconfig.py @@ -234,7 +234,7 @@ class TestSysConfig(unittest.TestCase): self.assertTrue(os.path.isfile(config_h), config_h) def test_get_scheme_names(self): - wanted = ('nt', 'nt_user', 'os2', 'os2_home', 'osx_framework_user', + wanted = ('nt', 'nt_user', 'osx_framework_user', 'posix_home', 'posix_prefix', 'posix_user') self.assertEqual(get_scheme_names(), wanted) @@ -305,14 +305,13 @@ class TestSysConfig(unittest.TestCase): if 'MACOSX_DEPLOYMENT_TARGET' in env: del env['MACOSX_DEPLOYMENT_TARGET'] - with open('/dev/null', 'w') as devnull_fp: - p = subprocess.Popen([ - sys.executable, '-c', - 'import sysconfig; print(sysconfig.get_platform())', - ], - stdout=subprocess.PIPE, - stderr=devnull_fp, - env=env) + p = subprocess.Popen([ + sys.executable, '-c', + 'import sysconfig; print(sysconfig.get_platform())', + ], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + env=env) test_platform = p.communicate()[0].strip() test_platform = test_platform.decode('utf-8') status = p.wait() @@ -325,20 +324,19 @@ class TestSysConfig(unittest.TestCase): env = os.environ.copy() env['MACOSX_DEPLOYMENT_TARGET'] = '10.1' - with open('/dev/null') as dev_null: - p = subprocess.Popen([ - sys.executable, '-c', - 'import sysconfig; print(sysconfig.get_platform())', - ], - stdout=subprocess.PIPE, - stderr=dev_null, - env=env) - test_platform = p.communicate()[0].strip() - test_platform = test_platform.decode('utf-8') - status = p.wait() - - self.assertEqual(status, 0) - self.assertEqual(my_platform, test_platform) + p = subprocess.Popen([ + sys.executable, '-c', + 'import sysconfig; print(sysconfig.get_platform())', + ], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + env=env) + test_platform = p.communicate()[0].strip() + test_platform = test_platform.decode('utf-8') + status = p.wait() + + self.assertEqual(status, 0) + self.assertEqual(my_platform, test_platform) def test_srcdir(self): # See Issues #15322, #15364. diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index d79f319..8d161d9 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -276,7 +276,7 @@ class TestMkstempInner(BaseTestCase): file = self.do_create() mode = stat.S_IMODE(os.stat(file.name).st_mode) expected = 0o600 - if sys.platform in ('win32', 'os2emx'): + if sys.platform == 'win32': # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 @@ -310,7 +310,7 @@ class TestMkstempInner(BaseTestCase): # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, # but an arg with embedded spaces should be decorated with double # quotes on each end - if sys.platform in ('win32',): + if sys.platform == 'win32': decorated = '"%s"' % sys.executable tester = '"%s"' % tester else: @@ -479,7 +479,7 @@ class TestMkdtemp(BaseTestCase): mode = stat.S_IMODE(os.stat(dir).st_mode) mode &= 0o777 # Mask off sticky bits inherited from /tmp expected = 0o700 - if sys.platform in ('win32', 'os2emx'): + if sys.platform == 'win32': # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index a191e15..f9a721b 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -68,7 +68,7 @@ class ThreadRunningTests(BasicThreadTest): thread.stack_size(0) self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") - if os.name not in ("nt", "os2", "posix"): + if os.name not in ("nt", "posix"): return tss_supported = True diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index bb8d9b6..13c017d 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -451,8 +451,7 @@ class ThreadJoinOnShutdown(BaseTestCase): # #12316 and #11870), and fork() from a worker thread is known to trigger # problems with some operating systems (issue #3863): skip problematic tests # on platforms known to behave badly. - platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', - 'os2emx') + platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5') def _run_and_join(self, script): script = """if 1: @@ -754,7 +753,8 @@ class ThreadingExceptionTests(BaseTestCase): lock = threading.Lock() self.assertRaises(RuntimeError, lock.release) - @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') + @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(), + 'test macosx problem') def test_recursion_limit(self): # Issue 9670 # test that excessive recursion within a non-main thread causes diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py index f975a75..3a0a493 100644 --- a/Lib/test/test_threadsignals.py +++ b/Lib/test/test_threadsignals.py @@ -8,7 +8,7 @@ from test.support import run_unittest, import_module thread = import_module('_thread') import time -if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos': +if (sys.platform[:3] == 'win') or (sys.platform=='riscos'): raise unittest.SkipTest("Can't test signal on %s" % sys.platform) process_pid = os.getpid() diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index a811c4c..7da5c8b 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -1963,9 +1963,10 @@ class UnicodeTest(string_tests.CommonTest, # Test PyUnicode_FromFormat() def test_from_format(self): support.import_module('ctypes') - from ctypes import (pythonapi, py_object, + from ctypes import ( + pythonapi, py_object, sizeof, c_int, c_long, c_longlong, c_ssize_t, - c_uint, c_ulong, c_ulonglong, c_size_t) + c_uint, c_ulong, c_ulonglong, c_size_t, c_void_p) name = "PyUnicode_FromFormat" _PyUnicode_FromFormat = getattr(pythonapi, name) _PyUnicode_FromFormat.restype = py_object @@ -2016,6 +2017,31 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(123)), '123') self.assertEqual(PyUnicode_FromFormat(b'%zu', c_size_t(123)), '123') + # test long output + min_longlong = -(2 ** (8 * sizeof(c_longlong) - 1)) + max_longlong = -min_longlong - 1 + self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(min_longlong)), str(min_longlong)) + self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(max_longlong)), str(max_longlong)) + max_ulonglong = 2 ** (8 * sizeof(c_ulonglong)) - 1 + self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(max_ulonglong)), str(max_ulonglong)) + PyUnicode_FromFormat(b'%p', c_void_p(-1)) + + # test padding (width and/or precision) + self.assertEqual(PyUnicode_FromFormat(b'%010i', c_int(123)), '123'.rjust(10, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100i', c_int(123)), '123'.rjust(100)) + self.assertEqual(PyUnicode_FromFormat(b'%.100i', c_int(123)), '123'.rjust(100, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100.80i', c_int(123)), '123'.rjust(80, '0').rjust(100)) + + self.assertEqual(PyUnicode_FromFormat(b'%010u', c_uint(123)), '123'.rjust(10, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100u', c_uint(123)), '123'.rjust(100)) + self.assertEqual(PyUnicode_FromFormat(b'%.100u', c_uint(123)), '123'.rjust(100, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100.80u', c_uint(123)), '123'.rjust(80, '0').rjust(100)) + + self.assertEqual(PyUnicode_FromFormat(b'%010x', c_int(0x123)), '123'.rjust(10, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100x', c_int(0x123)), '123'.rjust(100)) + self.assertEqual(PyUnicode_FromFormat(b'%.100x', c_int(0x123)), '123'.rjust(100, '0')) + self.assertEqual(PyUnicode_FromFormat(b'%100.80x', c_int(0x123)), '123'.rjust(80, '0').rjust(100)) + # test %A text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff') self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'") diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py index 99aa003..677508b 100644 --- a/Lib/test/test_unicodedata.py +++ b/Lib/test/test_unicodedata.py @@ -80,7 +80,7 @@ class UnicodeDatabaseTest(unittest.TestCase): class UnicodeFunctionsTest(UnicodeDatabaseTest): # update this, if the database changes - expectedchecksum = '17fe2f12b788e4fff5479b469c4404bb6ecf841f' + expectedchecksum = 'ebd64e81553c9cb37f424f5616254499fcd8849e' def test_function_checksum(self): data = [] h = hashlib.sha1() diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 623eb5e..d8c3512 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -270,9 +270,10 @@ Content-Type: text/html; charset=iso-8859-1 def test_missing_localfile(self): # Test for #10836 - # 3.3 - URLError is not captured, explicit IOError is raised. - with self.assertRaises(IOError): + with self.assertRaises(urllib.error.URLError) as e: urlopen('file://localhost/a/file/which/doesnot/exists.py') + self.assertTrue(e.exception.filename) + self.assertTrue(e.exception.reason) def test_file_notexists(self): fd, tmp_file = tempfile.mkstemp() @@ -285,20 +286,21 @@ Content-Type: text/html; charset=iso-8859-1 os.close(fd) os.unlink(tmp_file) self.assertFalse(os.path.exists(tmp_file)) - # 3.3 - IOError instead of URLError - with self.assertRaises(IOError): + with self.assertRaises(urllib.error.URLError): urlopen(tmp_fileurl) def test_ftp_nohost(self): test_ftp_url = 'ftp:///path' - # 3.3 - IOError instead of URLError - with self.assertRaises(IOError): + with self.assertRaises(urllib.error.URLError) as e: urlopen(test_ftp_url) + self.assertFalse(e.exception.filename) + self.assertTrue(e.exception.reason) def test_ftp_nonexisting(self): - # 3.3 - IOError instead of URLError - with self.assertRaises(IOError): + with self.assertRaises(urllib.error.URLError) as e: urlopen('ftp://localhost/a/file/which/doesnot/exists.py') + self.assertFalse(e.exception.filename) + self.assertTrue(e.exception.reason) def test_userpass_inurl(self): diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 7ae1553..00ee669 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -302,6 +302,7 @@ class MockHTTPClass: self.req_headers = [] self.data = None self.raise_on_endheaders = False + self.sock = None self._tunnel_headers = {} def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): diff --git a/Lib/test/test_venv.py b/Lib/test/test_venv.py index 9696844..2a528c9 100644 --- a/Lib/test/test_venv.py +++ b/Lib/test/test_venv.py @@ -113,13 +113,68 @@ class BasicTest(BaseTest): out, err = p.communicate() self.assertEqual(out.strip(), expected.encode()) + if sys.platform == 'win32': + ENV_SUBDIRS = ( + ('Scripts',), + ('Include',), + ('Lib',), + ('Lib', 'site-packages'), + ) + else: + ENV_SUBDIRS = ( + ('bin',), + ('include',), + ('lib',), + ('lib', 'python%d.%d' % sys.version_info[:2]), + ('lib', 'python%d.%d' % sys.version_info[:2], 'site-packages'), + ) + + def create_contents(self, paths, filename): + """ + Create some files in the environment which are unrelated + to the virtual environment. + """ + for subdirs in paths: + d = os.path.join(self.env_dir, *subdirs) + os.mkdir(d) + fn = os.path.join(d, filename) + with open(fn, 'wb') as f: + f.write(b'Still here?') + def test_overwrite_existing(self): """ - Test control of overwriting an existing environment directory. + Test creating environment in an existing directory. """ - self.assertRaises(ValueError, venv.create, self.env_dir) + self.create_contents(self.ENV_SUBDIRS, 'foo') + venv.create(self.env_dir) + for subdirs in self.ENV_SUBDIRS: + fn = os.path.join(self.env_dir, *(subdirs + ('foo',))) + self.assertTrue(os.path.exists(fn)) + with open(fn, 'rb') as f: + self.assertEqual(f.read(), b'Still here?') + builder = venv.EnvBuilder(clear=True) builder.create(self.env_dir) + for subdirs in self.ENV_SUBDIRS: + fn = os.path.join(self.env_dir, *(subdirs + ('foo',))) + self.assertFalse(os.path.exists(fn)) + + def clear_directory(self, path): + for fn in os.listdir(path): + fn = os.path.join(path, fn) + if os.path.islink(fn) or os.path.isfile(fn): + os.remove(fn) + elif os.path.isdir(fn): + shutil.rmtree(fn) + + def test_unoverwritable_fails(self): + #create a file clashing with directories in the env dir + for paths in self.ENV_SUBDIRS[:3]: + fn = os.path.join(self.env_dir, *paths) + with open(fn, 'wb') as f: + f.write(b'') + self.assertRaises((ValueError, OSError), venv.create, self.env_dir) + self.clear_directory(self.env_dir) def test_upgrade(self): """ |