summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/datetimetester.py21
-rw-r--r--Lib/test/fork_wait.py7
-rw-r--r--Lib/test/ssl_servers.py8
-rw-r--r--Lib/test/string_tests.py3
-rw-r--r--Lib/test/test_asyncore.py24
-rw-r--r--Lib/test/test_augassign.py15
-rw-r--r--Lib/test/test_builtin.py2
-rw-r--r--Lib/test/test_capi.py49
-rw-r--r--Lib/test/test_codeccallbacks.py2
-rw-r--r--Lib/test/test_codecs.py12
-rw-r--r--Lib/test/test_collections.py5
-rw-r--r--Lib/test/test_csv.py10
-rw-r--r--Lib/test/test_decimal.py153
-rw-r--r--Lib/test/test_descr.py1
-rw-r--r--Lib/test/test_doctest.py39
-rw-r--r--Lib/test/test_docxmlrpc.py9
-rw-r--r--Lib/test/test_fork1.py5
-rw-r--r--Lib/test/test_format.py10
-rw-r--r--Lib/test/test_fractions.py3
-rw-r--r--Lib/test/test_grammar.py14
-rw-r--r--Lib/test/test_httplib.py303
-rw-r--r--Lib/test/test_importlib/test_lazy.py132
-rw-r--r--Lib/test/test_inspect.py108
-rw-r--r--Lib/test/test_io.py28
-rw-r--r--Lib/test/test_ipaddress.py8
-rw-r--r--Lib/test/test_json/test_tool.py8
-rw-r--r--Lib/test/test_math.py12
-rw-r--r--Lib/test/test_module.py24
-rw-r--r--Lib/test/test_operator.py11
-rw-r--r--Lib/test/test_poplib.py15
-rw-r--r--Lib/test/test_selectors.py10
-rw-r--r--Lib/test/test_set.py2
-rw-r--r--Lib/test/test_signal.py40
-rw-r--r--Lib/test/test_socketserver.py32
-rw-r--r--Lib/test/test_ssl.py114
-rw-r--r--Lib/test/test_sys.py49
-rw-r--r--Lib/test/test_tokenize.py5
-rw-r--r--Lib/test/test_tuple.py8
-rw-r--r--Lib/test/test_types.py2
-rw-r--r--Lib/test/test_unicode.py47
-rw-r--r--Lib/test/test_wait3.py3
-rw-r--r--Lib/test/test_wait4.py5
-rw-r--r--Lib/test/test_warnings.py16
-rw-r--r--Lib/test/test_xmlrpc.py18
44 files changed, 1162 insertions, 230 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py
index 3f3c60a..b8c6138 100644
--- a/Lib/test/datetimetester.py
+++ b/Lib/test/datetimetester.py
@@ -2270,13 +2270,14 @@ class TestTime(HarmlessMixedComparison, unittest.TestCase):
self.assertEqual(orig, derived)
def test_bool(self):
+ # time is always True.
cls = self.theclass
self.assertTrue(cls(1))
self.assertTrue(cls(0, 1))
self.assertTrue(cls(0, 0, 1))
self.assertTrue(cls(0, 0, 0, 1))
- self.assertFalse(cls(0))
- self.assertFalse(cls())
+ self.assertTrue(cls(0))
+ self.assertTrue(cls())
def test_replace(self):
cls = self.theclass
@@ -2629,7 +2630,7 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase):
self.assertEqual(derived.tzname(), 'cookie')
def test_more_bool(self):
- # Test cases with non-None tzinfo.
+ # time is always True.
cls = self.theclass
t = cls(0, tzinfo=FixedOffset(-300, ""))
@@ -2639,23 +2640,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase):
self.assertTrue(t)
t = cls(5, tzinfo=FixedOffset(300, ""))
- self.assertFalse(t)
+ self.assertTrue(t)
t = cls(23, 59, tzinfo=FixedOffset(23*60 + 59, ""))
- self.assertFalse(t)
-
- # Mostly ensuring this doesn't overflow internally.
- t = cls(0, tzinfo=FixedOffset(23*60 + 59, ""))
self.assertTrue(t)
- # But this should yield a value error -- the utcoffset is bogus.
- t = cls(0, tzinfo=FixedOffset(24*60, ""))
- self.assertRaises(ValueError, lambda: bool(t))
-
- # Likewise.
- t = cls(0, tzinfo=FixedOffset(-24*60, ""))
- self.assertRaises(ValueError, lambda: bool(t))
-
def test_replace(self):
cls = self.theclass
z100 = FixedOffset(100, "+100")
diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py
index 19b54ec..8c7c3aa 100644
--- a/Lib/test/fork_wait.py
+++ b/Lib/test/fork_wait.py
@@ -48,7 +48,12 @@ class ForkWait(unittest.TestCase):
for i in range(NUM_THREADS):
_thread.start_new(self.f, (i,))
- time.sleep(LONGSLEEP)
+ # busy-loop to wait for threads
+ deadline = time.monotonic() + 10.0
+ while len(self.alive) < NUM_THREADS:
+ time.sleep(0.1)
+ if time.monotonic() <= deadline:
+ break
a = sorted(self.alive.keys())
self.assertEqual(a, list(range(NUM_THREADS)))
diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py
index 759b3f4..f9d30cf 100644
--- a/Lib/test/ssl_servers.py
+++ b/Lib/test/ssl_servers.py
@@ -150,7 +150,7 @@ class HTTPSServerThread(threading.Thread):
def make_https_server(case, *, context=None, certfile=CERTFILE,
host=HOST, handler_class=None):
if context is None:
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
# We assume the certfile contains both private key and certificate
context.load_cert_chain(certfile)
server = HTTPSServerThread(context, host, handler_class)
@@ -182,6 +182,8 @@ if __name__ == "__main__":
parser.add_argument('--curve-name', dest='curve_name', type=str,
action='store',
help='curve name for EC-based Diffie-Hellman')
+ parser.add_argument('--ciphers', dest='ciphers', type=str,
+ help='allowed cipher list')
parser.add_argument('--dh', dest='dh_file', type=str, action='store',
help='PEM file containing DH parameters')
args = parser.parse_args()
@@ -192,12 +194,14 @@ if __name__ == "__main__":
else:
handler_class = RootedHTTPRequestHandler
handler_class.root = os.getcwd()
- context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(CERTFILE)
if args.curve_name:
context.set_ecdh_curve(args.curve_name)
if args.dh_file:
context.load_dh_params(args.dh_file)
+ if args.ciphers:
+ context.set_ciphers(args.ciphers)
server = HTTPSServer(("", args.port), handler_class, context)
if args.verbose:
diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py
index 5ed01f2..569bae1 100644
--- a/Lib/test/string_tests.py
+++ b/Lib/test/string_tests.py
@@ -1178,8 +1178,7 @@ class MixinStrUnicodeUserStringTest:
self.checkraises(TypeError, 'abc', '__mod__')
self.checkraises(TypeError, '%(foo)s', '__mod__', 42)
self.checkraises(TypeError, '%s%s', '__mod__', (42,))
- with self.assertWarns(DeprecationWarning):
- self.checkraises(TypeError, '%c', '__mod__', (None,))
+ self.checkraises(TypeError, '%c', '__mod__', (None,))
self.checkraises(ValueError, '%(foo', '__mod__', {})
self.checkraises(TypeError, '%(foo)s %(bar)s', '__mod__', ('foo', 42))
self.checkraises(TypeError, '%d', '__mod__', "42") # not numeric
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 084d247..b04aa1d 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -316,23 +316,6 @@ class DispatcherTests(unittest.TestCase):
'warning: unhandled connect event']
self.assertEqual(lines, expected)
- def test_issue_8594(self):
- # XXX - this test is supposed to be removed in next major Python
- # version
- d = asyncore.dispatcher(socket.socket())
- # make sure the error message no longer refers to the socket
- # object but the dispatcher instance instead
- self.assertRaisesRegex(AttributeError, 'dispatcher instance',
- getattr, d, 'foo')
- # cheap inheritance with the underlying socket is supposed
- # to still work but a DeprecationWarning is expected
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- family = d.family
- self.assertEqual(family, socket.AF_INET)
- self.assertEqual(len(w), 1)
- self.assertTrue(issubclass(w[0].category, DeprecationWarning))
-
def test_strerror(self):
# refers to bug #8573
err = asyncore._strerror(errno.EPERM)
@@ -349,9 +332,8 @@ class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
def handle_connect(self):
pass
-class DispatcherWithSendTests(unittest.TestCase):
- usepoll = False
+class DispatcherWithSendTests(unittest.TestCase):
def setUp(self):
pass
@@ -401,10 +383,6 @@ class DispatcherWithSendTests(unittest.TestCase):
self.fail("join() timed out")
-
-class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
- usepoll = True
-
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
diff --git a/Lib/test/test_augassign.py b/Lib/test/test_augassign.py
index 9a59c58..19b7687 100644
--- a/Lib/test/test_augassign.py
+++ b/Lib/test/test_augassign.py
@@ -136,6 +136,14 @@ class AugAssignTest(unittest.TestCase):
output.append("__imul__ called")
return self
+ def __matmul__(self, val):
+ output.append("__matmul__ called")
+ def __rmatmul__(self, val):
+ output.append("__rmatmul__ called")
+ def __imatmul__(self, val):
+ output.append("__imatmul__ called")
+ return self
+
def __div__(self, val):
output.append("__div__ called")
def __rdiv__(self, val):
@@ -233,6 +241,10 @@ class AugAssignTest(unittest.TestCase):
1 * x
x *= 1
+ x @ 1
+ 1 @ x
+ x @= 1
+
x / 1
1 / x
x /= 1
@@ -279,6 +291,9 @@ __isub__ called
__mul__ called
__rmul__ called
__imul__ called
+__matmul__ called
+__rmatmul__ called
+__imatmul__ called
__truediv__ called
__rtruediv__ called
__itruediv__ called
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index b561a6f..018ac8d 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -1092,7 +1092,7 @@ class BuiltinTest(unittest.TestCase):
self.assertAlmostEqual(pow(-1, 0.5), 1j)
self.assertAlmostEqual(pow(-1, 1/3), 0.5 + 0.8660254037844386j)
- self.assertRaises(TypeError, pow, -1, -2, 3)
+ self.assertRaises(ValueError, pow, -1, -2, 3)
self.assertRaises(ValueError, pow, 1, 2, 0)
self.assertRaises(TypeError, pow)
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index ba7c38d..ba7f2c4 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -150,6 +150,23 @@ class CAPITest(unittest.TestCase):
self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
"($module, /, parameter)")
+ def test_c_type_with_matrix_multiplication(self):
+ M = _testcapi.matmulType
+ m1 = M()
+ m2 = M()
+ self.assertEqual(m1 @ m2, ("matmul", m1, m2))
+ self.assertEqual(m1 @ 42, ("matmul", m1, 42))
+ self.assertEqual(42 @ m1, ("matmul", 42, m1))
+ o = m1
+ o @= m2
+ self.assertEqual(o, ("imatmul", m1, m2))
+ o = m1
+ o @= 42
+ self.assertEqual(o, ("imatmul", m1, 42))
+ o = 42
+ o @= m1
+ self.assertEqual(o, ("matmul", 42, m1))
+
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
@@ -319,34 +336,38 @@ class EmbeddingTests(unittest.TestCase):
print()
print(out)
print(err)
+ expected_errors = sys.__stdout__.errors
expected_stdin_encoding = sys.__stdin__.encoding
expected_pipe_encoding = self._get_default_pipe_encoding()
expected_output = os.linesep.join([
"--- Use defaults ---",
"Expected encoding: default",
"Expected errors: default",
- "stdin: {0}:strict",
- "stdout: {1}:strict",
- "stderr: {1}:backslashreplace",
+ "stdin: {in_encoding}:{errors}",
+ "stdout: {out_encoding}:{errors}",
+ "stderr: {out_encoding}:backslashreplace",
"--- Set errors only ---",
"Expected encoding: default",
- "Expected errors: surrogateescape",
- "stdin: {0}:surrogateescape",
- "stdout: {1}:surrogateescape",
- "stderr: {1}:backslashreplace",
+ "Expected errors: ignore",
+ "stdin: {in_encoding}:ignore",
+ "stdout: {out_encoding}:ignore",
+ "stderr: {out_encoding}:backslashreplace",
"--- Set encoding only ---",
"Expected encoding: latin-1",
"Expected errors: default",
- "stdin: latin-1:strict",
- "stdout: latin-1:strict",
+ "stdin: latin-1:{errors}",
+ "stdout: latin-1:{errors}",
"stderr: latin-1:backslashreplace",
"--- Set encoding and errors ---",
"Expected encoding: latin-1",
- "Expected errors: surrogateescape",
- "stdin: latin-1:surrogateescape",
- "stdout: latin-1:surrogateescape",
- "stderr: latin-1:backslashreplace"]).format(expected_stdin_encoding,
- expected_pipe_encoding)
+ "Expected errors: replace",
+ "stdin: latin-1:replace",
+ "stdout: latin-1:replace",
+ "stderr: latin-1:backslashreplace"])
+ expected_output = expected_output.format(
+ in_encoding=expected_stdin_encoding,
+ out_encoding=expected_pipe_encoding,
+ errors=expected_errors)
# This is useful if we ever trip over odd platform behaviour
self.maxDiff = None
self.assertEqual(out.strip(), expected_output)
diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py
index 84804bb..a1ce9cf 100644
--- a/Lib/test/test_codeccallbacks.py
+++ b/Lib/test/test_codeccallbacks.py
@@ -819,7 +819,7 @@ class CodecCallbackTest(unittest.TestCase):
def __getitem__(self, key):
raise ValueError
#self.assertRaises(ValueError, "\xff".translate, D())
- self.assertRaises(TypeError, "\xff".translate, {0xff: sys.maxunicode+1})
+ self.assertRaises(ValueError, "\xff".translate, {0xff: sys.maxunicode+1})
self.assertRaises(TypeError, "\xff".translate, {0xff: ()})
def test_bug828737(self):
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 9b62d5b..6945a99 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -890,10 +890,6 @@ class CP65001Test(ReadTest, unittest.TestCase):
"\U00010fff\uD800")
self.assertTrue(codecs.lookup_error("surrogatepass"))
- def test_readline(self):
- self.skipTest("issue #20571: code page 65001 codec does not "
- "support partial decoder yet")
-
class UTF7Test(ReadTest, unittest.TestCase):
encoding = "utf-7"
@@ -2750,15 +2746,15 @@ class CodePageTest(unittest.TestCase):
self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
codecs.code_page_encode, 932, '\xff')
self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
- codecs.code_page_decode, 932, b'\x81\x00')
+ codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
- codecs.code_page_decode, self.CP_UTF8, b'\xff')
+ codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)
def check_decode(self, cp, tests):
for raw, errors, expected in tests:
if expected is not None:
try:
- decoded = codecs.code_page_decode(cp, raw, errors)
+ decoded = codecs.code_page_decode(cp, raw, errors, True)
except UnicodeDecodeError as err:
self.fail('Unable to decode %a from "cp%s" with '
'errors=%r: %s' % (raw, cp, errors, err))
@@ -2770,7 +2766,7 @@ class CodePageTest(unittest.TestCase):
self.assertLessEqual(decoded[1], len(raw))
else:
self.assertRaises(UnicodeDecodeError,
- codecs.code_page_decode, cp, raw, errors)
+ codecs.code_page_decode, cp, raw, errors, True)
def check_encode(self, cp, tests):
for text, errors, expected in tests:
diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py
index ee28a6c..d352d2a 100644
--- a/Lib/test/test_collections.py
+++ b/Lib/test/test_collections.py
@@ -1187,6 +1187,11 @@ class TestOrderedDict(unittest.TestCase):
self.assertEqual(list(od.items()), pairs)
self.assertEqual(list(reversed(od)),
[t[0] for t in reversed(pairs)])
+ self.assertEqual(list(reversed(od.keys())),
+ [t[0] for t in reversed(pairs)])
+ self.assertEqual(list(reversed(od.values())),
+ [t[1] for t in reversed(pairs)])
+ self.assertEqual(list(reversed(od.items())), list(reversed(pairs)))
def test_popitem(self):
pairs = [('c', 1), ('b', 2), ('a', 3), ('d', 4), ('e', 5), ('f', 6)]
diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py
index 7e2485f..7c31ac7 100644
--- a/Lib/test/test_csv.py
+++ b/Lib/test/test_csv.py
@@ -575,6 +575,16 @@ class TestDictFields(unittest.TestCase):
fileobj.readline() # header
self.assertEqual(fileobj.read(), "10,,abc\r\n")
+ def test_write_multiple_dict_rows(self):
+ fileobj = StringIO()
+ writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
+ writer.writeheader()
+ self.assertEqual(fileobj.getvalue(), "f1,f2,f3\r\n")
+ writer.writerows([{"f1": 1, "f2": "abc", "f3": "f"},
+ {"f1": 2, "f2": 5, "f3": "xyz"}])
+ self.assertEqual(fileobj.getvalue(),
+ "f1,f2,f3\r\n1,abc,f\r\n2,5,xyz\r\n")
+
def test_write_no_fields(self):
fileobj = StringIO()
self.assertRaises(TypeError, csv.DictWriter, fileobj)
diff --git a/Lib/test/test_decimal.py b/Lib/test/test_decimal.py
index 4b27907..8072899 100644
--- a/Lib/test/test_decimal.py
+++ b/Lib/test/test_decimal.py
@@ -33,12 +33,13 @@ import unittest
import numbers
import locale
from test.support import (run_unittest, run_doctest, is_resource_enabled,
- requires_IEEE_754)
+ requires_IEEE_754, requires_docstrings)
from test.support import (check_warnings, import_fresh_module, TestFailed,
run_with_locale, cpython_only)
import random
import time
import warnings
+import inspect
try:
import threading
except ImportError:
@@ -4448,18 +4449,6 @@ class PyCoverage(Coverage):
class PyFunctionality(unittest.TestCase):
"""Extra functionality in decimal.py"""
- def test_py_quantize_watchexp(self):
- # watchexp functionality
- Decimal = P.Decimal
- localcontext = P.localcontext
-
- with localcontext() as c:
- c.prec = 1
- c.Emax = 1
- c.Emin = -1
- x = Decimal(99999).quantize(Decimal("1e3"), watchexp=False)
- self.assertEqual(x, Decimal('1.00E+5'))
-
def test_py_alternate_formatting(self):
# triples giving a format, a Decimal, and the expected result
Decimal = P.Decimal
@@ -5402,6 +5391,143 @@ class CWhitebox(unittest.TestCase):
y = Decimal(10**(9*25)).__sizeof__()
self.assertEqual(y, x+4)
+@requires_docstrings
+@unittest.skipUnless(C, "test requires C version")
+class SignatureTest(unittest.TestCase):
+ """Function signatures"""
+
+ def test_inspect_module(self):
+ for attr in dir(P):
+ if attr.startswith('_'):
+ continue
+ p_func = getattr(P, attr)
+ c_func = getattr(C, attr)
+ if (attr == 'Decimal' or attr == 'Context' or
+ inspect.isfunction(p_func)):
+ p_sig = inspect.signature(p_func)
+ c_sig = inspect.signature(c_func)
+
+ # parameter names:
+ c_names = list(c_sig.parameters.keys())
+ p_names = [x for x in p_sig.parameters.keys() if not
+ x.startswith('_')]
+
+ self.assertEqual(c_names, p_names,
+ msg="parameter name mismatch in %s" % p_func)
+
+ c_kind = [x.kind for x in c_sig.parameters.values()]
+ p_kind = [x[1].kind for x in p_sig.parameters.items() if not
+ x[0].startswith('_')]
+
+ # parameters:
+ if attr != 'setcontext':
+ self.assertEqual(c_kind, p_kind,
+ msg="parameter kind mismatch in %s" % p_func)
+
+ def test_inspect_types(self):
+
+ POS = inspect._ParameterKind.POSITIONAL_ONLY
+ POS_KWD = inspect._ParameterKind.POSITIONAL_OR_KEYWORD
+
+ # Type heuristic (type annotations would help!):
+ pdict = {C: {'other': C.Decimal(1),
+ 'third': C.Decimal(1),
+ 'x': C.Decimal(1),
+ 'y': C.Decimal(1),
+ 'z': C.Decimal(1),
+ 'a': C.Decimal(1),
+ 'b': C.Decimal(1),
+ 'c': C.Decimal(1),
+ 'exp': C.Decimal(1),
+ 'modulo': C.Decimal(1),
+ 'num': "1",
+ 'f': 1.0,
+ 'rounding': C.ROUND_HALF_UP,
+ 'context': C.getcontext()},
+ P: {'other': P.Decimal(1),
+ 'third': P.Decimal(1),
+ 'a': P.Decimal(1),
+ 'b': P.Decimal(1),
+ 'c': P.Decimal(1),
+ 'exp': P.Decimal(1),
+ 'modulo': P.Decimal(1),
+ 'num': "1",
+ 'f': 1.0,
+ 'rounding': P.ROUND_HALF_UP,
+ 'context': P.getcontext()}}
+
+ def mkargs(module, sig):
+ args = []
+ kwargs = {}
+ for name, param in sig.parameters.items():
+ if name == 'self': continue
+ if param.kind == POS:
+ args.append(pdict[module][name])
+ elif param.kind == POS_KWD:
+ kwargs[name] = pdict[module][name]
+ else:
+ raise TestFailed("unexpected parameter kind")
+ return args, kwargs
+
+ def tr(s):
+ """The C Context docstrings use 'x' in order to prevent confusion
+ with the article 'a' in the descriptions."""
+ if s == 'x': return 'a'
+ if s == 'y': return 'b'
+ if s == 'z': return 'c'
+ return s
+
+ def doit(ty):
+ p_type = getattr(P, ty)
+ c_type = getattr(C, ty)
+ for attr in dir(p_type):
+ if attr.startswith('_'):
+ continue
+ p_func = getattr(p_type, attr)
+ c_func = getattr(c_type, attr)
+ if inspect.isfunction(p_func):
+ p_sig = inspect.signature(p_func)
+ c_sig = inspect.signature(c_func)
+
+ # parameter names:
+ p_names = list(p_sig.parameters.keys())
+ c_names = [tr(x) for x in c_sig.parameters.keys()]
+
+ self.assertEqual(c_names, p_names,
+ msg="parameter name mismatch in %s" % p_func)
+
+ p_kind = [x.kind for x in p_sig.parameters.values()]
+ c_kind = [x.kind for x in c_sig.parameters.values()]
+
+ # 'self' parameter:
+ self.assertIs(p_kind[0], POS_KWD)
+ self.assertIs(c_kind[0], POS)
+
+ # remaining parameters:
+ if ty == 'Decimal':
+ self.assertEqual(c_kind[1:], p_kind[1:],
+ msg="parameter kind mismatch in %s" % p_func)
+ else: # Context methods are positional only in the C version.
+ self.assertEqual(len(c_kind), len(p_kind),
+ msg="parameter kind mismatch in %s" % p_func)
+
+ # Run the function:
+ args, kwds = mkargs(C, c_sig)
+ try:
+ getattr(c_type(9), attr)(*args, **kwds)
+ except Exception as err:
+ raise TestFailed("invalid signature for %s: %s %s" % (c_func, args, kwds))
+
+ args, kwds = mkargs(P, p_sig)
+ try:
+ getattr(p_type(9), attr)(*args, **kwds)
+ except Exception as err:
+ raise TestFailed("invalid signature for %s: %s %s" % (p_func, args, kwds))
+
+ doit('Decimal')
+ doit('Context')
+
+
all_tests = [
CExplicitConstructionTest, PyExplicitConstructionTest,
CImplicitConstructionTest, PyImplicitConstructionTest,
@@ -5427,6 +5553,7 @@ if not C:
all_tests = all_tests[1::2]
else:
all_tests.insert(0, CheckAttributes)
+ all_tests.insert(1, SignatureTest)
def test_main(arith=False, verbose=None, todo_tests=None, debug=None):
diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py
index 8bb7d6a..e65edb2 100644
--- a/Lib/test/test_descr.py
+++ b/Lib/test/test_descr.py
@@ -4160,6 +4160,7 @@ order (MRO) for bases """
('__add__', 'x + y', 'x += y'),
('__sub__', 'x - y', 'x -= y'),
('__mul__', 'x * y', 'x *= y'),
+ ('__matmul__', 'x @ y', 'x @= y'),
('__truediv__', 'operator.truediv(x, y)', None),
('__floordiv__', 'operator.floordiv(x, y)', None),
('__div__', 'x / y', 'x /= y'),
diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py
index 56193e8..c62e7ca 100644
--- a/Lib/test/test_doctest.py
+++ b/Lib/test/test_doctest.py
@@ -2096,22 +2096,9 @@ def test_DocTestSuite():
>>> suite.run(unittest.TestResult())
<unittest.result.TestResult run=0 errors=0 failures=0>
- However, if DocTestSuite finds no docstrings, it raises an error:
+ The module need not contain any docstrings either:
- >>> try:
- ... doctest.DocTestSuite('test.sample_doctest_no_docstrings')
- ... except ValueError as e:
- ... error = e
-
- >>> print(error.args[1])
- has no docstrings
-
- You can prevent this error by passing a DocTestFinder instance with
- the `exclude_empty` keyword argument set to False:
-
- >>> finder = doctest.DocTestFinder(exclude_empty=False)
- >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings',
- ... test_finder=finder)
+ >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings')
>>> suite.run(unittest.TestResult())
<unittest.result.TestResult run=0 errors=0 failures=0>
@@ -2121,6 +2108,22 @@ def test_DocTestSuite():
>>> suite.run(unittest.TestResult())
<unittest.result.TestResult run=9 errors=0 failures=4>
+ We can also provide a DocTestFinder:
+
+ >>> finder = doctest.DocTestFinder()
+ >>> suite = doctest.DocTestSuite('test.sample_doctest',
+ ... test_finder=finder)
+ >>> suite.run(unittest.TestResult())
+ <unittest.result.TestResult run=9 errors=0 failures=4>
+
+ The DocTestFinder need not return any tests:
+
+ >>> finder = doctest.DocTestFinder()
+ >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings',
+ ... test_finder=finder)
+ >>> suite.run(unittest.TestResult())
+ <unittest.result.TestResult run=0 errors=0 failures=0>
+
We can supply global variables. If we pass globs, they will be
used instead of the module globals. Here we'll pass an empty
globals, triggering an extra error:
@@ -2168,7 +2171,7 @@ def test_DocTestSuite():
>>> test.test_doctest.sillySetup
Traceback (most recent call last):
...
- AttributeError: 'module' object has no attribute 'sillySetup'
+ AttributeError: module 'test.test_doctest' has no attribute 'sillySetup'
The setUp and tearDown funtions are passed test objects. Here
we'll use the setUp function to supply the missing variable y:
@@ -2314,7 +2317,7 @@ def test_DocFileSuite():
>>> test.test_doctest.sillySetup
Traceback (most recent call last):
...
- AttributeError: 'module' object has no attribute 'sillySetup'
+ AttributeError: module 'test.test_doctest' has no attribute 'sillySetup'
The setUp and tearDown funtions are passed test objects.
Here, we'll use a setUp function to set the favorite color in
@@ -2897,7 +2900,7 @@ Invalid doctest option:
def test_main():
# Check the doctest cases in doctest itself:
- support.run_doctest(doctest, verbosity=True)
+ ret = support.run_doctest(doctest, verbosity=True)
# Check the doctest cases defined here:
from test import test_doctest
support.run_doctest(test_doctest, verbosity=True)
diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py
index cb6366c..eb97516 100644
--- a/Lib/test/test_docxmlrpc.py
+++ b/Lib/test/test_docxmlrpc.py
@@ -87,10 +87,11 @@ class DocXMLRPCHTTPGETServer(unittest.TestCase):
threading.Thread(target=server, args=(self.evt, 1)).start()
# wait for port to be assigned
- n = 1000
- while n > 0 and PORT is None:
- time.sleep(0.001)
- n -= 1
+ deadline = time.monotonic() + 10.0
+ while PORT is None:
+ time.sleep(0.010)
+ if time.monotonic() > deadline:
+ break
self.client = http.client.HTTPConnection("localhost:%d" % PORT)
diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py
index e0626df..8bcbd46 100644
--- a/Lib/test/test_fork1.py
+++ b/Lib/test/test_fork1.py
@@ -18,13 +18,14 @@ get_attribute(os, 'fork')
class ForkTest(ForkWait):
def wait_impl(self, cpid):
- for i in range(10):
+ deadline = time.monotonic() + 10.0
+ while time.monotonic() <= deadline:
# waitpid() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status = os.waitpid(cpid, os.WNOHANG)
if spid == cpid:
break
- time.sleep(1.0)
+ time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py
index fc71e48..631bf35 100644
--- a/Lib/test/test_format.py
+++ b/Lib/test/test_format.py
@@ -142,8 +142,6 @@ class FormatTest(unittest.TestCase):
testformat("%#+027.23X", big, "+0X0001234567890ABCDEF12345")
# same, except no 0 flag
testformat("%#+27.23X", big, " +0X001234567890ABCDEF12345")
- with self.assertWarns(DeprecationWarning):
- testformat("%x", float(big), "123456_______________", 6)
big = 0o12345670123456701234567012345670 # 32 octal digits
testformat("%o", big, "12345670123456701234567012345670")
testformat("%o", -big, "-12345670123456701234567012345670")
@@ -183,8 +181,6 @@ class FormatTest(unittest.TestCase):
testformat("%034.33o", big, "0012345670123456701234567012345670")
# base marker shouldn't change that
testformat("%0#34.33o", big, "0o012345670123456701234567012345670")
- with self.assertWarns(DeprecationWarning):
- testformat("%o", float(big), "123456__________________________", 6)
# Some small ints, in both Python int and flavors).
testformat("%d", 42, "42")
testformat("%d", -42, "-42")
@@ -195,8 +191,6 @@ class FormatTest(unittest.TestCase):
testformat("%#x", 1, "0x1")
testformat("%#X", 1, "0X1")
testformat("%#X", 1, "0X1")
- with self.assertWarns(DeprecationWarning):
- testformat("%#x", 1.0, "0x1")
testformat("%#o", 1, "0o1")
testformat("%#o", 1, "0o1")
testformat("%#o", 0, "0o0")
@@ -213,14 +207,10 @@ class FormatTest(unittest.TestCase):
testformat("%x", -0x42, "-42")
testformat("%x", 0x42, "42")
testformat("%x", -0x42, "-42")
- with self.assertWarns(DeprecationWarning):
- testformat("%x", float(0x42), "42")
testformat("%o", 0o42, "42")
testformat("%o", -0o42, "-42")
testformat("%o", 0o42, "42")
testformat("%o", -0o42, "-42")
- with self.assertWarns(DeprecationWarning):
- testformat("%o", float(0o42), "42")
testformat("%r", "\u0378", "'\\u0378'") # non printable
testformat("%a", "\u0378", "'\\u0378'") # non printable
testformat("%r", "\u0374", "'\u0374'") # printable
diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py
index 3336532..e86d5ce 100644
--- a/Lib/test/test_fractions.py
+++ b/Lib/test/test_fractions.py
@@ -330,7 +330,6 @@ class FractionTest(unittest.TestCase):
self.assertTypedEquals(F(-2, 10), round(F(-15, 100), 1))
self.assertTypedEquals(F(-2, 10), round(F(-25, 100), 1))
-
def testArithmetic(self):
self.assertEqual(F(1, 2), F(1, 10) + F(2, 5))
self.assertEqual(F(-3, 10), F(1, 10) - F(2, 5))
@@ -402,6 +401,8 @@ class FractionTest(unittest.TestCase):
self.assertTypedEquals(2.0 , 4 ** F(1, 2))
self.assertTypedEquals(0.25, 2.0 ** F(-2, 1))
self.assertTypedEquals(1.0 + 0j, (1.0 + 0j) ** F(1, 10))
+ self.assertRaises(ZeroDivisionError, operator.pow,
+ F(0, 1), -2)
def testMixingWithDecimal(self):
# Decimal refuses mixed arithmetic (but not mixed comparisons)
diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py
index bba8820..a7bad2d 100644
--- a/Lib/test/test_grammar.py
+++ b/Lib/test/test_grammar.py
@@ -985,6 +985,20 @@ class GrammarTests(unittest.TestCase):
self.assertFalse((False is 2) is 3)
self.assertFalse(False is 2 is 3)
+ def test_matrix_mul(self):
+ # This is not intended to be a comprehensive test, rather just to be few
+ # samples of the @ operator in test_grammar.py.
+ class M:
+ def __matmul__(self, o):
+ return 4
+ def __imatmul__(self, o):
+ self.other = o
+ return self
+ m = M()
+ self.assertEqual(m @ m, 4)
+ m @= 42
+ self.assertEqual(m.other, 42)
+
def test_main():
run_unittest(TokenTests, GrammarTests)
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index 22f7329..1a6d8d0 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Root cert file (CA) for svn.python.org's cert
CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem')
+# constants for testing chunked encoding
+chunked_start = (
+ 'HTTP/1.1 200 OK\r\n'
+ 'Transfer-Encoding: chunked\r\n\r\n'
+ 'a\r\n'
+ 'hello worl\r\n'
+ '3\r\n'
+ 'd! \r\n'
+ '8\r\n'
+ 'and now \r\n'
+ '22\r\n'
+ 'for something completely different\r\n'
+)
+chunked_expected = b'hello world! and now for something completely different'
+chunk_extension = ";foo=bar"
+last_chunk = "0\r\n"
+last_chunk_extended = "0" + chunk_extension + "\r\n"
+trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
+chunked_end = "\r\n"
+
HOST = support.HOST
class FakeSocket:
@@ -38,7 +58,10 @@ class FakeSocket:
def makefile(self, mode, bufsize=None):
if mode != 'r' and mode != 'rb':
raise client.UnimplementedFileMode()
- return self.fileclass(self.text)
+ # keep the file around so we can check how much was read from it
+ self.file = self.fileclass(self.text)
+ self.file.close = lambda:None #nerf close ()
+ return self.file
def close(self):
pass
@@ -435,20 +458,8 @@ class BasicTest(TestCase):
conn.request('POST', 'test', conn)
def test_chunked(self):
- chunked_start = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello worl\r\n'
- '3\r\n'
- 'd! \r\n'
- '8\r\n'
- 'and now \r\n'
- '22\r\n'
- 'for something completely different\r\n'
- )
- expected = b'hello world! and now for something completely different'
- sock = FakeSocket(chunked_start + '0\r\n')
+ expected = chunked_expected
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="GET")
resp.begin()
self.assertEqual(resp.read(), expected)
@@ -456,7 +467,7 @@ class BasicTest(TestCase):
# Various read sizes
for n in range(1, 12):
- sock = FakeSocket(chunked_start + '0\r\n')
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="GET")
resp.begin()
self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
@@ -479,23 +490,12 @@ class BasicTest(TestCase):
resp.close()
def test_readinto_chunked(self):
- chunked_start = (
- 'HTTP/1.1 200 OK\r\n'
- 'Transfer-Encoding: chunked\r\n\r\n'
- 'a\r\n'
- 'hello worl\r\n'
- '3\r\n'
- 'd! \r\n'
- '8\r\n'
- 'and now \r\n'
- '22\r\n'
- 'for something completely different\r\n'
- )
- expected = b'hello world! and now for something completely different'
+
+ expected = chunked_expected
nexpected = len(expected)
b = bytearray(128)
- sock = FakeSocket(chunked_start + '0\r\n')
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="GET")
resp.begin()
n = resp.readinto(b)
@@ -505,7 +505,7 @@ class BasicTest(TestCase):
# Various read sizes
for n in range(1, 12):
- sock = FakeSocket(chunked_start + '0\r\n')
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="GET")
resp.begin()
m = memoryview(b)
@@ -541,7 +541,7 @@ class BasicTest(TestCase):
'1\r\n'
'd\r\n'
)
- sock = FakeSocket(chunked_start + '0\r\n')
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="HEAD")
resp.begin()
self.assertEqual(resp.read(), b'')
@@ -561,7 +561,7 @@ class BasicTest(TestCase):
'1\r\n'
'd\r\n'
)
- sock = FakeSocket(chunked_start + '0\r\n')
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
resp = client.HTTPResponse(sock, method="HEAD")
resp.begin()
b = bytearray(5)
@@ -636,6 +636,7 @@ class BasicTest(TestCase):
+ '0' * 65536 + 'a\r\n'
'hello world\r\n'
'0\r\n'
+ '\r\n'
)
resp = client.HTTPResponse(FakeSocket(body))
resp.begin()
@@ -675,6 +676,239 @@ class BasicTest(TestCase):
conn.request('POST', '/', body)
self.assertGreater(sock.sendall_calls, 1)
+ def test_chunked_extension(self):
+ extra = '3;foo=bar\r\n' + 'abc\r\n'
+ expected = chunked_expected + b'abc'
+
+ sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ resp.close()
+
+ def test_chunked_missing_end(self):
+ """some servers may serve up a short chunked encoding stream"""
+ expected = chunked_expected
+ sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ resp.close()
+
+ def test_chunked_trailers(self):
+ """See that trailers are read and ignored"""
+ expected = chunked_expected
+ sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # we should have reached the end of the file
+ self.assertEqual(sock.file.read(100), b"") #we read to the end
+ resp.close()
+
+ def test_chunked_sync(self):
+ """Check that we don't read past the end of the chunked-encoding stream"""
+ expected = chunked_expected
+ extradata = "extradata"
+ sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # the file should now have our extradata ready to be read
+ self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end
+ resp.close()
+
+ def test_content_length_sync(self):
+ """Check that we don't read past the end of the Content-Length stream"""
+ extradata = "extradata"
+ expected = b"Hello123\r\n"
+ sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertEqual(resp.read(), expected)
+ # the file should now have our extradata ready to be read
+ self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end
+ resp.close()
+
+class ExtendedReadTest(TestCase):
+ """
+ Test peek(), read1(), readline()
+ """
+ lines = (
+ 'HTTP/1.1 200 OK\r\n'
+ '\r\n'
+ 'hello world!\n'
+ 'and now \n'
+ 'for something completely different\n'
+ 'foo'
+ )
+ lines_expected = lines[lines.find('hello'):].encode("ascii")
+ lines_chunked = (
+ 'HTTP/1.1 200 OK\r\n'
+ 'Transfer-Encoding: chunked\r\n\r\n'
+ 'a\r\n'
+ 'hello worl\r\n'
+ '3\r\n'
+ 'd!\n\r\n'
+ '9\r\n'
+ 'and now \n\r\n'
+ '23\r\n'
+ 'for something completely different\n\r\n'
+ '3\r\n'
+ 'foo\r\n'
+ '0\r\n' # terminating chunk
+ '\r\n' # end of trailers
+ )
+
+ def setUp(self):
+ sock = FakeSocket(self.lines)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ resp.fp = io.BufferedReader(resp.fp)
+ self.resp = resp
+
+
+
+ def test_peek(self):
+ resp = self.resp
+ # patch up the buffered peek so that it returns not too much stuff
+ oldpeek = resp.fp.peek
+ def mypeek(n=-1):
+ p = oldpeek(n)
+ if n >= 0:
+ return p[:n]
+ return p[:10]
+ resp.fp.peek = mypeek
+
+ all = []
+ while True:
+ # try a short peek
+ p = resp.peek(3)
+ if p:
+ self.assertGreater(len(p), 0)
+ # then unbounded peek
+ p2 = resp.peek()
+ self.assertGreaterEqual(len(p2), len(p))
+ self.assertTrue(p2.startswith(p))
+ next = resp.read(len(p2))
+ self.assertEqual(next, p2)
+ else:
+ next = resp.read()
+ self.assertFalse(next)
+ all.append(next)
+ if not next:
+ break
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_readline(self):
+ resp = self.resp
+ self._verify_readline(self.resp.readline, self.lines_expected)
+
+ def _verify_readline(self, readline, expected):
+ all = []
+ while True:
+ # short readlines
+ line = readline(5)
+ if line and line != b"foo":
+ if len(line) < 5:
+ self.assertTrue(line.endswith(b"\n"))
+ all.append(line)
+ if not line:
+ break
+ self.assertEqual(b"".join(all), expected)
+
+ def test_read1(self):
+ resp = self.resp
+ def r():
+ res = resp.read1(4)
+ self.assertLessEqual(len(res), 4)
+ return res
+ readliner = Readliner(r)
+ self._verify_readline(readliner.readline, self.lines_expected)
+
+ def test_read1_unbounded(self):
+ resp = self.resp
+ all = []
+ while True:
+ data = resp.read1()
+ if not data:
+ break
+ all.append(data)
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_read1_bounded(self):
+ resp = self.resp
+ all = []
+ while True:
+ data = resp.read1(10)
+ if not data:
+ break
+ self.assertLessEqual(len(data), 10)
+ all.append(data)
+ self.assertEqual(b"".join(all), self.lines_expected)
+
+ def test_read1_0(self):
+ self.assertEqual(self.resp.read1(0), b"")
+
+ def test_peek_0(self):
+ p = self.resp.peek(0)
+ self.assertLessEqual(0, len(p))
+
+class ExtendedReadTestChunked(ExtendedReadTest):
+ """
+ Test peek(), read1(), readline() in chunked mode
+ """
+ lines = (
+ 'HTTP/1.1 200 OK\r\n'
+ 'Transfer-Encoding: chunked\r\n\r\n'
+ 'a\r\n'
+ 'hello worl\r\n'
+ '3\r\n'
+ 'd!\n\r\n'
+ '9\r\n'
+ 'and now \n\r\n'
+ '23\r\n'
+ 'for something completely different\n\r\n'
+ '3\r\n'
+ 'foo\r\n'
+ '0\r\n' # terminating chunk
+ '\r\n' # end of trailers
+ )
+
+
+class Readliner:
+ """
+ a simple readline class that uses an arbitrary read function and buffering
+ """
+ def __init__(self, readfunc):
+ self.readfunc = readfunc
+ self.remainder = b""
+
+ def readline(self, limit):
+ data = []
+ datalen = 0
+ read = self.remainder
+ try:
+ while True:
+ idx = read.find(b'\n')
+ if idx != -1:
+ break
+ if datalen + len(read) >= limit:
+ idx = limit - datalen - 1
+ # read more data
+ data.append(read)
+ read = self.readfunc()
+ if not read:
+ idx = 0 #eof condition
+ break
+ idx += 1
+ data.append(read[:idx])
+ self.remainder = read[idx:]
+ return b"".join(data)
+ except:
+ self.remainder = b"".join(data)
+ raise
+
class OfflineTest(TestCase):
def test_responses(self):
self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
@@ -1019,7 +1253,8 @@ class TunnelTests(TestCase):
def test_main(verbose=None):
support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
HTTPSTest, RequestBodyTest, SourceAddressTest,
- HTTPResponseTest, TunnelTests)
+ HTTPResponseTest, ExtendedReadTest,
+ ExtendedReadTestChunked, TunnelTests)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_importlib/test_lazy.py b/Lib/test/test_importlib/test_lazy.py
new file mode 100644
index 0000000..2e191bb
--- /dev/null
+++ b/Lib/test/test_importlib/test_lazy.py
@@ -0,0 +1,132 @@
+import importlib
+from importlib import abc
+from importlib import util
+import unittest
+
+from . import util as test_util
+
+
+class CollectInit:
+
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+ def exec_module(self, module):
+ return self
+
+
+class LazyLoaderFactoryTests(unittest.TestCase):
+
+ def test_init(self):
+ factory = util.LazyLoader.factory(CollectInit)
+ # E.g. what importlib.machinery.FileFinder instantiates loaders with
+ # plus keyword arguments.
+ lazy_loader = factory('module name', 'module path', kw='kw')
+ loader = lazy_loader.loader
+ self.assertEqual(('module name', 'module path'), loader.args)
+ self.assertEqual({'kw': 'kw'}, loader.kwargs)
+
+ def test_validation(self):
+ # No exec_module(), no lazy loading.
+ with self.assertRaises(TypeError):
+ util.LazyLoader.factory(object)
+
+
+class TestingImporter(abc.MetaPathFinder, abc.Loader):
+
+ module_name = 'lazy_loader_test'
+ mutated_name = 'changed'
+ loaded = None
+ source_code = 'attr = 42; __name__ = {!r}'.format(mutated_name)
+
+ def find_spec(self, name, path, target=None):
+ if name != self.module_name:
+ return None
+ return util.spec_from_loader(name, util.LazyLoader(self))
+
+ def exec_module(self, module):
+ exec(self.source_code, module.__dict__)
+ self.loaded = module
+
+
+class LazyLoaderTests(unittest.TestCase):
+
+ def test_init(self):
+ with self.assertRaises(TypeError):
+ util.LazyLoader(object)
+
+ def new_module(self, source_code=None):
+ loader = TestingImporter()
+ if source_code is not None:
+ loader.source_code = source_code
+ spec = util.spec_from_loader(TestingImporter.module_name,
+ util.LazyLoader(loader))
+ module = spec.loader.create_module(spec)
+ module.__spec__ = spec
+ module.__loader__ = spec.loader
+ spec.loader.exec_module(module)
+ # Module is now lazy.
+ self.assertIsNone(loader.loaded)
+ return module
+
+ def test_e2e(self):
+ # End-to-end test to verify the load is in fact lazy.
+ importer = TestingImporter()
+ assert importer.loaded is None
+ with test_util.uncache(importer.module_name):
+ with test_util.import_state(meta_path=[importer]):
+ module = importlib.import_module(importer.module_name)
+ self.assertIsNone(importer.loaded)
+ # Trigger load.
+ self.assertEqual(module.__loader__, importer)
+ self.assertIsNotNone(importer.loaded)
+ self.assertEqual(module, importer.loaded)
+
+ def test_attr_unchanged(self):
+ # An attribute only mutated as a side-effect of import should not be
+ # changed needlessly.
+ module = self.new_module()
+ self.assertEqual(TestingImporter.mutated_name, module.__name__)
+
+ def test_new_attr(self):
+ # A new attribute should persist.
+ module = self.new_module()
+ module.new_attr = 42
+ self.assertEqual(42, module.new_attr)
+
+ def test_mutated_preexisting_attr(self):
+ # Changing an attribute that already existed on the module --
+ # e.g. __name__ -- should persist.
+ module = self.new_module()
+ module.__name__ = 'bogus'
+ self.assertEqual('bogus', module.__name__)
+
+ def test_mutated_attr(self):
+ # Changing an attribute that comes into existence after an import
+ # should persist.
+ module = self.new_module()
+ module.attr = 6
+ self.assertEqual(6, module.attr)
+
+ def test_delete_eventual_attr(self):
+ # Deleting an attribute should stay deleted.
+ module = self.new_module()
+ del module.attr
+ self.assertFalse(hasattr(module, 'attr'))
+
+ def test_delete_preexisting_attr(self):
+ module = self.new_module()
+ del module.__name__
+ self.assertFalse(hasattr(module, '__name__'))
+
+ def test_module_substitution_error(self):
+ source_code = 'import sys; sys.modules[__name__] = 42'
+ module = self.new_module(source_code)
+ with test_util.uncache(TestingImporter.module_name):
+ with self.assertRaises(ValueError):
+ module.__name__
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py
index 1ede3b5..7ad190b 100644
--- a/Lib/test/test_inspect.py
+++ b/Lib/test/test_inspect.py
@@ -8,6 +8,7 @@ import linecache
import os
from os.path import normcase
import _pickle
+import pickle
import re
import shutil
import sys
@@ -73,6 +74,7 @@ def generator_function_example(self):
for i in range(2):
yield i
+
class TestPredicates(IsTestBase):
def test_sixteen(self):
count = len([x for x in dir(inspect) if x.startswith('is')])
@@ -1611,6 +1613,17 @@ class TestGetGeneratorState(unittest.TestCase):
self.assertRaises(TypeError, inspect.getgeneratorlocals, (2,3))
+class MySignature(inspect.Signature):
+ # Top-level to make it picklable;
+ # used in test_signature_object_pickle
+ pass
+
+class MyParameter(inspect.Parameter):
+ # Top-level to make it picklable;
+ # used in test_signature_object_pickle
+ pass
+
+
class TestSignatureObject(unittest.TestCase):
@staticmethod
def signature(func):
@@ -1668,6 +1681,37 @@ class TestSignatureObject(unittest.TestCase):
with self.assertRaisesRegex(ValueError, 'follows default argument'):
S((pkd, pk))
+ self.assertTrue(repr(sig).startswith('<Signature'))
+ self.assertTrue('"(po, pk' in repr(sig))
+
+ def test_signature_object_pickle(self):
+ def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass
+ foo_partial = functools.partial(foo, a=1)
+
+ sig = inspect.signature(foo_partial)
+
+ for ver in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(pickle_ver=ver, subclass=False):
+ sig_pickled = pickle.loads(pickle.dumps(sig, ver))
+ self.assertEqual(sig, sig_pickled)
+
+ # Test that basic sub-classing works
+ sig = inspect.signature(foo)
+ myparam = MyParameter(name='z', kind=inspect.Parameter.POSITIONAL_ONLY)
+ myparams = collections.OrderedDict(sig.parameters, a=myparam)
+ mysig = MySignature().replace(parameters=myparams.values(),
+ return_annotation=sig.return_annotation)
+ self.assertTrue(isinstance(mysig, MySignature))
+ self.assertTrue(isinstance(mysig.parameters['z'], MyParameter))
+
+ for ver in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(pickle_ver=ver, subclass=True):
+ sig_pickled = pickle.loads(pickle.dumps(mysig, ver))
+ self.assertEqual(mysig, sig_pickled)
+ self.assertTrue(isinstance(sig_pickled, MySignature))
+ self.assertTrue(isinstance(sig_pickled.parameters['z'],
+ MyParameter))
+
def test_signature_immutability(self):
def test(a):
pass
@@ -2469,11 +2513,29 @@ class TestSignatureObject(unittest.TestCase):
def bar(pos, *args, c, b, a=42, **kwargs:int): pass
self.assertEqual(inspect.signature(foo), inspect.signature(bar))
- def test_signature_unhashable(self):
+ def test_signature_hashable(self):
+ S = inspect.Signature
+ P = inspect.Parameter
+
def foo(a): pass
- sig = inspect.signature(foo)
+ foo_sig = inspect.signature(foo)
+
+ manual_sig = S(parameters=[P('a', P.POSITIONAL_OR_KEYWORD)])
+
+ self.assertEqual(hash(foo_sig), hash(manual_sig))
+ self.assertNotEqual(hash(foo_sig),
+ hash(manual_sig.replace(return_annotation='spam')))
+
+ def bar(a) -> 1: pass
+ self.assertNotEqual(hash(foo_sig), hash(inspect.signature(bar)))
+
+ def foo(a={}): pass
+ with self.assertRaisesRegex(TypeError, 'unhashable type'):
+ hash(inspect.signature(foo))
+
+ def foo(a) -> {}: pass
with self.assertRaisesRegex(TypeError, 'unhashable type'):
- hash(sig)
+ hash(inspect.signature(foo))
def test_signature_str(self):
def foo(a:int=1, *, b, c=None, **kwargs) -> 42:
@@ -2547,6 +2609,19 @@ class TestSignatureObject(unittest.TestCase):
self.assertEqual(self.signature(Spam.foo),
self.signature(Ham.foo))
+ def test_signature_from_callable_python_obj(self):
+ class MySignature(inspect.Signature): pass
+ def foo(a, *, b:1): pass
+ foo_sig = MySignature.from_callable(foo)
+ self.assertTrue(isinstance(foo_sig, MySignature))
+
+ @unittest.skipIf(MISSING_C_DOCSTRINGS,
+ "Signature information for builtins requires docstrings")
+ def test_signature_from_callable_builtin_obj(self):
+ class MySignature(inspect.Signature): pass
+ sig = MySignature.from_callable(_pickle.Pickler)
+ self.assertTrue(isinstance(sig, MySignature))
+
class TestParameterObject(unittest.TestCase):
def test_signature_parameter_kinds(self):
@@ -2592,6 +2667,16 @@ class TestParameterObject(unittest.TestCase):
p.replace(kind=inspect.Parameter.VAR_POSITIONAL)
self.assertTrue(repr(p).startswith('<Parameter'))
+ self.assertTrue('"a=42"' in repr(p))
+
+ def test_signature_parameter_hashable(self):
+ P = inspect.Parameter
+ foo = P('foo', kind=P.POSITIONAL_ONLY)
+ self.assertEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY)))
+ self.assertNotEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY,
+ default=42)))
+ self.assertNotEqual(hash(foo),
+ hash(foo.replace(kind=P.VAR_POSITIONAL)))
def test_signature_parameter_equality(self):
P = inspect.Parameter
@@ -2603,13 +2688,6 @@ class TestParameterObject(unittest.TestCase):
self.assertEqual(p, P('foo', default=42,
kind=inspect.Parameter.KEYWORD_ONLY))
- def test_signature_parameter_unhashable(self):
- p = inspect.Parameter('foo', default=42,
- kind=inspect.Parameter.KEYWORD_ONLY)
-
- with self.assertRaisesRegex(TypeError, 'unhashable type'):
- hash(p)
-
def test_signature_parameter_replace(self):
p = inspect.Parameter('foo', default=42,
kind=inspect.Parameter.KEYWORD_ONLY)
@@ -2918,6 +2996,16 @@ class TestBoundArguments(unittest.TestCase):
ba4 = inspect.signature(bar).bind(1)
self.assertNotEqual(ba, ba4)
+ def test_signature_bound_arguments_pickle(self):
+ def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass
+ sig = inspect.signature(foo)
+ ba = sig.bind(20, 30, z={})
+
+ for ver in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(pickle_ver=ver):
+ ba_pickled = pickle.loads(pickle.dumps(ba, ver))
+ self.assertEqual(ba, ba_pickled)
+
class TestSignaturePrivateHelpers(unittest.TestCase):
def test_signature_get_bound_param(self):
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 267537f..8e702db 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -2681,6 +2681,34 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertFalse(err)
self.assertEqual("ok", out.decode().strip())
+ def test_read_byteslike(self):
+ r = MemviewBytesIO(b'Just some random string\n')
+ t = self.TextIOWrapper(r, 'utf-8')
+
+ # TextIOwrapper will not read the full string, because
+ # we truncate it to a multiple of the native int size
+ # so that we can construct a more complex memoryview.
+ bytes_val = _to_memoryview(r.getvalue()).tobytes()
+
+ self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
+class MemviewBytesIO(io.BytesIO):
+ '''A BytesIO object whose read method returns memoryviews
+ rather than bytes'''
+
+ def read1(self, len_):
+ return _to_memoryview(super().read1(len_))
+
+ def read(self, len_):
+ return _to_memoryview(super().read(len_))
+
+def _to_memoryview(buf):
+ '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+ arr = array.array('i')
+ idx = len(buf) - len(buf) % arr.itemsize
+ arr.frombytes(buf[:idx])
+ return memoryview(arr)
class CTextIOWrapperTest(TextIOWrapperTest):
io = io
diff --git a/Lib/test/test_ipaddress.py b/Lib/test/test_ipaddress.py
index f2947b9..33adb3b 100644
--- a/Lib/test/test_ipaddress.py
+++ b/Lib/test/test_ipaddress.py
@@ -1593,6 +1593,14 @@ class IpaddrUnitTest(unittest.TestCase):
addr3.exploded)
self.assertEqual('192.168.178.1', addr4.exploded)
+ def testReversePointer(self):
+ addr1 = ipaddress.IPv4Address('127.0.0.1')
+ addr2 = ipaddress.IPv6Address('2001:db8::1')
+ self.assertEqual('1.0.0.127.in-addr.arpa', addr1.reverse_pointer)
+ self.assertEqual('1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.' +
+ 'b.d.0.1.0.0.2.ip6.arpa',
+ addr2.reverse_pointer)
+
def testIntRepresentation(self):
self.assertEqual(16909060, int(self.ipv4_address))
self.assertEqual(42540616829182469433547762482097946625,
diff --git a/Lib/test/test_json/test_tool.py b/Lib/test/test_json/test_tool.py
index 0c39e56..5484a8a 100644
--- a/Lib/test/test_json/test_tool.py
+++ b/Lib/test/test_json/test_tool.py
@@ -55,6 +55,7 @@ class TestTool(unittest.TestCase):
def test_infile_stdout(self):
infile = self._create_infile()
rc, out, err = assert_python_ok('-m', 'json.tool', infile)
+ self.assertEqual(rc, 0)
self.assertEqual(out.splitlines(), self.expect.encode().splitlines())
self.assertEqual(err, b'')
@@ -65,5 +66,12 @@ class TestTool(unittest.TestCase):
self.addCleanup(os.remove, outfile)
with open(outfile, "r") as fp:
self.assertEqual(fp.read(), self.expect)
+ self.assertEqual(rc, 0)
self.assertEqual(out, b'')
self.assertEqual(err, b'')
+
+ def test_help_flag(self):
+ rc, out, err = assert_python_ok('-m', 'json.tool', '-h')
+ self.assertEqual(rc, 0)
+ self.assertTrue(out.startswith(b'usage: '))
+ self.assertEqual(err, b'')
diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py
index 48f84ba..c9f3f16 100644
--- a/Lib/test/test_math.py
+++ b/Lib/test/test_math.py
@@ -422,9 +422,17 @@ class MathTests(unittest.TestCase):
self.assertEqual(math.factorial(i), py_factorial(i))
self.assertRaises(ValueError, math.factorial, -1)
self.assertRaises(ValueError, math.factorial, -1.0)
+ self.assertRaises(ValueError, math.factorial, -10**100)
+ self.assertRaises(ValueError, math.factorial, -1e100)
self.assertRaises(ValueError, math.factorial, math.pi)
- self.assertRaises(OverflowError, math.factorial, sys.maxsize+1)
- self.assertRaises(OverflowError, math.factorial, 10e100)
+
+ # Other implementations may place different upper bounds.
+ @support.cpython_only
+ def testFactorialHugeInputs(self):
+ # Currently raises ValueError for inputs that are too large
+ # to fit into a C long.
+ self.assertRaises(OverflowError, math.factorial, 10**100)
+ self.assertRaises(OverflowError, math.factorial, 1e100)
def testFloor(self):
self.assertRaises(TypeError, math.floor)
diff --git a/Lib/test/test_module.py b/Lib/test/test_module.py
index 1230293..9da3536 100644
--- a/Lib/test/test_module.py
+++ b/Lib/test/test_module.py
@@ -30,6 +30,22 @@ class ModuleTests(unittest.TestCase):
pass
self.assertEqual(foo.__doc__, ModuleType.__doc__)
+ def test_unintialized_missing_getattr(self):
+ # Issue 8297
+ # test the text in the AttributeError of an uninitialized module
+ foo = ModuleType.__new__(ModuleType)
+ self.assertRaisesRegex(
+ AttributeError, "module has no attribute 'not_here'",
+ getattr, foo, "not_here")
+
+ def test_missing_getattr(self):
+ # Issue 8297
+ # test the text in the AttributeError
+ foo = ModuleType("foo")
+ self.assertRaisesRegex(
+ AttributeError, "module 'foo' has no attribute 'not_here'",
+ getattr, foo, "not_here")
+
def test_no_docstring(self):
# Regularly initialized module, no docstring
foo = ModuleType("foo")
@@ -211,6 +227,14 @@ a = A(destroyed)"""
b"len = len",
b"shutil.rmtree = rmtree"})
+ def test_descriptor_errors_propogate(self):
+ class Descr:
+ def __get__(self, o, t):
+ raise RuntimeError
+ class M(ModuleType):
+ melon = Descr()
+ self.assertRaises(RuntimeError, getattr, M("mymod"), "melon")
+
# frozen and namespace module reprs are tested in importlib.
diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py
index ab58a98..1bd0391 100644
--- a/Lib/test/test_operator.py
+++ b/Lib/test/test_operator.py
@@ -203,6 +203,15 @@ class OperatorTestCase:
self.assertRaises(TypeError, operator.mul, None, None)
self.assertTrue(operator.mul(5, 2) == 10)
+ def test_matmul(self):
+ operator = self.module
+ self.assertRaises(TypeError, operator.matmul)
+ self.assertRaises(TypeError, operator.matmul, 42, 42)
+ class M:
+ def __matmul__(self, other):
+ return other - 1
+ self.assertEqual(M() @ 42, 41)
+
def test_neg(self):
operator = self.module
self.assertRaises(TypeError, operator.neg)
@@ -416,6 +425,7 @@ class OperatorTestCase:
def __ilshift__ (self, other): return "ilshift"
def __imod__ (self, other): return "imod"
def __imul__ (self, other): return "imul"
+ def __imatmul__ (self, other): return "imatmul"
def __ior__ (self, other): return "ior"
def __ipow__ (self, other): return "ipow"
def __irshift__ (self, other): return "irshift"
@@ -430,6 +440,7 @@ class OperatorTestCase:
self.assertEqual(operator.ilshift (c, 5), "ilshift")
self.assertEqual(operator.imod (c, 5), "imod")
self.assertEqual(operator.imul (c, 5), "imul")
+ self.assertEqual(operator.imatmul (c, 5), "imatmul")
self.assertEqual(operator.ior (c, 5), "ior")
self.assertEqual(operator.ipow (c, 5), "ipow")
self.assertEqual(operator.irshift (c, 5), "irshift")
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index d076fc1..d695a0d 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -349,23 +349,18 @@ class TestPOP3Class(TestCase):
if SUPPORTS_SSL:
+ from test.test_ftplib import SSLConnection
- class DummyPOP3_SSLHandler(DummyPOP3Handler):
+ class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
def __init__(self, conn):
asynchat.async_chat.__init__(self, conn)
- ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
- server_side=True,
- do_handshake_on_connect=False)
- self.del_channel()
- self.set_socket(ssl_socket)
- # Must try handshake before calling push()
- self.tls_active = True
- self.tls_starting = True
- self._do_tls_handshake()
+ self.secure_connection()
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready. <timestamp>')
+ self.tls_active = True
+ self.tls_starting = False
@requires_ssl
diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py
index 34edd76..8f83c90 100644
--- a/Lib/test/test_selectors.py
+++ b/Lib/test/test_selectors.py
@@ -441,10 +441,18 @@ class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
SELECTOR = getattr(selectors, 'KqueueSelector', None)
+@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
+ "Test needs selectors.DevpollSelector")
+class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
+
+ SELECTOR = getattr(selectors, 'DevpollSelector', None)
+
+
+
def test_main():
tests = [DefaultSelectorTestCase, SelectSelectorTestCase,
PollSelectorTestCase, EpollSelectorTestCase,
- KqueueSelectorTestCase]
+ KqueueSelectorTestCase, DevpollSelectorTestCase]
support.run_unittest(*tests)
support.reap_children()
diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py
index bfef621..992a4ce 100644
--- a/Lib/test/test_set.py
+++ b/Lib/test/test_set.py
@@ -929,7 +929,7 @@ class TestBasicOpsString(TestBasicOps, unittest.TestCase):
class TestBasicOpsBytes(TestBasicOps, unittest.TestCase):
def setUp(self):
- self.case = "string set"
+ self.case = "bytes set"
self.values = [b"a", b"b", b"c"]
self.set = set(self.values)
self.dup = set(self.values)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index a6f2c64..50cae07 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,6 +1,7 @@
import unittest
from test import support
from contextlib import closing
+import enum
import gc
import pickle
import select
@@ -39,6 +40,23 @@ def ignoring_eintr(__func, *args, **kwargs):
return None
+class GenericTests(unittest.TestCase):
+
+ @unittest.skipIf(threading is None, "test needs threading module")
+ def test_enums(self):
+ for name in dir(signal):
+ sig = getattr(signal, name)
+ if name in {'SIG_DFL', 'SIG_IGN'}:
+ self.assertIsInstance(sig, signal.Handlers)
+ elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
+ self.assertIsInstance(sig, signal.Sigmasks)
+ elif name.startswith('SIG') and not name.startswith('SIG_'):
+ self.assertIsInstance(sig, signal.Signals)
+ elif name.startswith('CTRL_'):
+ self.assertIsInstance(sig, signal.Signals)
+ self.assertEqual(sys.platform, "win32")
+
+
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
@@ -195,6 +213,7 @@ class PosixTests(unittest.TestCase):
def test_getsignal(self):
hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
+ self.assertIsInstance(hup, signal.Handlers)
self.assertEqual(signal.getsignal(signal.SIGHUP),
self.trivial_signal_handler)
signal.signal(signal.SIGHUP, hup)
@@ -271,7 +290,7 @@ class WakeupSignalTests(unittest.TestCase):
os.close(read)
os.close(write)
- """.format(signals, ordered, test_body)
+ """.format(tuple(map(int, signals)), ordered, test_body)
assert_python_ok('-c', code)
@@ -604,6 +623,8 @@ class PendingSignalsTests(unittest.TestCase):
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
os.kill(os.getpid(), signum)
pending = signal.sigpending()
+ for sig in pending:
+ assert isinstance(sig, signal.Signals), repr(pending)
if pending != {signum}:
raise Exception('%s != {%s}' % (pending, signum))
try:
@@ -660,6 +681,7 @@ class PendingSignalsTests(unittest.TestCase):
code = '''if 1:
import signal
import sys
+ from signal import Signals
def handler(signum, frame):
1/0
@@ -702,6 +724,7 @@ class PendingSignalsTests(unittest.TestCase):
def test(signum):
signal.alarm(1)
received = signal.sigwait([signum])
+ assert isinstance(received, signal.Signals), received
if received != signum:
raise Exception('received %s, not %s' % (received, signum))
''')
@@ -842,8 +865,14 @@ class PendingSignalsTests(unittest.TestCase):
def kill(signum):
os.kill(os.getpid(), signum)
+ def check_mask(mask):
+ for sig in mask:
+ assert isinstance(sig, signal.Signals), repr(sig)
+
def read_sigmask():
- return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+ sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
+ check_mask(sigmask)
+ return sigmask
signum = signal.SIGUSR1
@@ -852,6 +881,7 @@ class PendingSignalsTests(unittest.TestCase):
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ check_mask(old_mask)
try:
kill(signum)
except ZeroDivisionError:
@@ -861,11 +891,13 @@ class PendingSignalsTests(unittest.TestCase):
# Block and then raise SIGUSR1. The signal is blocked: the signal
# handler is not called, and the signal is now pending
- signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ check_mask(mask)
kill(signum)
# Check the new mask
blocked = read_sigmask()
+ check_mask(blocked)
if signum not in blocked:
raise Exception("%s not in %s" % (signum, blocked))
if old_mask ^ blocked != {signum}:
@@ -928,7 +960,7 @@ class PendingSignalsTests(unittest.TestCase):
def test_main():
try:
- support.run_unittest(PosixTests, InterProcessSignalTests,
+ support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests,
WakeupFDTests, WakeupSignalTests,
SiginterruptTest, ItimerTest, WindowsSignalTests,
PendingSignalsTests)
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 0617b30..8e0fde4 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -222,38 +222,6 @@ class SocketServerTest(unittest.TestCase):
socketserver.DatagramRequestHandler,
self.dgram_examine)
- @contextlib.contextmanager
- def mocked_select_module(self):
- """Mocks the select.select() call to raise EINTR for first call"""
- old_select = select.select
-
- class MockSelect:
- def __init__(self):
- self.called = 0
-
- def __call__(self, *args):
- self.called += 1
- if self.called == 1:
- # raise the exception on first call
- raise OSError(errno.EINTR, os.strerror(errno.EINTR))
- else:
- # Return real select value for consecutive calls
- return old_select(*args)
-
- select.select = MockSelect()
- try:
- yield select.select
- finally:
- select.select = old_select
-
- def test_InterruptServerSelectCall(self):
- with self.mocked_select_module() as mock_select:
- pid = self.run_server(socketserver.TCPServer,
- socketserver.StreamRequestHandler,
- self.stream_examine)
- # Make sure select was called again:
- self.assertGreater(mock_select.called, 1)
-
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 2b3de1f..f72fb15 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -86,6 +86,12 @@ def have_verify_flags():
# 0.9.8 or higher
return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
+def utc_offset(): #NOTE: ignore issues like #1647654
+ # local time = utc time + utc offset
+ if time.daylight and time.localtime().tm_isdst > 0:
+ return -time.altzone # seconds
+ return -time.timezone
+
def asn1time(cert_time):
# Some versions of OpenSSL ignore seconds, see #18207
# 0.9.8.i
@@ -134,6 +140,14 @@ class BasicSocketTests(unittest.TestCase):
self.assertIn(ssl.HAS_SNI, {True, False})
self.assertIn(ssl.HAS_ECDH, {True, False})
+ def test_str_for_enums(self):
+ # Make sure that the PROTOCOL_* constants have enum-like string
+ # reprs.
+ proto = ssl.PROTOCOL_SSLv3
+ self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3')
+ ctx = ssl.SSLContext(proto)
+ self.assertIs(ctx.protocol, proto)
+
def test_random(self):
v = ssl.RAND_status()
if support.verbose:
@@ -643,6 +657,71 @@ class BasicSocketTests(unittest.TestCase):
ctx.wrap_socket(s)
self.assertEqual(str(cx.exception), "only stream sockets are supported")
+ def cert_time_ok(self, timestring, timestamp):
+ self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
+
+ def cert_time_fail(self, timestring):
+ with self.assertRaises(ValueError):
+ ssl.cert_time_to_seconds(timestring)
+
+ @unittest.skipUnless(utc_offset(),
+ 'local time needs to be different from UTC')
+ def test_cert_time_to_seconds_timezone(self):
+ # Issue #19940: ssl.cert_time_to_seconds() returns wrong
+ # results if local timezone is not UTC
+ self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
+ self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
+
+ def test_cert_time_to_seconds(self):
+ timestring = "Jan 5 09:34:43 2018 GMT"
+ ts = 1515144883.0
+ self.cert_time_ok(timestring, ts)
+ # accept keyword parameter, assert its name
+ self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
+ # accept both %e and %d (space or zero generated by strftime)
+ self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
+ # case-insensitive
+ self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
+ self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
+ self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
+ self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
+ self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
+ self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
+ self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
+ self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
+
+ newyear_ts = 1230768000.0
+ # leap seconds
+ self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
+ # same timestamp
+ self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
+
+ self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
+ # allow 60th second (even if it is not a leap second)
+ self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
+ # allow 2nd leap second for compatibility with time.strptime()
+ self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
+ self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
+
+ # no special treatement for the special value:
+ # 99991231235959Z (rfc 5280)
+ self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
+
+ @support.run_with_locale('LC_ALL', '')
+ def test_cert_time_to_seconds_locale(self):
+ # `cert_time_to_seconds()` should be locale independent
+
+ def local_february_name():
+ return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
+
+ if local_february_name().lower() == 'feb':
+ self.skipTest("locale-specific month name needs to be "
+ "different from C locale")
+
+ # locale-independent
+ self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
+ self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
+
class ContextTests(unittest.TestCase):
@@ -1371,14 +1450,12 @@ class NetworkedTests(unittest.TestCase):
def test_get_server_certificate(self):
def _test_get_server_certificate(host, port, cert=None):
with support.transient_internet(host):
- pem = ssl.get_server_certificate((host, port),
- ssl.PROTOCOL_SSLv23)
+ pem = ssl.get_server_certificate((host, port))
if not pem:
self.fail("No server certificate on %s:%s!" % (host, port))
try:
pem = ssl.get_server_certificate((host, port),
- ssl.PROTOCOL_SSLv23,
ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
@@ -1388,7 +1465,6 @@ class NetworkedTests(unittest.TestCase):
self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
pem = ssl.get_server_certificate((host, port),
- ssl.PROTOCOL_SSLv23,
ca_certs=cert)
if not pem:
self.fail("No server certificate on %s:%s!" % (host, port))
@@ -2471,6 +2547,36 @@ else:
s.write(b"over\n")
s.close()
+ def test_nonblocking_send(self):
+ server = ThreadedEchoServer(CERTFILE,
+ certreqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLSv1,
+ cacerts=CERTFILE,
+ chatty=True,
+ connectionchatty=False)
+ with server:
+ s = ssl.wrap_socket(socket.socket(),
+ server_side=False,
+ certfile=CERTFILE,
+ ca_certs=CERTFILE,
+ cert_reqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ s.connect((HOST, server.port))
+ s.setblocking(False)
+
+ # If we keep sending data, at some point the buffers
+ # will be full and the call will block
+ buf = bytearray(8192)
+ def fill_buffer():
+ while True:
+ s.send(buf)
+ self.assertRaises((ssl.SSLWantWriteError,
+ ssl.SSLWantReadError), fill_buffer)
+
+ # Now read all the output and discard it
+ s.setblocking(True)
+ s.close()
+
def test_handshake_timeout(self):
# Issue #5103: SSL handshake must respect the socket timeout
server = socket.socket(socket.AF_INET)
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index a68ed08..4eadd4b 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -635,6 +635,53 @@ class SysModuleTest(unittest.TestCase):
expected = None
self.check_fsencoding(fs_encoding, expected)
+ def c_locale_get_error_handler(self, isolated=False, encoding=None):
+ # Force the POSIX locale
+ env = os.environ.copy()
+ env["LC_ALL"] = "C"
+ code = '\n'.join((
+ 'import sys',
+ 'def dump(name):',
+ ' std = getattr(sys, name)',
+ ' print("%s: %s" % (name, std.errors))',
+ 'dump("stdin")',
+ 'dump("stdout")',
+ 'dump("stderr")',
+ ))
+ args = [sys.executable, "-c", code]
+ if isolated:
+ args.append("-I")
+ elif encoding:
+ env['PYTHONIOENCODING'] = encoding
+ p = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=env,
+ universal_newlines=True)
+ stdout, stderr = p.communicate()
+ return stdout
+
+ def test_c_locale_surrogateescape(self):
+ out = self.c_locale_get_error_handler(isolated=True)
+ self.assertEqual(out,
+ 'stdin: surrogateescape\n'
+ 'stdout: surrogateescape\n'
+ 'stderr: backslashreplace\n')
+
+ # replace the default error handler
+ out = self.c_locale_get_error_handler(encoding=':strict')
+ self.assertEqual(out,
+ 'stdin: strict\n'
+ 'stdout: strict\n'
+ 'stderr: backslashreplace\n')
+
+ # force the encoding
+ out = self.c_locale_get_error_handler(encoding='iso8859-1')
+ self.assertEqual(out,
+ 'stdin: surrogateescape\n'
+ 'stdout: surrogateescape\n'
+ 'stderr: backslashreplace\n')
+
def test_implementation(self):
# This test applies to all implementations equally.
@@ -925,7 +972,7 @@ class SizeofTest(unittest.TestCase):
check(int, s)
# (PyTypeObject + PyNumberMethods + PyMappingMethods +
# PySequenceMethods + PyBufferProcs + 4P)
- s = vsize('P2n15Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P')
+ s = vsize('P2n17Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P')
# Separate block for PyDictKeysObject with 4 entries
s += struct.calcsize("2nPn") + 4*struct.calcsize("n2P")
# class
diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py
index 38611a7..8f74a06 100644
--- a/Lib/test/test_tokenize.py
+++ b/Lib/test/test_tokenize.py
@@ -464,7 +464,7 @@ Additive
Multiplicative
- >>> dump_tokens("x = 1//1*1/5*12%0x12")
+ >>> dump_tokens("x = 1//1*1/5*12%0x12@42")
ENCODING 'utf-8' (0, 0) (0, 0)
NAME 'x' (1, 0) (1, 1)
OP '=' (1, 2) (1, 3)
@@ -479,6 +479,8 @@ Multiplicative
NUMBER '12' (1, 13) (1, 15)
OP '%' (1, 15) (1, 16)
NUMBER '0x12' (1, 16) (1, 20)
+ OP '@' (1, 20) (1, 21)
+ NUMBER '42' (1, 21) (1, 23)
Unary
@@ -1154,6 +1156,7 @@ class TestTokenize(TestCase):
self.assertExactTypeEqual('//', token.DOUBLESLASH)
self.assertExactTypeEqual('//=', token.DOUBLESLASHEQUAL)
self.assertExactTypeEqual('@', token.AT)
+ self.assertExactTypeEqual('@=', token.ATEQUAL)
self.assertExactTypeEqual('a**2+b**2==c**2',
NAME, token.DOUBLESTAR, NUMBER,
diff --git a/Lib/test/test_tuple.py b/Lib/test/test_tuple.py
index e41711c..14c6430 100644
--- a/Lib/test/test_tuple.py
+++ b/Lib/test/test_tuple.py
@@ -201,6 +201,14 @@ class TupleTest(seq_tests.CommonTest):
with self.assertRaises(TypeError):
[3,] + T((1,2))
+ def test_lexicographic_ordering(self):
+ # Issue 21100
+ a = self.type2test([1, 2])
+ b = self.type2test([1, 2, 0])
+ c = self.type2test([1, 3])
+ self.assertLess(a, b)
+ self.assertLess(b, c)
+
def test_main():
support.run_unittest(TupleTest)
diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py
index ec10752..11d9546 100644
--- a/Lib/test/test_types.py
+++ b/Lib/test/test_types.py
@@ -343,6 +343,8 @@ class TypesTests(unittest.TestCase):
self.assertRaises(ValueError, 3 .__format__, ",n")
# can't have ',' with 'c'
self.assertRaises(ValueError, 3 .__format__, ",c")
+ # can't have '#' with 'c'
+ self.assertRaises(ValueError, 3 .__format__, "#c")
# ensure that only int and float type specifiers work
for format_spec in ([chr(x) for x in range(ord('a'), ord('z')+1)] +
diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py
index 9ae31d1..64e6bf5 100644
--- a/Lib/test/test_unicode.py
+++ b/Lib/test/test_unicode.py
@@ -8,6 +8,7 @@ Written by Marc-Andre Lemburg (mal@lemburg.com).
import _string
import codecs
import itertools
+import operator
import struct
import sys
import unittest
@@ -250,6 +251,7 @@ class UnicodeTest(string_tests.CommonTest,
{ord('a'): None, ord('b'): ''})
self.checkequalnofix('xyyx', 'xzx', 'translate',
{ord('z'): 'yy'})
+
# this needs maketrans()
self.checkequalnofix('abababc', 'abababc', 'translate',
{'b': '<i>'})
@@ -259,6 +261,33 @@ class UnicodeTest(string_tests.CommonTest,
tbl = self.type2test.maketrans('abc', 'xyz', 'd')
self.checkequalnofix('xyzzy', 'abdcdcbdddd', 'translate', tbl)
+ # various tests switching from ASCII to latin1 or the opposite;
+ # same length, remove a letter, or replace with a longer string.
+ self.assertEqual("[a]".translate(str.maketrans('a', 'X')),
+ "[X]")
+ self.assertEqual("[a]".translate(str.maketrans({'a': 'X'})),
+ "[X]")
+ self.assertEqual("[a]".translate(str.maketrans({'a': None})),
+ "[]")
+ self.assertEqual("[a]".translate(str.maketrans({'a': 'XXX'})),
+ "[XXX]")
+ self.assertEqual("[a]".translate(str.maketrans({'a': '\xe9'})),
+ "[\xe9]")
+ self.assertEqual("[a]".translate(str.maketrans({'a': '<\xe9>'})),
+ "[<\xe9>]")
+ self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': 'a'})),
+ "[a]")
+ self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': None})),
+ "[]")
+
+ # invalid Unicode characters
+ invalid_char = 0x10ffff+1
+ for before in "a\xe9\u20ac\U0010ffff":
+ mapping = str.maketrans({before: invalid_char})
+ text = "[%s]" % before
+ self.assertRaises(ValueError, text.translate, mapping)
+
+ # errors
self.assertRaises(TypeError, self.type2test.maketrans)
self.assertRaises(ValueError, self.type2test.maketrans, 'abc', 'defg')
self.assertRaises(TypeError, self.type2test.maketrans, 2, 'def')
@@ -1148,20 +1177,20 @@ class UnicodeTest(string_tests.CommonTest,
self.assertEqual('%.2s' % "a\xe9\u20ac", 'a\xe9')
#issue 19995
- class PsuedoInt:
+ class PseudoInt:
def __init__(self, value):
self.value = int(value)
def __int__(self):
return self.value
def __index__(self):
return self.value
- class PsuedoFloat:
+ class PseudoFloat:
def __init__(self, value):
self.value = float(value)
def __int__(self):
return int(self.value)
- pi = PsuedoFloat(3.1415)
- letter_m = PsuedoInt(109)
+ pi = PseudoFloat(3.1415)
+ letter_m = PseudoInt(109)
self.assertEqual('%x' % 42, '2a')
self.assertEqual('%X' % 15, 'F')
self.assertEqual('%o' % 9, '11')
@@ -1170,11 +1199,11 @@ class UnicodeTest(string_tests.CommonTest,
self.assertEqual('%X' % letter_m, '6D')
self.assertEqual('%o' % letter_m, '155')
self.assertEqual('%c' % letter_m, 'm')
- self.assertWarns(DeprecationWarning, '%x'.__mod__, pi),
- self.assertWarns(DeprecationWarning, '%x'.__mod__, 3.14),
- self.assertWarns(DeprecationWarning, '%X'.__mod__, 2.11),
- self.assertWarns(DeprecationWarning, '%o'.__mod__, 1.79),
- self.assertWarns(DeprecationWarning, '%c'.__mod__, pi),
+ self.assertRaisesRegex(TypeError, '%x format: an integer is required, not float', operator.mod, '%x', 3.14),
+ self.assertRaisesRegex(TypeError, '%X format: an integer is required, not float', operator.mod, '%X', 2.11),
+ self.assertRaisesRegex(TypeError, '%o format: an integer is required, not float', operator.mod, '%o', 1.79),
+ self.assertRaisesRegex(TypeError, '%x format: an integer is required, not PseudoFloat', operator.mod, '%x', pi),
+ self.assertRaises(TypeError, operator.mod, '%c', pi),
def test_formatting_with_enum(self):
# issue18780
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index f6a065d..bb71481 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -18,7 +18,8 @@ class Wait3Test(ForkWait):
# This many iterations can be required, since some previously run
# tests (e.g. test_ctypes) could have spawned a lot of children
# very quickly.
- for i in range(30):
+ deadline = time.monotonic() + 10.0
+ while time.monotonic() <= deadline:
# wait3() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait3(os.WNOHANG)
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 352c11a..b427a9b 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -19,13 +19,14 @@ class Wait4Test(ForkWait):
# Issue #11185: wait4 is broken on AIX and will always return 0
# with WNOHANG.
option = 0
- for i in range(10):
+ deadline = time.monotonic() + 10.0
+ while time.monotonic() <= deadline:
# wait4() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait4(cpid, option)
if spid == cpid:
break
- time.sleep(1.0)
+ time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
self.assertTrue(rusage)
diff --git a/Lib/test/test_warnings.py b/Lib/test/test_warnings.py
index eec2c24..cf7f747 100644
--- a/Lib/test/test_warnings.py
+++ b/Lib/test/test_warnings.py
@@ -5,7 +5,7 @@ from io import StringIO
import sys
import unittest
from test import support
-from test.script_helper import assert_python_ok
+from test.script_helper import assert_python_ok, assert_python_failure
from test import warning_tests
@@ -748,7 +748,19 @@ class EnvironmentVariableTests(BaseTest):
"import sys; sys.stdout.write(str(sys.warnoptions))",
PYTHONWARNINGS="ignore::DeprecationWarning")
self.assertEqual(stdout,
- b"['ignore::UnicodeWarning', 'ignore::DeprecationWarning']")
+ b"['ignore::DeprecationWarning', 'ignore::UnicodeWarning']")
+
+ def test_conflicting_envvar_and_command_line(self):
+ rc, stdout, stderr = assert_python_failure("-Werror::DeprecationWarning", "-c",
+ "import sys, warnings; sys.stdout.write(str(sys.warnoptions)); "
+ "warnings.warn('Message', DeprecationWarning)",
+ PYTHONWARNINGS="default::DeprecationWarning")
+ self.assertEqual(stdout,
+ b"['default::DeprecationWarning', 'error::DeprecationWarning']")
+ self.assertEqual(stderr.splitlines(),
+ [b"Traceback (most recent call last):",
+ b" File \"<string>\", line 1, in <module>",
+ b"DeprecationWarning: Message"])
@unittest.skipUnless(sys.getfilesystemencoding() != 'ascii',
'requires non-ascii filesystemencoding')
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index 99b3eda..120c54f 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -713,6 +713,23 @@ class SimpleServerTestCase(BaseServerTestCase):
conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
conn.close()
+ def test_context_manager(self):
+ with xmlrpclib.ServerProxy(URL) as server:
+ server.add(2, 3)
+ self.assertNotEqual(server('transport')._connection,
+ (None, None))
+ self.assertEqual(server('transport')._connection,
+ (None, None))
+
+ def test_context_manager_method_error(self):
+ try:
+ with xmlrpclib.ServerProxy(URL) as server:
+ server.add(2, "a")
+ except xmlrpclib.Fault:
+ pass
+ self.assertEqual(server('transport')._connection,
+ (None, None))
+
class MultiPathServerTestCase(BaseServerTestCase):
threadFunc = staticmethod(http_multi_server)
@@ -898,6 +915,7 @@ class ServerProxyTestCase(unittest.TestCase):
p = xmlrpclib.ServerProxy(self.url, transport=t)
self.assertEqual(p('transport'), t)
+
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):