diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/datetimetester.py | 21 | ||||
-rw-r--r-- | Lib/test/fork_wait.py | 7 | ||||
-rw-r--r-- | Lib/test/string_tests.py | 3 | ||||
-rw-r--r-- | Lib/test/test_capi.py | 32 | ||||
-rw-r--r-- | Lib/test/test_codeccallbacks.py | 2 | ||||
-rw-r--r-- | Lib/test/test_codecs.py | 12 | ||||
-rw-r--r-- | Lib/test/test_collections.py | 5 | ||||
-rw-r--r-- | Lib/test/test_doctest.py | 2 | ||||
-rw-r--r-- | Lib/test/test_docxmlrpc.py | 9 | ||||
-rw-r--r-- | Lib/test/test_fork1.py | 5 | ||||
-rw-r--r-- | Lib/test/test_format.py | 10 | ||||
-rw-r--r-- | Lib/test/test_fractions.py | 3 | ||||
-rw-r--r-- | Lib/test/test_httplib.py | 303 | ||||
-rw-r--r-- | Lib/test/test_importlib/test_lazy.py | 132 | ||||
-rw-r--r-- | Lib/test/test_inspect.py | 70 | ||||
-rw-r--r-- | Lib/test/test_json/test_tool.py | 8 | ||||
-rw-r--r-- | Lib/test/test_selectors.py | 10 | ||||
-rw-r--r-- | Lib/test/test_signal.py | 39 | ||||
-rw-r--r-- | Lib/test/test_socketserver.py | 32 | ||||
-rw-r--r-- | Lib/test/test_sys.py | 47 | ||||
-rw-r--r-- | Lib/test/test_tuple.py | 8 | ||||
-rw-r--r-- | Lib/test/test_unicode.py | 47 | ||||
-rw-r--r-- | Lib/test/test_wait3.py | 3 | ||||
-rw-r--r-- | Lib/test/test_wait4.py | 5 | ||||
-rw-r--r-- | Lib/test/test_xmlrpc.py | 18 |
25 files changed, 690 insertions, 143 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 3f3c60a..b8c6138 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -2270,13 +2270,14 @@ class TestTime(HarmlessMixedComparison, unittest.TestCase): self.assertEqual(orig, derived) def test_bool(self): + # time is always True. cls = self.theclass self.assertTrue(cls(1)) self.assertTrue(cls(0, 1)) self.assertTrue(cls(0, 0, 1)) self.assertTrue(cls(0, 0, 0, 1)) - self.assertFalse(cls(0)) - self.assertFalse(cls()) + self.assertTrue(cls(0)) + self.assertTrue(cls()) def test_replace(self): cls = self.theclass @@ -2629,7 +2630,7 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertEqual(derived.tzname(), 'cookie') def test_more_bool(self): - # Test cases with non-None tzinfo. + # time is always True. cls = self.theclass t = cls(0, tzinfo=FixedOffset(-300, "")) @@ -2639,23 +2640,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertTrue(t) t = cls(5, tzinfo=FixedOffset(300, "")) - self.assertFalse(t) + self.assertTrue(t) t = cls(23, 59, tzinfo=FixedOffset(23*60 + 59, "")) - self.assertFalse(t) - - # Mostly ensuring this doesn't overflow internally. - t = cls(0, tzinfo=FixedOffset(23*60 + 59, "")) self.assertTrue(t) - # But this should yield a value error -- the utcoffset is bogus. - t = cls(0, tzinfo=FixedOffset(24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - - # Likewise. - t = cls(0, tzinfo=FixedOffset(-24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - def test_replace(self): cls = self.theclass z100 = FixedOffset(100, "+100") diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py index 19b54ec..8c7c3aa 100644 --- a/Lib/test/fork_wait.py +++ b/Lib/test/fork_wait.py @@ -48,7 +48,12 @@ class ForkWait(unittest.TestCase): for i in range(NUM_THREADS): _thread.start_new(self.f, (i,)) - time.sleep(LONGSLEEP) + # busy-loop to wait for threads + deadline = time.monotonic() + 10.0 + while len(self.alive) < NUM_THREADS: + time.sleep(0.1) + if time.monotonic() <= deadline: + break a = sorted(self.alive.keys()) self.assertEqual(a, list(range(NUM_THREADS))) diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index 5ed01f2..569bae1 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -1178,8 +1178,7 @@ class MixinStrUnicodeUserStringTest: self.checkraises(TypeError, 'abc', '__mod__') self.checkraises(TypeError, '%(foo)s', '__mod__', 42) self.checkraises(TypeError, '%s%s', '__mod__', (42,)) - with self.assertWarns(DeprecationWarning): - self.checkraises(TypeError, '%c', '__mod__', (None,)) + self.checkraises(TypeError, '%c', '__mod__', (None,)) self.checkraises(ValueError, '%(foo', '__mod__', {}) self.checkraises(TypeError, '%(foo)s %(bar)s', '__mod__', ('foo', 42)) self.checkraises(TypeError, '%d', '__mod__', "42") # not numeric diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index ba7c38d..408f12c 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -319,34 +319,38 @@ class EmbeddingTests(unittest.TestCase): print() print(out) print(err) + expected_errors = sys.__stdout__.errors expected_stdin_encoding = sys.__stdin__.encoding expected_pipe_encoding = self._get_default_pipe_encoding() expected_output = os.linesep.join([ "--- Use defaults ---", "Expected encoding: default", "Expected errors: default", - "stdin: {0}:strict", - "stdout: {1}:strict", - "stderr: {1}:backslashreplace", + "stdin: {in_encoding}:{errors}", + "stdout: {out_encoding}:{errors}", + "stderr: {out_encoding}:backslashreplace", "--- Set errors only ---", "Expected encoding: default", - "Expected errors: surrogateescape", - "stdin: {0}:surrogateescape", - "stdout: {1}:surrogateescape", - "stderr: {1}:backslashreplace", + "Expected errors: ignore", + "stdin: {in_encoding}:ignore", + "stdout: {out_encoding}:ignore", + "stderr: {out_encoding}:backslashreplace", "--- Set encoding only ---", "Expected encoding: latin-1", "Expected errors: default", - "stdin: latin-1:strict", - "stdout: latin-1:strict", + "stdin: latin-1:{errors}", + "stdout: latin-1:{errors}", "stderr: latin-1:backslashreplace", "--- Set encoding and errors ---", "Expected encoding: latin-1", - "Expected errors: surrogateescape", - "stdin: latin-1:surrogateescape", - "stdout: latin-1:surrogateescape", - "stderr: latin-1:backslashreplace"]).format(expected_stdin_encoding, - expected_pipe_encoding) + "Expected errors: replace", + "stdin: latin-1:replace", + "stdout: latin-1:replace", + "stderr: latin-1:backslashreplace"]) + expected_output = expected_output.format( + in_encoding=expected_stdin_encoding, + out_encoding=expected_pipe_encoding, + errors=expected_errors) # This is useful if we ever trip over odd platform behaviour self.maxDiff = None self.assertEqual(out.strip(), expected_output) diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index 84804bb..a1ce9cf 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -819,7 +819,7 @@ class CodecCallbackTest(unittest.TestCase): def __getitem__(self, key): raise ValueError #self.assertRaises(ValueError, "\xff".translate, D()) - self.assertRaises(TypeError, "\xff".translate, {0xff: sys.maxunicode+1}) + self.assertRaises(ValueError, "\xff".translate, {0xff: sys.maxunicode+1}) self.assertRaises(TypeError, "\xff".translate, {0xff: ()}) def test_bug828737(self): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 9b62d5b..6945a99 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -890,10 +890,6 @@ class CP65001Test(ReadTest, unittest.TestCase): "\U00010fff\uD800") self.assertTrue(codecs.lookup_error("surrogatepass")) - def test_readline(self): - self.skipTest("issue #20571: code page 65001 codec does not " - "support partial decoder yet") - class UTF7Test(ReadTest, unittest.TestCase): encoding = "utf-7" @@ -2750,15 +2746,15 @@ class CodePageTest(unittest.TestCase): self.assertRaisesRegex(UnicodeEncodeError, 'cp932', codecs.code_page_encode, 932, '\xff') self.assertRaisesRegex(UnicodeDecodeError, 'cp932', - codecs.code_page_decode, 932, b'\x81\x00') + codecs.code_page_decode, 932, b'\x81\x00', 'strict', True) self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8', - codecs.code_page_decode, self.CP_UTF8, b'\xff') + codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True) def check_decode(self, cp, tests): for raw, errors, expected in tests: if expected is not None: try: - decoded = codecs.code_page_decode(cp, raw, errors) + decoded = codecs.code_page_decode(cp, raw, errors, True) except UnicodeDecodeError as err: self.fail('Unable to decode %a from "cp%s" with ' 'errors=%r: %s' % (raw, cp, errors, err)) @@ -2770,7 +2766,7 @@ class CodePageTest(unittest.TestCase): self.assertLessEqual(decoded[1], len(raw)) else: self.assertRaises(UnicodeDecodeError, - codecs.code_page_decode, cp, raw, errors) + codecs.code_page_decode, cp, raw, errors, True) def check_encode(self, cp, tests): for text, errors, expected in tests: diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index ee28a6c..d352d2a 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -1187,6 +1187,11 @@ class TestOrderedDict(unittest.TestCase): self.assertEqual(list(od.items()), pairs) self.assertEqual(list(reversed(od)), [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.keys())), + [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.values())), + [t[1] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.items())), list(reversed(pairs))) def test_popitem(self): pairs = [('c', 1), ('b', 2), ('a', 3), ('d', 4), ('e', 5), ('f', 6)] diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 56193e8..a1029ed 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -2897,7 +2897,7 @@ Invalid doctest option: def test_main(): # Check the doctest cases in doctest itself: - support.run_doctest(doctest, verbosity=True) + ret = support.run_doctest(doctest, verbosity=True) # Check the doctest cases defined here: from test import test_doctest support.run_doctest(test_doctest, verbosity=True) diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py index cb6366c..eb97516 100644 --- a/Lib/test/test_docxmlrpc.py +++ b/Lib/test/test_docxmlrpc.py @@ -87,10 +87,11 @@ class DocXMLRPCHTTPGETServer(unittest.TestCase): threading.Thread(target=server, args=(self.evt, 1)).start() # wait for port to be assigned - n = 1000 - while n > 0 and PORT is None: - time.sleep(0.001) - n -= 1 + deadline = time.monotonic() + 10.0 + while PORT is None: + time.sleep(0.010) + if time.monotonic() > deadline: + break self.client = http.client.HTTPConnection("localhost:%d" % PORT) diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py index e0626df..8bcbd46 100644 --- a/Lib/test/test_fork1.py +++ b/Lib/test/test_fork1.py @@ -18,13 +18,14 @@ get_attribute(os, 'fork') class ForkTest(ForkWait): def wait_impl(self, cpid): - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # waitpid() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status = os.waitpid(cpid, os.WNOHANG) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index fc71e48..631bf35 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -142,8 +142,6 @@ class FormatTest(unittest.TestCase): testformat("%#+027.23X", big, "+0X0001234567890ABCDEF12345") # same, except no 0 flag testformat("%#+27.23X", big, " +0X001234567890ABCDEF12345") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(big), "123456_______________", 6) big = 0o12345670123456701234567012345670 # 32 octal digits testformat("%o", big, "12345670123456701234567012345670") testformat("%o", -big, "-12345670123456701234567012345670") @@ -183,8 +181,6 @@ class FormatTest(unittest.TestCase): testformat("%034.33o", big, "0012345670123456701234567012345670") # base marker shouldn't change that testformat("%0#34.33o", big, "0o012345670123456701234567012345670") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(big), "123456__________________________", 6) # Some small ints, in both Python int and flavors). testformat("%d", 42, "42") testformat("%d", -42, "-42") @@ -195,8 +191,6 @@ class FormatTest(unittest.TestCase): testformat("%#x", 1, "0x1") testformat("%#X", 1, "0X1") testformat("%#X", 1, "0X1") - with self.assertWarns(DeprecationWarning): - testformat("%#x", 1.0, "0x1") testformat("%#o", 1, "0o1") testformat("%#o", 1, "0o1") testformat("%#o", 0, "0o0") @@ -213,14 +207,10 @@ class FormatTest(unittest.TestCase): testformat("%x", -0x42, "-42") testformat("%x", 0x42, "42") testformat("%x", -0x42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(0x42), "42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(0o42), "42") testformat("%r", "\u0378", "'\\u0378'") # non printable testformat("%a", "\u0378", "'\\u0378'") # non printable testformat("%r", "\u0374", "'\u0374'") # printable diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py index 3336532..e86d5ce 100644 --- a/Lib/test/test_fractions.py +++ b/Lib/test/test_fractions.py @@ -330,7 +330,6 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(F(-2, 10), round(F(-15, 100), 1)) self.assertTypedEquals(F(-2, 10), round(F(-25, 100), 1)) - def testArithmetic(self): self.assertEqual(F(1, 2), F(1, 10) + F(2, 5)) self.assertEqual(F(-3, 10), F(1, 10) - F(2, 5)) @@ -402,6 +401,8 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(2.0 , 4 ** F(1, 2)) self.assertTypedEquals(0.25, 2.0 ** F(-2, 1)) self.assertTypedEquals(1.0 + 0j, (1.0 + 0j) ** F(1, 10)) + self.assertRaises(ZeroDivisionError, operator.pow, + F(0, 1), -2) def testMixingWithDecimal(self): # Decimal refuses mixed arithmetic (but not mixed comparisons) diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 30b6c0c..69aa381 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem') # Root cert file (CA) for svn.python.org's cert CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem') +# constants for testing chunked encoding +chunked_start = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd! \r\n' + '8\r\n' + 'and now \r\n' + '22\r\n' + 'for something completely different\r\n' +) +chunked_expected = b'hello world! and now for something completely different' +chunk_extension = ";foo=bar" +last_chunk = "0\r\n" +last_chunk_extended = "0" + chunk_extension + "\r\n" +trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" +chunked_end = "\r\n" + HOST = support.HOST class FakeSocket: @@ -36,7 +56,10 @@ class FakeSocket: def makefile(self, mode, bufsize=None): if mode != 'r' and mode != 'rb': raise client.UnimplementedFileMode() - return self.fileclass(self.text) + # keep the file around so we can check how much was read from it + self.file = self.fileclass(self.text) + self.file.close = lambda:None #nerf close () + return self.file class EPipeSocket(FakeSocket): @@ -430,20 +453,8 @@ class BasicTest(TestCase): conn.request('POST', 'test', conn) def test_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' - sock = FakeSocket(chunked_start + '0\r\n') + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(), expected) @@ -451,7 +462,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) @@ -474,23 +485,12 @@ class BasicTest(TestCase): resp.close() def test_readinto_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' + + expected = chunked_expected nexpected = len(expected) b = bytearray(128) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() n = resp.readinto(b) @@ -500,7 +500,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() m = memoryview(b) @@ -536,7 +536,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() self.assertEqual(resp.read(), b'') @@ -556,7 +556,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() b = bytearray(5) @@ -631,6 +631,7 @@ class BasicTest(TestCase): + '0' * 65536 + 'a\r\n' 'hello world\r\n' '0\r\n' + '\r\n' ) resp = client.HTTPResponse(FakeSocket(body)) resp.begin() @@ -670,6 +671,239 @@ class BasicTest(TestCase): conn.request('POST', '/', body) self.assertGreater(sock.sendall_calls, 1) + def test_chunked_extension(self): + extra = '3;foo=bar\r\n' + 'abc\r\n' + expected = chunked_expected + b'abc' + + sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_missing_end(self): + """some servers may serve up a short chunked encoding stream""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_trailers(self): + """See that trailers are read and ignored""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # we should have reached the end of the file + self.assertEqual(sock.file.read(100), b"") #we read to the end + resp.close() + + def test_chunked_sync(self): + """Check that we don't read past the end of the chunked-encoding stream""" + expected = chunked_expected + extradata = "extradata" + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + + def test_content_length_sync(self): + """Check that we don't read past the end of the Content-Length stream""" + extradata = "extradata" + expected = b"Hello123\r\n" + sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + +class ExtendedReadTest(TestCase): + """ + Test peek(), read1(), readline() + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + '\r\n' + 'hello world!\n' + 'and now \n' + 'for something completely different\n' + 'foo' + ) + lines_expected = lines[lines.find('hello'):].encode("ascii") + lines_chunked = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + def setUp(self): + sock = FakeSocket(self.lines) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + resp.fp = io.BufferedReader(resp.fp) + self.resp = resp + + + + def test_peek(self): + resp = self.resp + # patch up the buffered peek so that it returns not too much stuff + oldpeek = resp.fp.peek + def mypeek(n=-1): + p = oldpeek(n) + if n >= 0: + return p[:n] + return p[:10] + resp.fp.peek = mypeek + + all = [] + while True: + # try a short peek + p = resp.peek(3) + if p: + self.assertGreater(len(p), 0) + # then unbounded peek + p2 = resp.peek() + self.assertGreaterEqual(len(p2), len(p)) + self.assertTrue(p2.startswith(p)) + next = resp.read(len(p2)) + self.assertEqual(next, p2) + else: + next = resp.read() + self.assertFalse(next) + all.append(next) + if not next: + break + self.assertEqual(b"".join(all), self.lines_expected) + + def test_readline(self): + resp = self.resp + self._verify_readline(self.resp.readline, self.lines_expected) + + def _verify_readline(self, readline, expected): + all = [] + while True: + # short readlines + line = readline(5) + if line and line != b"foo": + if len(line) < 5: + self.assertTrue(line.endswith(b"\n")) + all.append(line) + if not line: + break + self.assertEqual(b"".join(all), expected) + + def test_read1(self): + resp = self.resp + def r(): + res = resp.read1(4) + self.assertLessEqual(len(res), 4) + return res + readliner = Readliner(r) + self._verify_readline(readliner.readline, self.lines_expected) + + def test_read1_unbounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1() + if not data: + break + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_bounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1(10) + if not data: + break + self.assertLessEqual(len(data), 10) + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_0(self): + self.assertEqual(self.resp.read1(0), b"") + + def test_peek_0(self): + p = self.resp.peek(0) + self.assertLessEqual(0, len(p)) + +class ExtendedReadTestChunked(ExtendedReadTest): + """ + Test peek(), read1(), readline() in chunked mode + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + +class Readliner: + """ + a simple readline class that uses an arbitrary read function and buffering + """ + def __init__(self, readfunc): + self.readfunc = readfunc + self.remainder = b"" + + def readline(self, limit): + data = [] + datalen = 0 + read = self.remainder + try: + while True: + idx = read.find(b'\n') + if idx != -1: + break + if datalen + len(read) >= limit: + idx = limit - datalen - 1 + # read more data + data.append(read) + read = self.readfunc() + if not read: + idx = 0 #eof condition + break + idx += 1 + data.append(read[:idx]) + self.remainder = read[idx:] + return b"".join(data) + except: + self.remainder = b"".join(data) + raise + class OfflineTest(TestCase): def test_responses(self): self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") @@ -973,7 +1207,8 @@ class HTTPResponseTest(TestCase): def test_main(verbose=None): support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTest, RequestBodyTest, SourceAddressTest, - HTTPResponseTest) + HTTPResponseTest, ExtendedReadTest, + ExtendedReadTestChunked) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_importlib/test_lazy.py b/Lib/test/test_importlib/test_lazy.py new file mode 100644 index 0000000..2e191bb --- /dev/null +++ b/Lib/test/test_importlib/test_lazy.py @@ -0,0 +1,132 @@ +import importlib +from importlib import abc +from importlib import util +import unittest + +from . import util as test_util + + +class CollectInit: + + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def exec_module(self, module): + return self + + +class LazyLoaderFactoryTests(unittest.TestCase): + + def test_init(self): + factory = util.LazyLoader.factory(CollectInit) + # E.g. what importlib.machinery.FileFinder instantiates loaders with + # plus keyword arguments. + lazy_loader = factory('module name', 'module path', kw='kw') + loader = lazy_loader.loader + self.assertEqual(('module name', 'module path'), loader.args) + self.assertEqual({'kw': 'kw'}, loader.kwargs) + + def test_validation(self): + # No exec_module(), no lazy loading. + with self.assertRaises(TypeError): + util.LazyLoader.factory(object) + + +class TestingImporter(abc.MetaPathFinder, abc.Loader): + + module_name = 'lazy_loader_test' + mutated_name = 'changed' + loaded = None + source_code = 'attr = 42; __name__ = {!r}'.format(mutated_name) + + def find_spec(self, name, path, target=None): + if name != self.module_name: + return None + return util.spec_from_loader(name, util.LazyLoader(self)) + + def exec_module(self, module): + exec(self.source_code, module.__dict__) + self.loaded = module + + +class LazyLoaderTests(unittest.TestCase): + + def test_init(self): + with self.assertRaises(TypeError): + util.LazyLoader(object) + + def new_module(self, source_code=None): + loader = TestingImporter() + if source_code is not None: + loader.source_code = source_code + spec = util.spec_from_loader(TestingImporter.module_name, + util.LazyLoader(loader)) + module = spec.loader.create_module(spec) + module.__spec__ = spec + module.__loader__ = spec.loader + spec.loader.exec_module(module) + # Module is now lazy. + self.assertIsNone(loader.loaded) + return module + + def test_e2e(self): + # End-to-end test to verify the load is in fact lazy. + importer = TestingImporter() + assert importer.loaded is None + with test_util.uncache(importer.module_name): + with test_util.import_state(meta_path=[importer]): + module = importlib.import_module(importer.module_name) + self.assertIsNone(importer.loaded) + # Trigger load. + self.assertEqual(module.__loader__, importer) + self.assertIsNotNone(importer.loaded) + self.assertEqual(module, importer.loaded) + + def test_attr_unchanged(self): + # An attribute only mutated as a side-effect of import should not be + # changed needlessly. + module = self.new_module() + self.assertEqual(TestingImporter.mutated_name, module.__name__) + + def test_new_attr(self): + # A new attribute should persist. + module = self.new_module() + module.new_attr = 42 + self.assertEqual(42, module.new_attr) + + def test_mutated_preexisting_attr(self): + # Changing an attribute that already existed on the module -- + # e.g. __name__ -- should persist. + module = self.new_module() + module.__name__ = 'bogus' + self.assertEqual('bogus', module.__name__) + + def test_mutated_attr(self): + # Changing an attribute that comes into existence after an import + # should persist. + module = self.new_module() + module.attr = 6 + self.assertEqual(6, module.attr) + + def test_delete_eventual_attr(self): + # Deleting an attribute should stay deleted. + module = self.new_module() + del module.attr + self.assertFalse(hasattr(module, 'attr')) + + def test_delete_preexisting_attr(self): + module = self.new_module() + del module.__name__ + self.assertFalse(hasattr(module, '__name__')) + + def test_module_substitution_error(self): + source_code = 'import sys; sys.modules[__name__] = 42' + module = self.new_module(source_code) + with test_util.uncache(TestingImporter.module_name): + with self.assertRaises(ValueError): + module.__name__ + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py index 38367f3..881ca95 100644 --- a/Lib/test/test_inspect.py +++ b/Lib/test/test_inspect.py @@ -8,6 +8,7 @@ import linecache import os from os.path import normcase import _pickle +import pickle import re import shutil import sys @@ -73,6 +74,7 @@ def generator_function_example(self): for i in range(2): yield i + class TestPredicates(IsTestBase): def test_sixteen(self): count = len([x for x in dir(inspect) if x.startswith('is')]) @@ -1611,6 +1613,17 @@ class TestGetGeneratorState(unittest.TestCase): self.assertRaises(TypeError, inspect.getgeneratorlocals, (2,3)) +class MySignature(inspect.Signature): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + +class MyParameter(inspect.Parameter): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + + class TestSignatureObject(unittest.TestCase): @staticmethod def signature(func): @@ -1668,6 +1681,39 @@ class TestSignatureObject(unittest.TestCase): with self.assertRaisesRegex(ValueError, 'follows default argument'): S((pkd, pk)) + self.assertTrue(repr(sig).startswith('<Signature')) + self.assertTrue('"(po, pk' in repr(sig)) + + def test_signature_object_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + foo_partial = functools.partial(foo, a=1) + + sig = inspect.signature(foo_partial) + self.assertTrue(sig.parameters['a']._partial_kwarg) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=False): + sig_pickled = pickle.loads(pickle.dumps(sig, ver)) + self.assertEqual(sig, sig_pickled) + self.assertTrue(sig_pickled.parameters['a']._partial_kwarg) + + # Test that basic sub-classing works + sig = inspect.signature(foo) + myparam = MyParameter(name='z', kind=inspect.Parameter.POSITIONAL_ONLY) + myparams = collections.OrderedDict(sig.parameters, a=myparam) + mysig = MySignature().replace(parameters=myparams.values(), + return_annotation=sig.return_annotation) + self.assertTrue(isinstance(mysig, MySignature)) + self.assertTrue(isinstance(mysig.parameters['z'], MyParameter)) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=True): + sig_pickled = pickle.loads(pickle.dumps(mysig, ver)) + self.assertEqual(mysig, sig_pickled) + self.assertTrue(isinstance(sig_pickled, MySignature)) + self.assertTrue(isinstance(sig_pickled.parameters['z'], + MyParameter)) + def test_signature_immutability(self): def test(a): pass @@ -2488,6 +2534,19 @@ class TestSignatureObject(unittest.TestCase): self.assertEqual(self.signature(Spam.foo), self.signature(Ham.foo)) + def test_signature_from_callable_python_obj(self): + class MySignature(inspect.Signature): pass + def foo(a, *, b:1): pass + foo_sig = MySignature.from_callable(foo) + self.assertTrue(isinstance(foo_sig, MySignature)) + + @unittest.skipIf(MISSING_C_DOCSTRINGS, + "Signature information for builtins requires docstrings") + def test_signature_from_callable_builtin_obj(self): + class MySignature(inspect.Signature): pass + sig = MySignature.from_callable(_pickle.Pickler) + self.assertTrue(isinstance(sig, MySignature)) + class TestParameterObject(unittest.TestCase): def test_signature_parameter_kinds(self): @@ -2533,6 +2592,7 @@ class TestParameterObject(unittest.TestCase): p.replace(kind=inspect.Parameter.VAR_POSITIONAL) self.assertTrue(repr(p).startswith('<Parameter')) + self.assertTrue('"a=42"' in repr(p)) def test_signature_parameter_equality(self): P = inspect.Parameter @@ -2859,6 +2919,16 @@ class TestBoundArguments(unittest.TestCase): ba4 = inspect.signature(bar).bind(1) self.assertNotEqual(ba, ba4) + def test_signature_bound_arguments_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + sig = inspect.signature(foo) + ba = sig.bind(20, 30, z={}) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver): + ba_pickled = pickle.loads(pickle.dumps(ba, ver)) + self.assertEqual(ba, ba_pickled) + class TestSignaturePrivateHelpers(unittest.TestCase): def test_signature_get_bound_param(self): diff --git a/Lib/test/test_json/test_tool.py b/Lib/test/test_json/test_tool.py index 0c39e56..5484a8a 100644 --- a/Lib/test/test_json/test_tool.py +++ b/Lib/test/test_json/test_tool.py @@ -55,6 +55,7 @@ class TestTool(unittest.TestCase): def test_infile_stdout(self): infile = self._create_infile() rc, out, err = assert_python_ok('-m', 'json.tool', infile) + self.assertEqual(rc, 0) self.assertEqual(out.splitlines(), self.expect.encode().splitlines()) self.assertEqual(err, b'') @@ -65,5 +66,12 @@ class TestTool(unittest.TestCase): self.addCleanup(os.remove, outfile) with open(outfile, "r") as fp: self.assertEqual(fp.read(), self.expect) + self.assertEqual(rc, 0) self.assertEqual(out, b'') self.assertEqual(err, b'') + + def test_help_flag(self): + rc, out, err = assert_python_ok('-m', 'json.tool', '-h') + self.assertEqual(rc, 0) + self.assertTrue(out.startswith(b'usage: ')) + self.assertEqual(err, b'') diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py index 34edd76..8f83c90 100644 --- a/Lib/test/test_selectors.py +++ b/Lib/test/test_selectors.py @@ -441,10 +441,18 @@ class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): SELECTOR = getattr(selectors, 'KqueueSelector', None) +@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'), + "Test needs selectors.DevpollSelector") +class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'DevpollSelector', None) + + + def test_main(): tests = [DefaultSelectorTestCase, SelectSelectorTestCase, PollSelectorTestCase, EpollSelectorTestCase, - KqueueSelectorTestCase] + KqueueSelectorTestCase, DevpollSelectorTestCase] support.run_unittest(*tests) support.reap_children() diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index a6f2c64..31e6d37 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,6 +1,7 @@ import unittest from test import support from contextlib import closing +import enum import gc import pickle import select @@ -39,6 +40,22 @@ def ignoring_eintr(__func, *args, **kwargs): return None +class GenericTests(unittest.TestCase): + + def test_enums(self): + for name in dir(signal): + sig = getattr(signal, name) + if name in {'SIG_DFL', 'SIG_IGN'}: + self.assertIsInstance(sig, signal.Handlers) + elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: + self.assertIsInstance(sig, signal.Sigmasks) + elif name.startswith('SIG') and not name.startswith('SIG_'): + self.assertIsInstance(sig, signal.Signals) + elif name.startswith('CTRL_'): + self.assertIsInstance(sig, signal.Signals) + self.assertEqual(sys.platform, "win32") + + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class InterProcessSignalTests(unittest.TestCase): MAX_DURATION = 20 # Entire test should last at most 20 sec. @@ -195,6 +212,7 @@ class PosixTests(unittest.TestCase): def test_getsignal(self): hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertIsInstance(hup, signal.Handlers) self.assertEqual(signal.getsignal(signal.SIGHUP), self.trivial_signal_handler) signal.signal(signal.SIGHUP, hup) @@ -271,7 +289,7 @@ class WakeupSignalTests(unittest.TestCase): os.close(read) os.close(write) - """.format(signals, ordered, test_body) + """.format(tuple(map(int, signals)), ordered, test_body) assert_python_ok('-c', code) @@ -604,6 +622,8 @@ class PendingSignalsTests(unittest.TestCase): signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() + for sig in pending: + assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: @@ -660,6 +680,7 @@ class PendingSignalsTests(unittest.TestCase): code = '''if 1: import signal import sys + from signal import Signals def handler(signum, frame): 1/0 @@ -702,6 +723,7 @@ class PendingSignalsTests(unittest.TestCase): def test(signum): signal.alarm(1) received = signal.sigwait([signum]) + assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) ''') @@ -842,8 +864,14 @@ class PendingSignalsTests(unittest.TestCase): def kill(signum): os.kill(os.getpid(), signum) + def check_mask(mask): + for sig in mask: + assert isinstance(sig, signal.Signals), repr(sig) + def read_sigmask(): - return signal.pthread_sigmask(signal.SIG_BLOCK, []) + sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) + check_mask(sigmask) + return sigmask signum = signal.SIGUSR1 @@ -852,6 +880,7 @@ class PendingSignalsTests(unittest.TestCase): # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + check_mask(old_mask) try: kill(signum) except ZeroDivisionError: @@ -861,11 +890,13 @@ class PendingSignalsTests(unittest.TestCase): # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending - signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() + check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: @@ -928,7 +959,7 @@ class PendingSignalsTests(unittest.TestCase): def test_main(): try: - support.run_unittest(PosixTests, InterProcessSignalTests, + support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests, WakeupFDTests, WakeupSignalTests, SiginterruptTest, ItimerTest, WindowsSignalTests, PendingSignalsTests) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 0617b30..8e0fde4 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -222,38 +222,6 @@ class SocketServerTest(unittest.TestCase): socketserver.DatagramRequestHandler, self.dgram_examine) - @contextlib.contextmanager - def mocked_select_module(self): - """Mocks the select.select() call to raise EINTR for first call""" - old_select = select.select - - class MockSelect: - def __init__(self): - self.called = 0 - - def __call__(self, *args): - self.called += 1 - if self.called == 1: - # raise the exception on first call - raise OSError(errno.EINTR, os.strerror(errno.EINTR)) - else: - # Return real select value for consecutive calls - return old_select(*args) - - select.select = MockSelect() - try: - yield select.select - finally: - select.select = old_select - - def test_InterruptServerSelectCall(self): - with self.mocked_select_module() as mock_select: - pid = self.run_server(socketserver.TCPServer, - socketserver.StreamRequestHandler, - self.stream_examine) - # Make sure select was called again: - self.assertGreater(mock_select.called, 1) - # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 5a9699f..b82358e 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -615,6 +615,53 @@ class SysModuleTest(unittest.TestCase): expected = None self.check_fsencoding(fs_encoding, expected) + def c_locale_get_error_handler(self, isolated=False, encoding=None): + # Force the POSIX locale + env = os.environ.copy() + env["LC_ALL"] = "C" + code = '\n'.join(( + 'import sys', + 'def dump(name):', + ' std = getattr(sys, name)', + ' print("%s: %s" % (name, std.errors))', + 'dump("stdin")', + 'dump("stdout")', + 'dump("stderr")', + )) + args = [sys.executable, "-c", code] + if isolated: + args.append("-I") + elif encoding: + env['PYTHONIOENCODING'] = encoding + p = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + universal_newlines=True) + stdout, stderr = p.communicate() + return stdout + + def test_c_locale_surrogateescape(self): + out = self.c_locale_get_error_handler(isolated=True) + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + + # replace the default error handler + out = self.c_locale_get_error_handler(encoding=':strict') + self.assertEqual(out, + 'stdin: strict\n' + 'stdout: strict\n' + 'stderr: backslashreplace\n') + + # force the encoding + out = self.c_locale_get_error_handler(encoding='iso8859-1') + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + def test_implementation(self): # This test applies to all implementations equally. diff --git a/Lib/test/test_tuple.py b/Lib/test/test_tuple.py index e41711c..14c6430 100644 --- a/Lib/test/test_tuple.py +++ b/Lib/test/test_tuple.py @@ -201,6 +201,14 @@ class TupleTest(seq_tests.CommonTest): with self.assertRaises(TypeError): [3,] + T((1,2)) + def test_lexicographic_ordering(self): + # Issue 21100 + a = self.type2test([1, 2]) + b = self.type2test([1, 2, 0]) + c = self.type2test([1, 3]) + self.assertLess(a, b) + self.assertLess(b, c) + def test_main(): support.run_unittest(TupleTest) diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index 7e70918..7fda51c 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -8,6 +8,7 @@ Written by Marc-Andre Lemburg (mal@lemburg.com). import _string import codecs import itertools +import operator import struct import sys import unittest @@ -250,6 +251,7 @@ class UnicodeTest(string_tests.CommonTest, {ord('a'): None, ord('b'): ''}) self.checkequalnofix('xyyx', 'xzx', 'translate', {ord('z'): 'yy'}) + # this needs maketrans() self.checkequalnofix('abababc', 'abababc', 'translate', {'b': '<i>'}) @@ -259,6 +261,33 @@ class UnicodeTest(string_tests.CommonTest, tbl = self.type2test.maketrans('abc', 'xyz', 'd') self.checkequalnofix('xyzzy', 'abdcdcbdddd', 'translate', tbl) + # various tests switching from ASCII to latin1 or the opposite; + # same length, remove a letter, or replace with a longer string. + self.assertEqual("[a]".translate(str.maketrans('a', 'X')), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'X'})), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': None})), + "[]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'XXX'})), + "[XXX]") + self.assertEqual("[a]".translate(str.maketrans({'a': '\xe9'})), + "[\xe9]") + self.assertEqual("[a]".translate(str.maketrans({'a': '<\xe9>'})), + "[<\xe9>]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': 'a'})), + "[a]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': None})), + "[]") + + # invalid Unicode characters + invalid_char = 0x10ffff+1 + for before in "a\xe9\u20ac\U0010ffff": + mapping = str.maketrans({before: invalid_char}) + text = "[%s]" % before + self.assertRaises(ValueError, text.translate, mapping) + + # errors self.assertRaises(TypeError, self.type2test.maketrans) self.assertRaises(ValueError, self.type2test.maketrans, 'abc', 'defg') self.assertRaises(TypeError, self.type2test.maketrans, 2, 'def') @@ -1127,20 +1156,20 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%.2s' % "a\xe9\u20ac", 'a\xe9') #issue 19995 - class PsuedoInt: + class PseudoInt: def __init__(self, value): self.value = int(value) def __int__(self): return self.value def __index__(self): return self.value - class PsuedoFloat: + class PseudoFloat: def __init__(self, value): self.value = float(value) def __int__(self): return int(self.value) - pi = PsuedoFloat(3.1415) - letter_m = PsuedoInt(109) + pi = PseudoFloat(3.1415) + letter_m = PseudoInt(109) self.assertEqual('%x' % 42, '2a') self.assertEqual('%X' % 15, 'F') self.assertEqual('%o' % 9, '11') @@ -1149,11 +1178,11 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%X' % letter_m, '6D') self.assertEqual('%o' % letter_m, '155') self.assertEqual('%c' % letter_m, 'm') - self.assertWarns(DeprecationWarning, '%x'.__mod__, pi), - self.assertWarns(DeprecationWarning, '%x'.__mod__, 3.14), - self.assertWarns(DeprecationWarning, '%X'.__mod__, 2.11), - self.assertWarns(DeprecationWarning, '%o'.__mod__, 1.79), - self.assertWarns(DeprecationWarning, '%c'.__mod__, pi), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not float', operator.mod, '%x', 3.14), + self.assertRaisesRegex(TypeError, '%X format: an integer is required, not float', operator.mod, '%X', 2.11), + self.assertRaisesRegex(TypeError, '%o format: an integer is required, not float', operator.mod, '%o', 1.79), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not PseudoFloat', operator.mod, '%x', pi), + self.assertRaises(TypeError, operator.mod, '%c', pi), def test_formatting_with_enum(self): # issue18780 diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py index f6a065d..bb71481 100644 --- a/Lib/test/test_wait3.py +++ b/Lib/test/test_wait3.py @@ -18,7 +18,8 @@ class Wait3Test(ForkWait): # This many iterations can be required, since some previously run # tests (e.g. test_ctypes) could have spawned a lot of children # very quickly. - for i in range(30): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait3() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait3(os.WNOHANG) diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py index 352c11a..b427a9b 100644 --- a/Lib/test/test_wait4.py +++ b/Lib/test/test_wait4.py @@ -19,13 +19,14 @@ class Wait4Test(ForkWait): # Issue #11185: wait4 is broken on AIX and will always return 0 # with WNOHANG. option = 0 - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait4() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait4(cpid, option) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) self.assertTrue(rusage) diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 99b3eda..120c54f 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -713,6 +713,23 @@ class SimpleServerTestCase(BaseServerTestCase): conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') conn.close() + def test_context_manager(self): + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, 3) + self.assertNotEqual(server('transport')._connection, + (None, None)) + self.assertEqual(server('transport')._connection, + (None, None)) + + def test_context_manager_method_error(self): + try: + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, "a") + except xmlrpclib.Fault: + pass + self.assertEqual(server('transport')._connection, + (None, None)) + class MultiPathServerTestCase(BaseServerTestCase): threadFunc = staticmethod(http_multi_server) @@ -898,6 +915,7 @@ class ServerProxyTestCase(unittest.TestCase): p = xmlrpclib.ServerProxy(self.url, transport=t) self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): |