diff options
Diffstat (limited to 'Lib/test')
38 files changed, 838 insertions, 186 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 3f3c60a..b8c6138 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -2270,13 +2270,14 @@ class TestTime(HarmlessMixedComparison, unittest.TestCase): self.assertEqual(orig, derived) def test_bool(self): + # time is always True. cls = self.theclass self.assertTrue(cls(1)) self.assertTrue(cls(0, 1)) self.assertTrue(cls(0, 0, 1)) self.assertTrue(cls(0, 0, 0, 1)) - self.assertFalse(cls(0)) - self.assertFalse(cls()) + self.assertTrue(cls(0)) + self.assertTrue(cls()) def test_replace(self): cls = self.theclass @@ -2629,7 +2630,7 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertEqual(derived.tzname(), 'cookie') def test_more_bool(self): - # Test cases with non-None tzinfo. + # time is always True. cls = self.theclass t = cls(0, tzinfo=FixedOffset(-300, "")) @@ -2639,23 +2640,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertTrue(t) t = cls(5, tzinfo=FixedOffset(300, "")) - self.assertFalse(t) + self.assertTrue(t) t = cls(23, 59, tzinfo=FixedOffset(23*60 + 59, "")) - self.assertFalse(t) - - # Mostly ensuring this doesn't overflow internally. - t = cls(0, tzinfo=FixedOffset(23*60 + 59, "")) self.assertTrue(t) - # But this should yield a value error -- the utcoffset is bogus. - t = cls(0, tzinfo=FixedOffset(24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - - # Likewise. - t = cls(0, tzinfo=FixedOffset(-24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - def test_replace(self): cls = self.theclass z100 = FixedOffset(100, "+100") diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py index 19b54ec..8c7c3aa 100644 --- a/Lib/test/fork_wait.py +++ b/Lib/test/fork_wait.py @@ -48,7 +48,12 @@ class ForkWait(unittest.TestCase): for i in range(NUM_THREADS): _thread.start_new(self.f, (i,)) - time.sleep(LONGSLEEP) + # busy-loop to wait for threads + deadline = time.monotonic() + 10.0 + while len(self.alive) < NUM_THREADS: + time.sleep(0.1) + if time.monotonic() <= deadline: + break a = sorted(self.alive.keys()) self.assertEqual(a, list(range(NUM_THREADS))) diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py index 759b3f4..f9d30cf 100644 --- a/Lib/test/ssl_servers.py +++ b/Lib/test/ssl_servers.py @@ -150,7 +150,7 @@ class HTTPSServerThread(threading.Thread): def make_https_server(case, *, context=None, certfile=CERTFILE, host=HOST, handler_class=None): if context is None: - context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) # We assume the certfile contains both private key and certificate context.load_cert_chain(certfile) server = HTTPSServerThread(context, host, handler_class) @@ -182,6 +182,8 @@ if __name__ == "__main__": parser.add_argument('--curve-name', dest='curve_name', type=str, action='store', help='curve name for EC-based Diffie-Hellman') + parser.add_argument('--ciphers', dest='ciphers', type=str, + help='allowed cipher list') parser.add_argument('--dh', dest='dh_file', type=str, action='store', help='PEM file containing DH parameters') args = parser.parse_args() @@ -192,12 +194,14 @@ if __name__ == "__main__": else: handler_class = RootedHTTPRequestHandler handler_class.root = os.getcwd() - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain(CERTFILE) if args.curve_name: context.set_ecdh_curve(args.curve_name) if args.dh_file: context.load_dh_params(args.dh_file) + if args.ciphers: + context.set_ciphers(args.ciphers) server = HTTPSServer(("", args.port), handler_class, context) if args.verbose: diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index 5ed01f2..569bae1 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -1178,8 +1178,7 @@ class MixinStrUnicodeUserStringTest: self.checkraises(TypeError, 'abc', '__mod__') self.checkraises(TypeError, '%(foo)s', '__mod__', 42) self.checkraises(TypeError, '%s%s', '__mod__', (42,)) - with self.assertWarns(DeprecationWarning): - self.checkraises(TypeError, '%c', '__mod__', (None,)) + self.checkraises(TypeError, '%c', '__mod__', (None,)) self.checkraises(ValueError, '%(foo', '__mod__', {}) self.checkraises(TypeError, '%(foo)s %(bar)s', '__mod__', ('foo', 42)) self.checkraises(TypeError, '%d', '__mod__', "42") # not numeric diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 084d247..5aaedf3 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -349,9 +349,8 @@ class dispatcherwithsend_noread(asyncore.dispatcher_with_send): def handle_connect(self): pass -class DispatcherWithSendTests(unittest.TestCase): - usepoll = False +class DispatcherWithSendTests(unittest.TestCase): def setUp(self): pass @@ -401,10 +400,6 @@ class DispatcherWithSendTests(unittest.TestCase): self.fail("join() timed out") - -class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): - usepoll = True - @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), 'asyncore.file_wrapper required') class FileWrapperTest(unittest.TestCase): diff --git a/Lib/test/test_augassign.py b/Lib/test/test_augassign.py index 9a59c58..19b7687 100644 --- a/Lib/test/test_augassign.py +++ b/Lib/test/test_augassign.py @@ -136,6 +136,14 @@ class AugAssignTest(unittest.TestCase): output.append("__imul__ called") return self + def __matmul__(self, val): + output.append("__matmul__ called") + def __rmatmul__(self, val): + output.append("__rmatmul__ called") + def __imatmul__(self, val): + output.append("__imatmul__ called") + return self + def __div__(self, val): output.append("__div__ called") def __rdiv__(self, val): @@ -233,6 +241,10 @@ class AugAssignTest(unittest.TestCase): 1 * x x *= 1 + x @ 1 + 1 @ x + x @= 1 + x / 1 1 / x x /= 1 @@ -279,6 +291,9 @@ __isub__ called __mul__ called __rmul__ called __imul__ called +__matmul__ called +__rmatmul__ called +__imatmul__ called __truediv__ called __rtruediv__ called __itruediv__ called diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index b561a6f..018ac8d 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -1092,7 +1092,7 @@ class BuiltinTest(unittest.TestCase): self.assertAlmostEqual(pow(-1, 0.5), 1j) self.assertAlmostEqual(pow(-1, 1/3), 0.5 + 0.8660254037844386j) - self.assertRaises(TypeError, pow, -1, -2, 3) + self.assertRaises(ValueError, pow, -1, -2, 3) self.assertRaises(ValueError, pow, 1, 2, 0) self.assertRaises(TypeError, pow) diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index ba7c38d..ba7f2c4 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -150,6 +150,23 @@ class CAPITest(unittest.TestCase): self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, "($module, /, parameter)") + def test_c_type_with_matrix_multiplication(self): + M = _testcapi.matmulType + m1 = M() + m2 = M() + self.assertEqual(m1 @ m2, ("matmul", m1, m2)) + self.assertEqual(m1 @ 42, ("matmul", m1, 42)) + self.assertEqual(42 @ m1, ("matmul", 42, m1)) + o = m1 + o @= m2 + self.assertEqual(o, ("imatmul", m1, m2)) + o = m1 + o @= 42 + self.assertEqual(o, ("imatmul", m1, 42)) + o = 42 + o @= m1 + self.assertEqual(o, ("matmul", 42, m1)) + @unittest.skipUnless(threading, 'Threading required for this test.') class TestPendingCalls(unittest.TestCase): @@ -319,34 +336,38 @@ class EmbeddingTests(unittest.TestCase): print() print(out) print(err) + expected_errors = sys.__stdout__.errors expected_stdin_encoding = sys.__stdin__.encoding expected_pipe_encoding = self._get_default_pipe_encoding() expected_output = os.linesep.join([ "--- Use defaults ---", "Expected encoding: default", "Expected errors: default", - "stdin: {0}:strict", - "stdout: {1}:strict", - "stderr: {1}:backslashreplace", + "stdin: {in_encoding}:{errors}", + "stdout: {out_encoding}:{errors}", + "stderr: {out_encoding}:backslashreplace", "--- Set errors only ---", "Expected encoding: default", - "Expected errors: surrogateescape", - "stdin: {0}:surrogateescape", - "stdout: {1}:surrogateescape", - "stderr: {1}:backslashreplace", + "Expected errors: ignore", + "stdin: {in_encoding}:ignore", + "stdout: {out_encoding}:ignore", + "stderr: {out_encoding}:backslashreplace", "--- Set encoding only ---", "Expected encoding: latin-1", "Expected errors: default", - "stdin: latin-1:strict", - "stdout: latin-1:strict", + "stdin: latin-1:{errors}", + "stdout: latin-1:{errors}", "stderr: latin-1:backslashreplace", "--- Set encoding and errors ---", "Expected encoding: latin-1", - "Expected errors: surrogateescape", - "stdin: latin-1:surrogateescape", - "stdout: latin-1:surrogateescape", - "stderr: latin-1:backslashreplace"]).format(expected_stdin_encoding, - expected_pipe_encoding) + "Expected errors: replace", + "stdin: latin-1:replace", + "stdout: latin-1:replace", + "stderr: latin-1:backslashreplace"]) + expected_output = expected_output.format( + in_encoding=expected_stdin_encoding, + out_encoding=expected_pipe_encoding, + errors=expected_errors) # This is useful if we ever trip over odd platform behaviour self.maxDiff = None self.assertEqual(out.strip(), expected_output) diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index 84804bb..a1ce9cf 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -819,7 +819,7 @@ class CodecCallbackTest(unittest.TestCase): def __getitem__(self, key): raise ValueError #self.assertRaises(ValueError, "\xff".translate, D()) - self.assertRaises(TypeError, "\xff".translate, {0xff: sys.maxunicode+1}) + self.assertRaises(ValueError, "\xff".translate, {0xff: sys.maxunicode+1}) self.assertRaises(TypeError, "\xff".translate, {0xff: ()}) def test_bug828737(self): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 9b62d5b..6945a99 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -890,10 +890,6 @@ class CP65001Test(ReadTest, unittest.TestCase): "\U00010fff\uD800") self.assertTrue(codecs.lookup_error("surrogatepass")) - def test_readline(self): - self.skipTest("issue #20571: code page 65001 codec does not " - "support partial decoder yet") - class UTF7Test(ReadTest, unittest.TestCase): encoding = "utf-7" @@ -2750,15 +2746,15 @@ class CodePageTest(unittest.TestCase): self.assertRaisesRegex(UnicodeEncodeError, 'cp932', codecs.code_page_encode, 932, '\xff') self.assertRaisesRegex(UnicodeDecodeError, 'cp932', - codecs.code_page_decode, 932, b'\x81\x00') + codecs.code_page_decode, 932, b'\x81\x00', 'strict', True) self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8', - codecs.code_page_decode, self.CP_UTF8, b'\xff') + codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True) def check_decode(self, cp, tests): for raw, errors, expected in tests: if expected is not None: try: - decoded = codecs.code_page_decode(cp, raw, errors) + decoded = codecs.code_page_decode(cp, raw, errors, True) except UnicodeDecodeError as err: self.fail('Unable to decode %a from "cp%s" with ' 'errors=%r: %s' % (raw, cp, errors, err)) @@ -2770,7 +2766,7 @@ class CodePageTest(unittest.TestCase): self.assertLessEqual(decoded[1], len(raw)) else: self.assertRaises(UnicodeDecodeError, - codecs.code_page_decode, cp, raw, errors) + codecs.code_page_decode, cp, raw, errors, True) def check_encode(self, cp, tests): for text, errors, expected in tests: diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index ee28a6c..d352d2a 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -1187,6 +1187,11 @@ class TestOrderedDict(unittest.TestCase): self.assertEqual(list(od.items()), pairs) self.assertEqual(list(reversed(od)), [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.keys())), + [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.values())), + [t[1] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.items())), list(reversed(pairs))) def test_popitem(self): pairs = [('c', 1), ('b', 2), ('a', 3), ('d', 4), ('e', 5), ('f', 6)] diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py index 8bb7d6a..e65edb2 100644 --- a/Lib/test/test_descr.py +++ b/Lib/test/test_descr.py @@ -4160,6 +4160,7 @@ order (MRO) for bases """ ('__add__', 'x + y', 'x += y'), ('__sub__', 'x - y', 'x -= y'), ('__mul__', 'x * y', 'x *= y'), + ('__matmul__', 'x @ y', 'x @= y'), ('__truediv__', 'operator.truediv(x, y)', None), ('__floordiv__', 'operator.floordiv(x, y)', None), ('__div__', 'x / y', 'x /= y'), diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 56193e8..5eb8474 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -2096,22 +2096,9 @@ def test_DocTestSuite(): >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=0 errors=0 failures=0> - However, if DocTestSuite finds no docstrings, it raises an error: + The module need not contain any docstrings either: - >>> try: - ... doctest.DocTestSuite('test.sample_doctest_no_docstrings') - ... except ValueError as e: - ... error = e - - >>> print(error.args[1]) - has no docstrings - - You can prevent this error by passing a DocTestFinder instance with - the `exclude_empty` keyword argument set to False: - - >>> finder = doctest.DocTestFinder(exclude_empty=False) - >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings', - ... test_finder=finder) + >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings') >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=0 errors=0 failures=0> @@ -2121,6 +2108,22 @@ def test_DocTestSuite(): >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=9 errors=0 failures=4> + We can also provide a DocTestFinder: + + >>> finder = doctest.DocTestFinder() + >>> suite = doctest.DocTestSuite('test.sample_doctest', + ... test_finder=finder) + >>> suite.run(unittest.TestResult()) + <unittest.result.TestResult run=9 errors=0 failures=4> + + The DocTestFinder need not return any tests: + + >>> finder = doctest.DocTestFinder() + >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings', + ... test_finder=finder) + >>> suite.run(unittest.TestResult()) + <unittest.result.TestResult run=0 errors=0 failures=0> + We can supply global variables. If we pass globs, they will be used instead of the module globals. Here we'll pass an empty globals, triggering an extra error: @@ -2897,7 +2900,7 @@ Invalid doctest option: def test_main(): # Check the doctest cases in doctest itself: - support.run_doctest(doctest, verbosity=True) + ret = support.run_doctest(doctest, verbosity=True) # Check the doctest cases defined here: from test import test_doctest support.run_doctest(test_doctest, verbosity=True) diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py index cb6366c..eb97516 100644 --- a/Lib/test/test_docxmlrpc.py +++ b/Lib/test/test_docxmlrpc.py @@ -87,10 +87,11 @@ class DocXMLRPCHTTPGETServer(unittest.TestCase): threading.Thread(target=server, args=(self.evt, 1)).start() # wait for port to be assigned - n = 1000 - while n > 0 and PORT is None: - time.sleep(0.001) - n -= 1 + deadline = time.monotonic() + 10.0 + while PORT is None: + time.sleep(0.010) + if time.monotonic() > deadline: + break self.client = http.client.HTTPConnection("localhost:%d" % PORT) diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py index e0626df..8bcbd46 100644 --- a/Lib/test/test_fork1.py +++ b/Lib/test/test_fork1.py @@ -18,13 +18,14 @@ get_attribute(os, 'fork') class ForkTest(ForkWait): def wait_impl(self, cpid): - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # waitpid() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status = os.waitpid(cpid, os.WNOHANG) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index fc71e48..631bf35 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -142,8 +142,6 @@ class FormatTest(unittest.TestCase): testformat("%#+027.23X", big, "+0X0001234567890ABCDEF12345") # same, except no 0 flag testformat("%#+27.23X", big, " +0X001234567890ABCDEF12345") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(big), "123456_______________", 6) big = 0o12345670123456701234567012345670 # 32 octal digits testformat("%o", big, "12345670123456701234567012345670") testformat("%o", -big, "-12345670123456701234567012345670") @@ -183,8 +181,6 @@ class FormatTest(unittest.TestCase): testformat("%034.33o", big, "0012345670123456701234567012345670") # base marker shouldn't change that testformat("%0#34.33o", big, "0o012345670123456701234567012345670") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(big), "123456__________________________", 6) # Some small ints, in both Python int and flavors). testformat("%d", 42, "42") testformat("%d", -42, "-42") @@ -195,8 +191,6 @@ class FormatTest(unittest.TestCase): testformat("%#x", 1, "0x1") testformat("%#X", 1, "0X1") testformat("%#X", 1, "0X1") - with self.assertWarns(DeprecationWarning): - testformat("%#x", 1.0, "0x1") testformat("%#o", 1, "0o1") testformat("%#o", 1, "0o1") testformat("%#o", 0, "0o0") @@ -213,14 +207,10 @@ class FormatTest(unittest.TestCase): testformat("%x", -0x42, "-42") testformat("%x", 0x42, "42") testformat("%x", -0x42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(0x42), "42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(0o42), "42") testformat("%r", "\u0378", "'\\u0378'") # non printable testformat("%a", "\u0378", "'\\u0378'") # non printable testformat("%r", "\u0374", "'\u0374'") # printable diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py index 3336532..e86d5ce 100644 --- a/Lib/test/test_fractions.py +++ b/Lib/test/test_fractions.py @@ -330,7 +330,6 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(F(-2, 10), round(F(-15, 100), 1)) self.assertTypedEquals(F(-2, 10), round(F(-25, 100), 1)) - def testArithmetic(self): self.assertEqual(F(1, 2), F(1, 10) + F(2, 5)) self.assertEqual(F(-3, 10), F(1, 10) - F(2, 5)) @@ -402,6 +401,8 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(2.0 , 4 ** F(1, 2)) self.assertTypedEquals(0.25, 2.0 ** F(-2, 1)) self.assertTypedEquals(1.0 + 0j, (1.0 + 0j) ** F(1, 10)) + self.assertRaises(ZeroDivisionError, operator.pow, + F(0, 1), -2) def testMixingWithDecimal(self): # Decimal refuses mixed arithmetic (but not mixed comparisons) diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index bba8820..a7bad2d 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -985,6 +985,20 @@ class GrammarTests(unittest.TestCase): self.assertFalse((False is 2) is 3) self.assertFalse(False is 2 is 3) + def test_matrix_mul(self): + # This is not intended to be a comprehensive test, rather just to be few + # samples of the @ operator in test_grammar.py. + class M: + def __matmul__(self, o): + return 4 + def __imatmul__(self, o): + self.other = o + return self + m = M() + self.assertEqual(m @ m, 4) + m @= 42 + self.assertEqual(m.other, 42) + def test_main(): run_unittest(TokenTests, GrammarTests) diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 22f7329..1a6d8d0 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem') # Root cert file (CA) for svn.python.org's cert CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem') +# constants for testing chunked encoding +chunked_start = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd! \r\n' + '8\r\n' + 'and now \r\n' + '22\r\n' + 'for something completely different\r\n' +) +chunked_expected = b'hello world! and now for something completely different' +chunk_extension = ";foo=bar" +last_chunk = "0\r\n" +last_chunk_extended = "0" + chunk_extension + "\r\n" +trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" +chunked_end = "\r\n" + HOST = support.HOST class FakeSocket: @@ -38,7 +58,10 @@ class FakeSocket: def makefile(self, mode, bufsize=None): if mode != 'r' and mode != 'rb': raise client.UnimplementedFileMode() - return self.fileclass(self.text) + # keep the file around so we can check how much was read from it + self.file = self.fileclass(self.text) + self.file.close = lambda:None #nerf close () + return self.file def close(self): pass @@ -435,20 +458,8 @@ class BasicTest(TestCase): conn.request('POST', 'test', conn) def test_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' - sock = FakeSocket(chunked_start + '0\r\n') + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(), expected) @@ -456,7 +467,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) @@ -479,23 +490,12 @@ class BasicTest(TestCase): resp.close() def test_readinto_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' + + expected = chunked_expected nexpected = len(expected) b = bytearray(128) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() n = resp.readinto(b) @@ -505,7 +505,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() m = memoryview(b) @@ -541,7 +541,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() self.assertEqual(resp.read(), b'') @@ -561,7 +561,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() b = bytearray(5) @@ -636,6 +636,7 @@ class BasicTest(TestCase): + '0' * 65536 + 'a\r\n' 'hello world\r\n' '0\r\n' + '\r\n' ) resp = client.HTTPResponse(FakeSocket(body)) resp.begin() @@ -675,6 +676,239 @@ class BasicTest(TestCase): conn.request('POST', '/', body) self.assertGreater(sock.sendall_calls, 1) + def test_chunked_extension(self): + extra = '3;foo=bar\r\n' + 'abc\r\n' + expected = chunked_expected + b'abc' + + sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_missing_end(self): + """some servers may serve up a short chunked encoding stream""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_trailers(self): + """See that trailers are read and ignored""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # we should have reached the end of the file + self.assertEqual(sock.file.read(100), b"") #we read to the end + resp.close() + + def test_chunked_sync(self): + """Check that we don't read past the end of the chunked-encoding stream""" + expected = chunked_expected + extradata = "extradata" + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + + def test_content_length_sync(self): + """Check that we don't read past the end of the Content-Length stream""" + extradata = "extradata" + expected = b"Hello123\r\n" + sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + +class ExtendedReadTest(TestCase): + """ + Test peek(), read1(), readline() + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + '\r\n' + 'hello world!\n' + 'and now \n' + 'for something completely different\n' + 'foo' + ) + lines_expected = lines[lines.find('hello'):].encode("ascii") + lines_chunked = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + def setUp(self): + sock = FakeSocket(self.lines) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + resp.fp = io.BufferedReader(resp.fp) + self.resp = resp + + + + def test_peek(self): + resp = self.resp + # patch up the buffered peek so that it returns not too much stuff + oldpeek = resp.fp.peek + def mypeek(n=-1): + p = oldpeek(n) + if n >= 0: + return p[:n] + return p[:10] + resp.fp.peek = mypeek + + all = [] + while True: + # try a short peek + p = resp.peek(3) + if p: + self.assertGreater(len(p), 0) + # then unbounded peek + p2 = resp.peek() + self.assertGreaterEqual(len(p2), len(p)) + self.assertTrue(p2.startswith(p)) + next = resp.read(len(p2)) + self.assertEqual(next, p2) + else: + next = resp.read() + self.assertFalse(next) + all.append(next) + if not next: + break + self.assertEqual(b"".join(all), self.lines_expected) + + def test_readline(self): + resp = self.resp + self._verify_readline(self.resp.readline, self.lines_expected) + + def _verify_readline(self, readline, expected): + all = [] + while True: + # short readlines + line = readline(5) + if line and line != b"foo": + if len(line) < 5: + self.assertTrue(line.endswith(b"\n")) + all.append(line) + if not line: + break + self.assertEqual(b"".join(all), expected) + + def test_read1(self): + resp = self.resp + def r(): + res = resp.read1(4) + self.assertLessEqual(len(res), 4) + return res + readliner = Readliner(r) + self._verify_readline(readliner.readline, self.lines_expected) + + def test_read1_unbounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1() + if not data: + break + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_bounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1(10) + if not data: + break + self.assertLessEqual(len(data), 10) + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_0(self): + self.assertEqual(self.resp.read1(0), b"") + + def test_peek_0(self): + p = self.resp.peek(0) + self.assertLessEqual(0, len(p)) + +class ExtendedReadTestChunked(ExtendedReadTest): + """ + Test peek(), read1(), readline() in chunked mode + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + +class Readliner: + """ + a simple readline class that uses an arbitrary read function and buffering + """ + def __init__(self, readfunc): + self.readfunc = readfunc + self.remainder = b"" + + def readline(self, limit): + data = [] + datalen = 0 + read = self.remainder + try: + while True: + idx = read.find(b'\n') + if idx != -1: + break + if datalen + len(read) >= limit: + idx = limit - datalen - 1 + # read more data + data.append(read) + read = self.readfunc() + if not read: + idx = 0 #eof condition + break + idx += 1 + data.append(read[:idx]) + self.remainder = read[idx:] + return b"".join(data) + except: + self.remainder = b"".join(data) + raise + class OfflineTest(TestCase): def test_responses(self): self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") @@ -1019,7 +1253,8 @@ class TunnelTests(TestCase): def test_main(verbose=None): support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTest, RequestBodyTest, SourceAddressTest, - HTTPResponseTest, TunnelTests) + HTTPResponseTest, ExtendedReadTest, + ExtendedReadTestChunked, TunnelTests) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_importlib/test_lazy.py b/Lib/test/test_importlib/test_lazy.py new file mode 100644 index 0000000..2e191bb --- /dev/null +++ b/Lib/test/test_importlib/test_lazy.py @@ -0,0 +1,132 @@ +import importlib +from importlib import abc +from importlib import util +import unittest + +from . import util as test_util + + +class CollectInit: + + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def exec_module(self, module): + return self + + +class LazyLoaderFactoryTests(unittest.TestCase): + + def test_init(self): + factory = util.LazyLoader.factory(CollectInit) + # E.g. what importlib.machinery.FileFinder instantiates loaders with + # plus keyword arguments. + lazy_loader = factory('module name', 'module path', kw='kw') + loader = lazy_loader.loader + self.assertEqual(('module name', 'module path'), loader.args) + self.assertEqual({'kw': 'kw'}, loader.kwargs) + + def test_validation(self): + # No exec_module(), no lazy loading. + with self.assertRaises(TypeError): + util.LazyLoader.factory(object) + + +class TestingImporter(abc.MetaPathFinder, abc.Loader): + + module_name = 'lazy_loader_test' + mutated_name = 'changed' + loaded = None + source_code = 'attr = 42; __name__ = {!r}'.format(mutated_name) + + def find_spec(self, name, path, target=None): + if name != self.module_name: + return None + return util.spec_from_loader(name, util.LazyLoader(self)) + + def exec_module(self, module): + exec(self.source_code, module.__dict__) + self.loaded = module + + +class LazyLoaderTests(unittest.TestCase): + + def test_init(self): + with self.assertRaises(TypeError): + util.LazyLoader(object) + + def new_module(self, source_code=None): + loader = TestingImporter() + if source_code is not None: + loader.source_code = source_code + spec = util.spec_from_loader(TestingImporter.module_name, + util.LazyLoader(loader)) + module = spec.loader.create_module(spec) + module.__spec__ = spec + module.__loader__ = spec.loader + spec.loader.exec_module(module) + # Module is now lazy. + self.assertIsNone(loader.loaded) + return module + + def test_e2e(self): + # End-to-end test to verify the load is in fact lazy. + importer = TestingImporter() + assert importer.loaded is None + with test_util.uncache(importer.module_name): + with test_util.import_state(meta_path=[importer]): + module = importlib.import_module(importer.module_name) + self.assertIsNone(importer.loaded) + # Trigger load. + self.assertEqual(module.__loader__, importer) + self.assertIsNotNone(importer.loaded) + self.assertEqual(module, importer.loaded) + + def test_attr_unchanged(self): + # An attribute only mutated as a side-effect of import should not be + # changed needlessly. + module = self.new_module() + self.assertEqual(TestingImporter.mutated_name, module.__name__) + + def test_new_attr(self): + # A new attribute should persist. + module = self.new_module() + module.new_attr = 42 + self.assertEqual(42, module.new_attr) + + def test_mutated_preexisting_attr(self): + # Changing an attribute that already existed on the module -- + # e.g. __name__ -- should persist. + module = self.new_module() + module.__name__ = 'bogus' + self.assertEqual('bogus', module.__name__) + + def test_mutated_attr(self): + # Changing an attribute that comes into existence after an import + # should persist. + module = self.new_module() + module.attr = 6 + self.assertEqual(6, module.attr) + + def test_delete_eventual_attr(self): + # Deleting an attribute should stay deleted. + module = self.new_module() + del module.attr + self.assertFalse(hasattr(module, 'attr')) + + def test_delete_preexisting_attr(self): + module = self.new_module() + del module.__name__ + self.assertFalse(hasattr(module, '__name__')) + + def test_module_substitution_error(self): + source_code = 'import sys; sys.modules[__name__] = 42' + module = self.new_module(source_code) + with test_util.uncache(TestingImporter.module_name): + with self.assertRaises(ValueError): + module.__name__ + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py index 1ede3b5..7ad190b 100644 --- a/Lib/test/test_inspect.py +++ b/Lib/test/test_inspect.py @@ -8,6 +8,7 @@ import linecache import os from os.path import normcase import _pickle +import pickle import re import shutil import sys @@ -73,6 +74,7 @@ def generator_function_example(self): for i in range(2): yield i + class TestPredicates(IsTestBase): def test_sixteen(self): count = len([x for x in dir(inspect) if x.startswith('is')]) @@ -1611,6 +1613,17 @@ class TestGetGeneratorState(unittest.TestCase): self.assertRaises(TypeError, inspect.getgeneratorlocals, (2,3)) +class MySignature(inspect.Signature): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + +class MyParameter(inspect.Parameter): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + + class TestSignatureObject(unittest.TestCase): @staticmethod def signature(func): @@ -1668,6 +1681,37 @@ class TestSignatureObject(unittest.TestCase): with self.assertRaisesRegex(ValueError, 'follows default argument'): S((pkd, pk)) + self.assertTrue(repr(sig).startswith('<Signature')) + self.assertTrue('"(po, pk' in repr(sig)) + + def test_signature_object_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + foo_partial = functools.partial(foo, a=1) + + sig = inspect.signature(foo_partial) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=False): + sig_pickled = pickle.loads(pickle.dumps(sig, ver)) + self.assertEqual(sig, sig_pickled) + + # Test that basic sub-classing works + sig = inspect.signature(foo) + myparam = MyParameter(name='z', kind=inspect.Parameter.POSITIONAL_ONLY) + myparams = collections.OrderedDict(sig.parameters, a=myparam) + mysig = MySignature().replace(parameters=myparams.values(), + return_annotation=sig.return_annotation) + self.assertTrue(isinstance(mysig, MySignature)) + self.assertTrue(isinstance(mysig.parameters['z'], MyParameter)) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=True): + sig_pickled = pickle.loads(pickle.dumps(mysig, ver)) + self.assertEqual(mysig, sig_pickled) + self.assertTrue(isinstance(sig_pickled, MySignature)) + self.assertTrue(isinstance(sig_pickled.parameters['z'], + MyParameter)) + def test_signature_immutability(self): def test(a): pass @@ -2469,11 +2513,29 @@ class TestSignatureObject(unittest.TestCase): def bar(pos, *args, c, b, a=42, **kwargs:int): pass self.assertEqual(inspect.signature(foo), inspect.signature(bar)) - def test_signature_unhashable(self): + def test_signature_hashable(self): + S = inspect.Signature + P = inspect.Parameter + def foo(a): pass - sig = inspect.signature(foo) + foo_sig = inspect.signature(foo) + + manual_sig = S(parameters=[P('a', P.POSITIONAL_OR_KEYWORD)]) + + self.assertEqual(hash(foo_sig), hash(manual_sig)) + self.assertNotEqual(hash(foo_sig), + hash(manual_sig.replace(return_annotation='spam'))) + + def bar(a) -> 1: pass + self.assertNotEqual(hash(foo_sig), hash(inspect.signature(bar))) + + def foo(a={}): pass + with self.assertRaisesRegex(TypeError, 'unhashable type'): + hash(inspect.signature(foo)) + + def foo(a) -> {}: pass with self.assertRaisesRegex(TypeError, 'unhashable type'): - hash(sig) + hash(inspect.signature(foo)) def test_signature_str(self): def foo(a:int=1, *, b, c=None, **kwargs) -> 42: @@ -2547,6 +2609,19 @@ class TestSignatureObject(unittest.TestCase): self.assertEqual(self.signature(Spam.foo), self.signature(Ham.foo)) + def test_signature_from_callable_python_obj(self): + class MySignature(inspect.Signature): pass + def foo(a, *, b:1): pass + foo_sig = MySignature.from_callable(foo) + self.assertTrue(isinstance(foo_sig, MySignature)) + + @unittest.skipIf(MISSING_C_DOCSTRINGS, + "Signature information for builtins requires docstrings") + def test_signature_from_callable_builtin_obj(self): + class MySignature(inspect.Signature): pass + sig = MySignature.from_callable(_pickle.Pickler) + self.assertTrue(isinstance(sig, MySignature)) + class TestParameterObject(unittest.TestCase): def test_signature_parameter_kinds(self): @@ -2592,6 +2667,16 @@ class TestParameterObject(unittest.TestCase): p.replace(kind=inspect.Parameter.VAR_POSITIONAL) self.assertTrue(repr(p).startswith('<Parameter')) + self.assertTrue('"a=42"' in repr(p)) + + def test_signature_parameter_hashable(self): + P = inspect.Parameter + foo = P('foo', kind=P.POSITIONAL_ONLY) + self.assertEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY))) + self.assertNotEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY, + default=42))) + self.assertNotEqual(hash(foo), + hash(foo.replace(kind=P.VAR_POSITIONAL))) def test_signature_parameter_equality(self): P = inspect.Parameter @@ -2603,13 +2688,6 @@ class TestParameterObject(unittest.TestCase): self.assertEqual(p, P('foo', default=42, kind=inspect.Parameter.KEYWORD_ONLY)) - def test_signature_parameter_unhashable(self): - p = inspect.Parameter('foo', default=42, - kind=inspect.Parameter.KEYWORD_ONLY) - - with self.assertRaisesRegex(TypeError, 'unhashable type'): - hash(p) - def test_signature_parameter_replace(self): p = inspect.Parameter('foo', default=42, kind=inspect.Parameter.KEYWORD_ONLY) @@ -2918,6 +2996,16 @@ class TestBoundArguments(unittest.TestCase): ba4 = inspect.signature(bar).bind(1) self.assertNotEqual(ba, ba4) + def test_signature_bound_arguments_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + sig = inspect.signature(foo) + ba = sig.bind(20, 30, z={}) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver): + ba_pickled = pickle.loads(pickle.dumps(ba, ver)) + self.assertEqual(ba, ba_pickled) + class TestSignaturePrivateHelpers(unittest.TestCase): def test_signature_get_bound_param(self): diff --git a/Lib/test/test_ipaddress.py b/Lib/test/test_ipaddress.py index f2947b9..33adb3b 100644 --- a/Lib/test/test_ipaddress.py +++ b/Lib/test/test_ipaddress.py @@ -1593,6 +1593,14 @@ class IpaddrUnitTest(unittest.TestCase): addr3.exploded) self.assertEqual('192.168.178.1', addr4.exploded) + def testReversePointer(self): + addr1 = ipaddress.IPv4Address('127.0.0.1') + addr2 = ipaddress.IPv6Address('2001:db8::1') + self.assertEqual('1.0.0.127.in-addr.arpa', addr1.reverse_pointer) + self.assertEqual('1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.' + + 'b.d.0.1.0.0.2.ip6.arpa', + addr2.reverse_pointer) + def testIntRepresentation(self): self.assertEqual(16909060, int(self.ipv4_address)) self.assertEqual(42540616829182469433547762482097946625, diff --git a/Lib/test/test_json/test_tool.py b/Lib/test/test_json/test_tool.py index 0c39e56..5484a8a 100644 --- a/Lib/test/test_json/test_tool.py +++ b/Lib/test/test_json/test_tool.py @@ -55,6 +55,7 @@ class TestTool(unittest.TestCase): def test_infile_stdout(self): infile = self._create_infile() rc, out, err = assert_python_ok('-m', 'json.tool', infile) + self.assertEqual(rc, 0) self.assertEqual(out.splitlines(), self.expect.encode().splitlines()) self.assertEqual(err, b'') @@ -65,5 +66,12 @@ class TestTool(unittest.TestCase): self.addCleanup(os.remove, outfile) with open(outfile, "r") as fp: self.assertEqual(fp.read(), self.expect) + self.assertEqual(rc, 0) self.assertEqual(out, b'') self.assertEqual(err, b'') + + def test_help_flag(self): + rc, out, err = assert_python_ok('-m', 'json.tool', '-h') + self.assertEqual(rc, 0) + self.assertTrue(out.startswith(b'usage: ')) + self.assertEqual(err, b'') diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py index 48f84ba..c9f3f16 100644 --- a/Lib/test/test_math.py +++ b/Lib/test/test_math.py @@ -422,9 +422,17 @@ class MathTests(unittest.TestCase): self.assertEqual(math.factorial(i), py_factorial(i)) self.assertRaises(ValueError, math.factorial, -1) self.assertRaises(ValueError, math.factorial, -1.0) + self.assertRaises(ValueError, math.factorial, -10**100) + self.assertRaises(ValueError, math.factorial, -1e100) self.assertRaises(ValueError, math.factorial, math.pi) - self.assertRaises(OverflowError, math.factorial, sys.maxsize+1) - self.assertRaises(OverflowError, math.factorial, 10e100) + + # Other implementations may place different upper bounds. + @support.cpython_only + def testFactorialHugeInputs(self): + # Currently raises ValueError for inputs that are too large + # to fit into a C long. + self.assertRaises(OverflowError, math.factorial, 10**100) + self.assertRaises(OverflowError, math.factorial, 1e100) def testFloor(self): self.assertRaises(TypeError, math.floor) diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py index ab58a98..1bd0391 100644 --- a/Lib/test/test_operator.py +++ b/Lib/test/test_operator.py @@ -203,6 +203,15 @@ class OperatorTestCase: self.assertRaises(TypeError, operator.mul, None, None) self.assertTrue(operator.mul(5, 2) == 10) + def test_matmul(self): + operator = self.module + self.assertRaises(TypeError, operator.matmul) + self.assertRaises(TypeError, operator.matmul, 42, 42) + class M: + def __matmul__(self, other): + return other - 1 + self.assertEqual(M() @ 42, 41) + def test_neg(self): operator = self.module self.assertRaises(TypeError, operator.neg) @@ -416,6 +425,7 @@ class OperatorTestCase: def __ilshift__ (self, other): return "ilshift" def __imod__ (self, other): return "imod" def __imul__ (self, other): return "imul" + def __imatmul__ (self, other): return "imatmul" def __ior__ (self, other): return "ior" def __ipow__ (self, other): return "ipow" def __irshift__ (self, other): return "irshift" @@ -430,6 +440,7 @@ class OperatorTestCase: self.assertEqual(operator.ilshift (c, 5), "ilshift") self.assertEqual(operator.imod (c, 5), "imod") self.assertEqual(operator.imul (c, 5), "imul") + self.assertEqual(operator.imatmul (c, 5), "imatmul") self.assertEqual(operator.ior (c, 5), "ior") self.assertEqual(operator.ipow (c, 5), "ipow") self.assertEqual(operator.irshift (c, 5), "irshift") diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py index 34edd76..8f83c90 100644 --- a/Lib/test/test_selectors.py +++ b/Lib/test/test_selectors.py @@ -441,10 +441,18 @@ class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): SELECTOR = getattr(selectors, 'KqueueSelector', None) +@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'), + "Test needs selectors.DevpollSelector") +class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'DevpollSelector', None) + + + def test_main(): tests = [DefaultSelectorTestCase, SelectSelectorTestCase, PollSelectorTestCase, EpollSelectorTestCase, - KqueueSelectorTestCase] + KqueueSelectorTestCase, DevpollSelectorTestCase] support.run_unittest(*tests) support.reap_children() diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py index bfef621..992a4ce 100644 --- a/Lib/test/test_set.py +++ b/Lib/test/test_set.py @@ -929,7 +929,7 @@ class TestBasicOpsString(TestBasicOps, unittest.TestCase): class TestBasicOpsBytes(TestBasicOps, unittest.TestCase): def setUp(self): - self.case = "string set" + self.case = "bytes set" self.values = [b"a", b"b", b"c"] self.set = set(self.values) self.dup = set(self.values) diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index a6f2c64..50cae07 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,6 +1,7 @@ import unittest from test import support from contextlib import closing +import enum import gc import pickle import select @@ -39,6 +40,23 @@ def ignoring_eintr(__func, *args, **kwargs): return None +class GenericTests(unittest.TestCase): + + @unittest.skipIf(threading is None, "test needs threading module") + def test_enums(self): + for name in dir(signal): + sig = getattr(signal, name) + if name in {'SIG_DFL', 'SIG_IGN'}: + self.assertIsInstance(sig, signal.Handlers) + elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: + self.assertIsInstance(sig, signal.Sigmasks) + elif name.startswith('SIG') and not name.startswith('SIG_'): + self.assertIsInstance(sig, signal.Signals) + elif name.startswith('CTRL_'): + self.assertIsInstance(sig, signal.Signals) + self.assertEqual(sys.platform, "win32") + + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class InterProcessSignalTests(unittest.TestCase): MAX_DURATION = 20 # Entire test should last at most 20 sec. @@ -195,6 +213,7 @@ class PosixTests(unittest.TestCase): def test_getsignal(self): hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertIsInstance(hup, signal.Handlers) self.assertEqual(signal.getsignal(signal.SIGHUP), self.trivial_signal_handler) signal.signal(signal.SIGHUP, hup) @@ -271,7 +290,7 @@ class WakeupSignalTests(unittest.TestCase): os.close(read) os.close(write) - """.format(signals, ordered, test_body) + """.format(tuple(map(int, signals)), ordered, test_body) assert_python_ok('-c', code) @@ -604,6 +623,8 @@ class PendingSignalsTests(unittest.TestCase): signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() + for sig in pending: + assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: @@ -660,6 +681,7 @@ class PendingSignalsTests(unittest.TestCase): code = '''if 1: import signal import sys + from signal import Signals def handler(signum, frame): 1/0 @@ -702,6 +724,7 @@ class PendingSignalsTests(unittest.TestCase): def test(signum): signal.alarm(1) received = signal.sigwait([signum]) + assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) ''') @@ -842,8 +865,14 @@ class PendingSignalsTests(unittest.TestCase): def kill(signum): os.kill(os.getpid(), signum) + def check_mask(mask): + for sig in mask: + assert isinstance(sig, signal.Signals), repr(sig) + def read_sigmask(): - return signal.pthread_sigmask(signal.SIG_BLOCK, []) + sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) + check_mask(sigmask) + return sigmask signum = signal.SIGUSR1 @@ -852,6 +881,7 @@ class PendingSignalsTests(unittest.TestCase): # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + check_mask(old_mask) try: kill(signum) except ZeroDivisionError: @@ -861,11 +891,13 @@ class PendingSignalsTests(unittest.TestCase): # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending - signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() + check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: @@ -928,7 +960,7 @@ class PendingSignalsTests(unittest.TestCase): def test_main(): try: - support.run_unittest(PosixTests, InterProcessSignalTests, + support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests, WakeupFDTests, WakeupSignalTests, SiginterruptTest, ItimerTest, WindowsSignalTests, PendingSignalsTests) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 0617b30..8e0fde4 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -222,38 +222,6 @@ class SocketServerTest(unittest.TestCase): socketserver.DatagramRequestHandler, self.dgram_examine) - @contextlib.contextmanager - def mocked_select_module(self): - """Mocks the select.select() call to raise EINTR for first call""" - old_select = select.select - - class MockSelect: - def __init__(self): - self.called = 0 - - def __call__(self, *args): - self.called += 1 - if self.called == 1: - # raise the exception on first call - raise OSError(errno.EINTR, os.strerror(errno.EINTR)) - else: - # Return real select value for consecutive calls - return old_select(*args) - - select.select = MockSelect() - try: - yield select.select - finally: - select.select = old_select - - def test_InterruptServerSelectCall(self): - with self.mocked_select_module() as mock_select: - pid = self.run_server(socketserver.TCPServer, - socketserver.StreamRequestHandler, - self.stream_examine) - # Make sure select was called again: - self.assertGreater(mock_select.called, 1) - # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 2b3de1f..a2ab795 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -134,6 +134,14 @@ class BasicSocketTests(unittest.TestCase): self.assertIn(ssl.HAS_SNI, {True, False}) self.assertIn(ssl.HAS_ECDH, {True, False}) + def test_str_for_enums(self): + # Make sure that the PROTOCOL_* constants have enum-like string + # reprs. + proto = ssl.PROTOCOL_SSLv3 + self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3') + ctx = ssl.SSLContext(proto) + self.assertIs(ctx.protocol, proto) + def test_random(self): v = ssl.RAND_status() if support.verbose: @@ -1371,14 +1379,12 @@ class NetworkedTests(unittest.TestCase): def test_get_server_certificate(self): def _test_get_server_certificate(host, port, cert=None): with support.transient_internet(host): - pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23) + pem = ssl.get_server_certificate((host, port)) if not pem: self.fail("No server certificate on %s:%s!" % (host, port)) try: pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23, ca_certs=CERTFILE) except ssl.SSLError as x: #should fail @@ -1388,7 +1394,6 @@ class NetworkedTests(unittest.TestCase): self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23, ca_certs=cert) if not pem: self.fail("No server certificate on %s:%s!" % (host, port)) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 5a9699f..a809fd7 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -615,6 +615,53 @@ class SysModuleTest(unittest.TestCase): expected = None self.check_fsencoding(fs_encoding, expected) + def c_locale_get_error_handler(self, isolated=False, encoding=None): + # Force the POSIX locale + env = os.environ.copy() + env["LC_ALL"] = "C" + code = '\n'.join(( + 'import sys', + 'def dump(name):', + ' std = getattr(sys, name)', + ' print("%s: %s" % (name, std.errors))', + 'dump("stdin")', + 'dump("stdout")', + 'dump("stderr")', + )) + args = [sys.executable, "-c", code] + if isolated: + args.append("-I") + elif encoding: + env['PYTHONIOENCODING'] = encoding + p = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + universal_newlines=True) + stdout, stderr = p.communicate() + return stdout + + def test_c_locale_surrogateescape(self): + out = self.c_locale_get_error_handler(isolated=True) + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + + # replace the default error handler + out = self.c_locale_get_error_handler(encoding=':strict') + self.assertEqual(out, + 'stdin: strict\n' + 'stdout: strict\n' + 'stderr: backslashreplace\n') + + # force the encoding + out = self.c_locale_get_error_handler(encoding='iso8859-1') + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + def test_implementation(self): # This test applies to all implementations equally. @@ -905,7 +952,7 @@ class SizeofTest(unittest.TestCase): check(int, s) # (PyTypeObject + PyNumberMethods + PyMappingMethods + # PySequenceMethods + PyBufferProcs + 4P) - s = vsize('P2n15Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P') + s = vsize('P2n17Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P') # Separate block for PyDictKeysObject with 4 entries s += struct.calcsize("2nPn") + 4*struct.calcsize("n2P") # class diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py index 38611a7..8f74a06 100644 --- a/Lib/test/test_tokenize.py +++ b/Lib/test/test_tokenize.py @@ -464,7 +464,7 @@ Additive Multiplicative - >>> dump_tokens("x = 1//1*1/5*12%0x12") + >>> dump_tokens("x = 1//1*1/5*12%0x12@42") ENCODING 'utf-8' (0, 0) (0, 0) NAME 'x' (1, 0) (1, 1) OP '=' (1, 2) (1, 3) @@ -479,6 +479,8 @@ Multiplicative NUMBER '12' (1, 13) (1, 15) OP '%' (1, 15) (1, 16) NUMBER '0x12' (1, 16) (1, 20) + OP '@' (1, 20) (1, 21) + NUMBER '42' (1, 21) (1, 23) Unary @@ -1154,6 +1156,7 @@ class TestTokenize(TestCase): self.assertExactTypeEqual('//', token.DOUBLESLASH) self.assertExactTypeEqual('//=', token.DOUBLESLASHEQUAL) self.assertExactTypeEqual('@', token.AT) + self.assertExactTypeEqual('@=', token.ATEQUAL) self.assertExactTypeEqual('a**2+b**2==c**2', NAME, token.DOUBLESTAR, NUMBER, diff --git a/Lib/test/test_tuple.py b/Lib/test/test_tuple.py index e41711c..14c6430 100644 --- a/Lib/test/test_tuple.py +++ b/Lib/test/test_tuple.py @@ -201,6 +201,14 @@ class TupleTest(seq_tests.CommonTest): with self.assertRaises(TypeError): [3,] + T((1,2)) + def test_lexicographic_ordering(self): + # Issue 21100 + a = self.type2test([1, 2]) + b = self.type2test([1, 2, 0]) + c = self.type2test([1, 3]) + self.assertLess(a, b) + self.assertLess(b, c) + def test_main(): support.run_unittest(TupleTest) diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py index ec10752..11d9546 100644 --- a/Lib/test/test_types.py +++ b/Lib/test/test_types.py @@ -343,6 +343,8 @@ class TypesTests(unittest.TestCase): self.assertRaises(ValueError, 3 .__format__, ",n") # can't have ',' with 'c' self.assertRaises(ValueError, 3 .__format__, ",c") + # can't have '#' with 'c' + self.assertRaises(ValueError, 3 .__format__, "#c") # ensure that only int and float type specifiers work for format_spec in ([chr(x) for x in range(ord('a'), ord('z')+1)] + diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index 9ae31d1..64e6bf5 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -8,6 +8,7 @@ Written by Marc-Andre Lemburg (mal@lemburg.com). import _string import codecs import itertools +import operator import struct import sys import unittest @@ -250,6 +251,7 @@ class UnicodeTest(string_tests.CommonTest, {ord('a'): None, ord('b'): ''}) self.checkequalnofix('xyyx', 'xzx', 'translate', {ord('z'): 'yy'}) + # this needs maketrans() self.checkequalnofix('abababc', 'abababc', 'translate', {'b': '<i>'}) @@ -259,6 +261,33 @@ class UnicodeTest(string_tests.CommonTest, tbl = self.type2test.maketrans('abc', 'xyz', 'd') self.checkequalnofix('xyzzy', 'abdcdcbdddd', 'translate', tbl) + # various tests switching from ASCII to latin1 or the opposite; + # same length, remove a letter, or replace with a longer string. + self.assertEqual("[a]".translate(str.maketrans('a', 'X')), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'X'})), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': None})), + "[]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'XXX'})), + "[XXX]") + self.assertEqual("[a]".translate(str.maketrans({'a': '\xe9'})), + "[\xe9]") + self.assertEqual("[a]".translate(str.maketrans({'a': '<\xe9>'})), + "[<\xe9>]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': 'a'})), + "[a]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': None})), + "[]") + + # invalid Unicode characters + invalid_char = 0x10ffff+1 + for before in "a\xe9\u20ac\U0010ffff": + mapping = str.maketrans({before: invalid_char}) + text = "[%s]" % before + self.assertRaises(ValueError, text.translate, mapping) + + # errors self.assertRaises(TypeError, self.type2test.maketrans) self.assertRaises(ValueError, self.type2test.maketrans, 'abc', 'defg') self.assertRaises(TypeError, self.type2test.maketrans, 2, 'def') @@ -1148,20 +1177,20 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%.2s' % "a\xe9\u20ac", 'a\xe9') #issue 19995 - class PsuedoInt: + class PseudoInt: def __init__(self, value): self.value = int(value) def __int__(self): return self.value def __index__(self): return self.value - class PsuedoFloat: + class PseudoFloat: def __init__(self, value): self.value = float(value) def __int__(self): return int(self.value) - pi = PsuedoFloat(3.1415) - letter_m = PsuedoInt(109) + pi = PseudoFloat(3.1415) + letter_m = PseudoInt(109) self.assertEqual('%x' % 42, '2a') self.assertEqual('%X' % 15, 'F') self.assertEqual('%o' % 9, '11') @@ -1170,11 +1199,11 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%X' % letter_m, '6D') self.assertEqual('%o' % letter_m, '155') self.assertEqual('%c' % letter_m, 'm') - self.assertWarns(DeprecationWarning, '%x'.__mod__, pi), - self.assertWarns(DeprecationWarning, '%x'.__mod__, 3.14), - self.assertWarns(DeprecationWarning, '%X'.__mod__, 2.11), - self.assertWarns(DeprecationWarning, '%o'.__mod__, 1.79), - self.assertWarns(DeprecationWarning, '%c'.__mod__, pi), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not float', operator.mod, '%x', 3.14), + self.assertRaisesRegex(TypeError, '%X format: an integer is required, not float', operator.mod, '%X', 2.11), + self.assertRaisesRegex(TypeError, '%o format: an integer is required, not float', operator.mod, '%o', 1.79), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not PseudoFloat', operator.mod, '%x', pi), + self.assertRaises(TypeError, operator.mod, '%c', pi), def test_formatting_with_enum(self): # issue18780 diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py index f6a065d..bb71481 100644 --- a/Lib/test/test_wait3.py +++ b/Lib/test/test_wait3.py @@ -18,7 +18,8 @@ class Wait3Test(ForkWait): # This many iterations can be required, since some previously run # tests (e.g. test_ctypes) could have spawned a lot of children # very quickly. - for i in range(30): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait3() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait3(os.WNOHANG) diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py index 352c11a..b427a9b 100644 --- a/Lib/test/test_wait4.py +++ b/Lib/test/test_wait4.py @@ -19,13 +19,14 @@ class Wait4Test(ForkWait): # Issue #11185: wait4 is broken on AIX and will always return 0 # with WNOHANG. option = 0 - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait4() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait4(cpid, option) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) self.assertTrue(rusage) diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 99b3eda..120c54f 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -713,6 +713,23 @@ class SimpleServerTestCase(BaseServerTestCase): conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') conn.close() + def test_context_manager(self): + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, 3) + self.assertNotEqual(server('transport')._connection, + (None, None)) + self.assertEqual(server('transport')._connection, + (None, None)) + + def test_context_manager_method_error(self): + try: + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, "a") + except xmlrpclib.Fault: + pass + self.assertEqual(server('transport')._connection, + (None, None)) + class MultiPathServerTestCase(BaseServerTestCase): threadFunc = staticmethod(http_multi_server) @@ -898,6 +915,7 @@ class ServerProxyTestCase(unittest.TestCase): p = xmlrpclib.ServerProxy(self.url, transport=t) self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): |