diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/crashers/losing_mro_ref.py (renamed from Lib/test/crashers/loosing_mro_ref.py) | 0 | ||||
-rw-r--r-- | Lib/test/crashers/nasty_eq_vs_dict.py | 47 | ||||
-rw-r--r-- | Lib/test/pickletester.py | 99 | ||||
-rwxr-xr-x | Lib/test/regrtest.py | 23 | ||||
-rw-r--r-- | Lib/test/test_base64.py | 5 | ||||
-rw-r--r-- | Lib/test/test_buffer.py | 201 | ||||
-rw-r--r-- | Lib/test/test_dict.py | 22 | ||||
-rw-r--r-- | Lib/test/test_exceptions.py | 2 | ||||
-rw-r--r-- | Lib/test/test_logging.py | 14 | ||||
-rw-r--r-- | Lib/test/test_mailbox.py | 12 | ||||
-rw-r--r-- | Lib/test/test_marshal.py | 30 | ||||
-rw-r--r-- | Lib/test/test_minidom.py | 15 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 236 | ||||
-rw-r--r-- | Lib/test/test_mutants.py | 291 | ||||
-rw-r--r-- | Lib/test/test_pickle.py | 28 | ||||
-rw-r--r-- | Lib/test/test_pty.py | 19 | ||||
-rw-r--r-- | Lib/test/test_signal.py | 10 | ||||
-rw-r--r-- | Lib/test/test_time.py | 21 | ||||
-rw-r--r-- | Lib/test/test_tokenize.py | 12 | ||||
-rw-r--r-- | Lib/test/test_weakset.py | 56 | ||||
-rw-r--r-- | Lib/test/test_xml_etree.py | 114 | ||||
-rw-r--r-- | Lib/test/test_xml_etree_c.py | 21 |
22 files changed, 868 insertions, 410 deletions
diff --git a/Lib/test/crashers/loosing_mro_ref.py b/Lib/test/crashers/losing_mro_ref.py index b3bcd32..b3bcd32 100644 --- a/Lib/test/crashers/loosing_mro_ref.py +++ b/Lib/test/crashers/losing_mro_ref.py diff --git a/Lib/test/crashers/nasty_eq_vs_dict.py b/Lib/test/crashers/nasty_eq_vs_dict.py deleted file mode 100644 index 85f7caf..0000000 --- a/Lib/test/crashers/nasty_eq_vs_dict.py +++ /dev/null @@ -1,47 +0,0 @@ -# from http://mail.python.org/pipermail/python-dev/2001-June/015239.html - -# if you keep changing a dictionary while looking up a key, you can -# provoke an infinite recursion in C - -# At the time neither Tim nor Michael could be bothered to think of a -# way to fix it. - -class Yuck: - def __init__(self): - self.i = 0 - - def make_dangerous(self): - self.i = 1 - - def __hash__(self): - # direct to slot 4 in table of size 8; slot 12 when size 16 - return 4 + 8 - - def __eq__(self, other): - if self.i == 0: - # leave dict alone - pass - elif self.i == 1: - # fiddle to 16 slots - self.__fill_dict(6) - self.i = 2 - else: - # fiddle to 8 slots - self.__fill_dict(4) - self.i = 1 - - return 1 - - def __fill_dict(self, n): - self.i = 0 - dict.clear() - for i in range(n): - dict[i] = i - dict[self] = "OK!" - -y = Yuck() -dict = {y: "OK!"} - -z = Yuck() -y.make_dangerous() -print(dict[z]) diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index 831306f..1a551c8 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -1605,6 +1605,105 @@ class AbstractPicklerUnpicklerObjectTests(unittest.TestCase): self.assertEqual(unpickler.load(), data) +# Tests for dispatch_table attribute + +REDUCE_A = 'reduce_A' + +class AAA(object): + def __reduce__(self): + return str, (REDUCE_A,) + +class BBB(object): + pass + +class AbstractDispatchTableTests(unittest.TestCase): + + def test_default_dispatch_table(self): + # No dispatch_table attribute by default + f = io.BytesIO() + p = self.pickler_class(f, 0) + with self.assertRaises(AttributeError): + p.dispatch_table + self.assertFalse(hasattr(p, 'dispatch_table')) + + def test_class_dispatch_table(self): + # A dispatch_table attribute can be specified class-wide + dt = self.get_dispatch_table() + + class MyPickler(self.pickler_class): + dispatch_table = dt + + def dumps(obj, protocol=None): + f = io.BytesIO() + p = MyPickler(f, protocol) + self.assertEqual(p.dispatch_table, dt) + p.dump(obj) + return f.getvalue() + + self._test_dispatch_table(dumps, dt) + + def test_instance_dispatch_table(self): + # A dispatch_table attribute can also be specified instance-wide + dt = self.get_dispatch_table() + + def dumps(obj, protocol=None): + f = io.BytesIO() + p = self.pickler_class(f, protocol) + p.dispatch_table = dt + self.assertEqual(p.dispatch_table, dt) + p.dump(obj) + return f.getvalue() + + self._test_dispatch_table(dumps, dt) + + def _test_dispatch_table(self, dumps, dispatch_table): + def custom_load_dump(obj): + return pickle.loads(dumps(obj, 0)) + + def default_load_dump(obj): + return pickle.loads(pickle.dumps(obj, 0)) + + # pickling complex numbers using protocol 0 relies on copyreg + # so check pickling a complex number still works + z = 1 + 2j + self.assertEqual(custom_load_dump(z), z) + self.assertEqual(default_load_dump(z), z) + + # modify pickling of complex + REDUCE_1 = 'reduce_1' + def reduce_1(obj): + return str, (REDUCE_1,) + dispatch_table[complex] = reduce_1 + self.assertEqual(custom_load_dump(z), REDUCE_1) + self.assertEqual(default_load_dump(z), z) + + # check picklability of AAA and BBB + a = AAA() + b = BBB() + self.assertEqual(custom_load_dump(a), REDUCE_A) + self.assertIsInstance(custom_load_dump(b), BBB) + self.assertEqual(default_load_dump(a), REDUCE_A) + self.assertIsInstance(default_load_dump(b), BBB) + + # modify pickling of BBB + dispatch_table[BBB] = reduce_1 + self.assertEqual(custom_load_dump(a), REDUCE_A) + self.assertEqual(custom_load_dump(b), REDUCE_1) + self.assertEqual(default_load_dump(a), REDUCE_A) + self.assertIsInstance(default_load_dump(b), BBB) + + # revert pickling of BBB and modify pickling of AAA + REDUCE_2 = 'reduce_2' + def reduce_2(obj): + return str, (REDUCE_2,) + dispatch_table[AAA] = reduce_2 + del dispatch_table[BBB] + self.assertEqual(custom_load_dump(a), REDUCE_2) + self.assertIsInstance(custom_load_dump(b), BBB) + self.assertEqual(default_load_dump(a), REDUCE_A) + self.assertIsInstance(default_load_dump(b), BBB) + + if __name__ == "__main__": # Print some stuff that can be used to rewrite DATA{0,1,2} from pickletools import dis diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 871ae61..44d3426 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -749,10 +749,10 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, if bad: print(count(len(bad), "test"), "failed:") printlist(bad) - if environment_changed: - print("{} altered the execution environment:".format( - count(len(environment_changed), "test"))) - printlist(environment_changed) + if environment_changed: + print("{} altered the execution environment:".format( + count(len(environment_changed), "test"))) + printlist(environment_changed) if skipped and not quiet: print(count(len(skipped), "test"), "skipped:") printlist(skipped) @@ -970,6 +970,7 @@ class saved_test_environment: 'multiprocessing.process._dangling', 'sysconfig._CONFIG_VARS', 'sysconfig._SCHEMES', 'packaging.command._COMMANDS', 'packaging.database_caches', + 'support.TESTFN', ) def get_sys_argv(self): @@ -1163,6 +1164,20 @@ class saved_test_environment: sysconfig._SCHEMES._sections.clear() sysconfig._SCHEMES._sections.update(saved[2]) + def get_support_TESTFN(self): + if os.path.isfile(support.TESTFN): + result = 'f' + elif os.path.isdir(support.TESTFN): + result = 'd' + else: + result = None + return result + def restore_support_TESTFN(self, saved_value): + if saved_value is None: + if os.path.isfile(support.TESTFN): + os.unlink(support.TESTFN) + elif os.path.isdir(support.TESTFN): + shutil.rmtree(support.TESTFN) def resource_info(self): for name in self.resources: diff --git a/Lib/test/test_base64.py b/Lib/test/test_base64.py index b02f86d..2569476 100644 --- a/Lib/test/test_base64.py +++ b/Lib/test/test_base64.py @@ -2,6 +2,7 @@ import unittest from test import support import base64 import binascii +import os import sys import subprocess @@ -274,6 +275,10 @@ class BaseXYTestCase(unittest.TestCase): class TestMain(unittest.TestCase): + def tearDown(self): + if os.path.exists(support.TESTFN): + os.unlink(support.TESTFN) + def get_output(self, *args, **options): args = (sys.executable, '-m', 'base64') + args return subprocess.check_output(args, **options) diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py index 25324ef..e0006f2 100644 --- a/Lib/test/test_buffer.py +++ b/Lib/test/test_buffer.py @@ -3373,6 +3373,15 @@ class TestBufferProtocol(unittest.TestCase): del nd m.release() + a = bytearray([1,2,3]) + m = memoryview(a) + nd1 = ndarray(m, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + nd2 = ndarray(nd1, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + self.assertIs(nd2.obj, m) + self.assertRaises(BufferError, m.release) + del nd1, nd2 + m.release() + # chained views a = bytearray([1,2,3]) m1 = memoryview(a) @@ -3383,6 +3392,17 @@ class TestBufferProtocol(unittest.TestCase): del nd m2.release() + a = bytearray([1,2,3]) + m1 = memoryview(a) + m2 = memoryview(m1) + nd1 = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + nd2 = ndarray(nd1, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + self.assertIs(nd2.obj, m2) + m1.release() + self.assertRaises(BufferError, m2.release) + del nd1, nd2 + m2.release() + # Allow changing layout while buffers are exported. nd = ndarray([1,2,3], shape=[3], flags=ND_VAREXPORT) m1 = memoryview(nd) @@ -3418,11 +3438,182 @@ class TestBufferProtocol(unittest.TestCase): catch22(m1) self.assertEqual(m1[0], ord(b'1')) - # XXX If m1 has exports, raise BufferError. - # x = bytearray(b'123') - # with memoryview(x) as m1: - # ex = ndarray(m1) - # m1[0] == ord(b'1') + x = ndarray(list(range(12)), shape=[2,2,3], format='l') + y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + self.assertIs(z.obj, x) + with memoryview(z) as m: + catch22(m) + self.assertEqual(m[0:1].tolist(), [[[0, 1, 2], [3, 4, 5]]]) + + # Test garbage collection. + for flags in (0, ND_REDIRECT): + x = bytearray(b'123') + with memoryview(x) as m1: + del x + y = ndarray(m1, getbuf=PyBUF_FULL_RO, flags=flags) + with memoryview(y) as m2: + del y + z = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=flags) + with memoryview(z) as m3: + del z + catch22(m3) + catch22(m2) + catch22(m1) + self.assertEqual(m1[0], ord(b'1')) + self.assertEqual(m2[1], ord(b'2')) + self.assertEqual(m3[2], ord(b'3')) + del m3 + del m2 + del m1 + + x = bytearray(b'123') + with memoryview(x) as m1: + del x + y = ndarray(m1, getbuf=PyBUF_FULL_RO, flags=flags) + with memoryview(y) as m2: + del y + z = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=flags) + with memoryview(z) as m3: + del z + catch22(m1) + catch22(m2) + catch22(m3) + self.assertEqual(m1[0], ord(b'1')) + self.assertEqual(m2[1], ord(b'2')) + self.assertEqual(m3[2], ord(b'3')) + del m1, m2, m3 + + # memoryview.release() fails if the view has exported buffers. + x = bytearray(b'123') + with self.assertRaises(BufferError): + with memoryview(x) as m: + ex = ndarray(m) + m[0] == ord(b'1') + + def test_memoryview_redirect(self): + + nd = ndarray([1.0 * x for x in range(12)], shape=[12], format='d') + a = array.array('d', [1.0 * x for x in range(12)]) + + for x in (nd, a): + y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + m = memoryview(z) + + self.assertIs(y.obj, x) + self.assertIs(z.obj, x) + self.assertIs(m.obj, x) + + self.assertEqual(m, x) + self.assertEqual(m, y) + self.assertEqual(m, z) + + self.assertEqual(m[1:3], x[1:3]) + self.assertEqual(m[1:3], y[1:3]) + self.assertEqual(m[1:3], z[1:3]) + del y, z + self.assertEqual(m[1:3], x[1:3]) + + def test_memoryview_from_static_exporter(self): + + fmt = 'B' + lst = [0,1,2,3,4,5,6,7,8,9,10,11] + + # exceptions + self.assertRaises(TypeError, staticarray, 1, 2, 3) + + # view.obj==x + x = staticarray() + y = memoryview(x) + self.verify(y, obj=x, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + for i in range(12): + self.assertEqual(y[i], i) + del x + del y + + x = staticarray() + y = memoryview(x) + del y + del x + + x = staticarray() + y = ndarray(x, getbuf=PyBUF_FULL_RO) + z = ndarray(y, getbuf=PyBUF_FULL_RO) + m = memoryview(z) + self.assertIs(y.obj, x) + self.assertIs(m.obj, z) + self.verify(m, obj=z, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + del x, y, z, m + + x = staticarray() + y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + m = memoryview(z) + self.assertIs(y.obj, x) + self.assertIs(z.obj, x) + self.assertIs(m.obj, x) + self.verify(m, obj=x, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + del x, y, z, m + + # view.obj==NULL + x = staticarray(legacy_mode=True) + y = memoryview(x) + self.verify(y, obj=None, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + for i in range(12): + self.assertEqual(y[i], i) + del x + del y + + x = staticarray(legacy_mode=True) + y = memoryview(x) + del y + del x + + x = staticarray(legacy_mode=True) + y = ndarray(x, getbuf=PyBUF_FULL_RO) + z = ndarray(y, getbuf=PyBUF_FULL_RO) + m = memoryview(z) + self.assertIs(y.obj, None) + self.assertIs(m.obj, z) + self.verify(m, obj=z, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + del x, y, z, m + + x = staticarray(legacy_mode=True) + y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT) + m = memoryview(z) + # Clearly setting view.obj==NULL is inferior, since it + # messes up the redirection chain: + self.assertIs(y.obj, None) + self.assertIs(z.obj, y) + self.assertIs(m.obj, y) + self.verify(m, obj=y, + itemsize=1, fmt=fmt, readonly=1, + ndim=1, shape=[12], strides=[1], + lst=lst) + del x, y, z, m + + def test_memoryview_getbuffer_undefined(self): + + # getbufferproc does not adhere to the new documentation + nd = ndarray([1,2,3], [3], flags=ND_GETBUF_FAIL|ND_GETBUF_UNDEFINED) + self.assertRaises(BufferError, memoryview, nd) def test_issue_7385(self): x = ndarray([1,2,3], shape=[3], flags=ND_GETBUF_FAIL) diff --git a/Lib/test/test_dict.py b/Lib/test/test_dict.py index d2740a3..15db51d 100644 --- a/Lib/test/test_dict.py +++ b/Lib/test/test_dict.py @@ -379,7 +379,7 @@ class DictTest(unittest.TestCase): x.fail = True self.assertRaises(Exc, d.pop, x) - def test_mutatingiteration(self): + def test_mutating_iteration(self): # changing dict size during iteration d = {} d[1] = 1 @@ -387,6 +387,26 @@ class DictTest(unittest.TestCase): for i in d: d[i+1] = 1 + def test_mutating_lookup(self): + # changing dict during a lookup + class NastyKey: + mutate_dict = None + + def __hash__(self): + # hash collision! + return 1 + + def __eq__(self, other): + if self.mutate_dict: + self.mutate_dict[self] = 1 + return self == other + + d = {} + d[NastyKey()] = 0 + NastyKey.mutate_dict = d + with self.assertRaises(RuntimeError): + d[NastyKey()] = None + def test_repr(self): d = {} self.assertEqual(repr(d), '{}') diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py index 91d85ef..42536d3 100644 --- a/Lib/test/test_exceptions.py +++ b/Lib/test/test_exceptions.py @@ -38,7 +38,7 @@ class ExceptionTests(unittest.TestCase): try: try: import marshal - marshal.loads('') + marshal.loads(b'') except EOFError: pass finally: diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index b239f58..98a2819 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -3651,11 +3651,14 @@ class TimedRotatingFileHandlerTest(BaseFileTest): def test_rollover(self): fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', backupCount=1) - r = logging.makeLogRecord({'msg': 'testing'}) - fh.emit(r) + fmt = logging.Formatter('%(asctime)s %(message)s') + fh.setFormatter(fmt) + r1 = logging.makeLogRecord({'msg': 'testing - initial'}) + fh.emit(r1) self.assertLogFile(self.fn) - time.sleep(1.01) # just a little over a second ... - fh.emit(r) + time.sleep(1.1) # a little over a second ... + r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) + fh.emit(r2) fh.close() # At this point, we should have a recent rotated file which we # can test for the existence of. However, in practice, on some @@ -3682,7 +3685,8 @@ class TimedRotatingFileHandlerTest(BaseFileTest): print('The only matching files are: %s' % files, file=sys.stderr) for f in files: print('Contents of %s:' % f) - with open(f, 'r') as tf: + path = os.path.join(dn, f) + with open(path, 'r') as tf: print(tf.read()) self.assertTrue(found, msg=msg) diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 4bb05a4..212ceb9 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -7,6 +7,7 @@ import email import email.message import re import io +import shutil import tempfile from test import support import unittest @@ -38,12 +39,7 @@ class TestBase(unittest.TestCase): def _delete_recursively(self, target): # Delete a file or delete a directory recursively if os.path.isdir(target): - for path, dirs, files in os.walk(target, topdown=False): - for name in files: - os.remove(os.path.join(path, name)) - for name in dirs: - os.rmdir(os.path.join(path, name)) - os.rmdir(target) + shutil.rmtree(target) elif os.path.exists(target): os.remove(target) @@ -2028,6 +2024,10 @@ class MaildirTestCase(unittest.TestCase): def setUp(self): # create a new maildir mailbox to work with: self._dir = support.TESTFN + if os.path.isdir(self._dir): + shutil.rmtree(self._dir) + elif os.path.isfile(self._dir): + os.unlink(self._dir) os.mkdir(self._dir) os.mkdir(os.path.join(self._dir, "cur")) os.mkdir(os.path.join(self._dir, "tmp")) diff --git a/Lib/test/test_marshal.py b/Lib/test/test_marshal.py index dec8129..83c348c 100644 --- a/Lib/test/test_marshal.py +++ b/Lib/test/test_marshal.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from test import support +import array import marshal import sys import unittest @@ -154,6 +155,27 @@ class ContainerTestCase(unittest.TestCase, HelperMixin): for constructor in (set, frozenset): self.helper(constructor(self.d.keys())) + +class BufferTestCase(unittest.TestCase, HelperMixin): + + def test_bytearray(self): + b = bytearray(b"abc") + self.helper(b) + new = marshal.loads(marshal.dumps(b)) + self.assertEqual(type(new), bytes) + + def test_memoryview(self): + b = memoryview(b"abc") + self.helper(b) + new = marshal.loads(marshal.dumps(b)) + self.assertEqual(type(new), bytes) + + def test_array(self): + a = array.array('B', b"abc") + new = marshal.loads(marshal.dumps(a)) + self.assertEqual(new, b"abc") + + class BugsTestCase(unittest.TestCase): def test_bug_5888452(self): # Simple-minded check for SF 588452: Debug build crashes @@ -179,7 +201,7 @@ class BugsTestCase(unittest.TestCase): pass def test_loads_recursion(self): - s = 'c' + ('X' * 4*4) + '{' * 2**20 + s = b'c' + (b'X' * 4*4) + b'{' * 2**20 self.assertRaises(ValueError, marshal.loads, s) def test_recursion_limit(self): @@ -252,6 +274,11 @@ class BugsTestCase(unittest.TestCase): finally: support.unlink(support.TESTFN) + def test_loads_reject_unicode_strings(self): + # Issue #14177: marshal.loads() should not accept unicode strings + unicode_string = 'T' + self.assertRaises(TypeError, marshal.loads, unicode_string) + def test_main(): support.run_unittest(IntTestCase, @@ -260,6 +287,7 @@ def test_main(): CodeTestCase, ContainerTestCase, ExceptionTestCase, + BufferTestCase, BugsTestCase) if __name__ == "__main__": diff --git a/Lib/test/test_minidom.py b/Lib/test/test_minidom.py index 752a840..cc4c95b 100644 --- a/Lib/test/test_minidom.py +++ b/Lib/test/test_minidom.py @@ -362,11 +362,17 @@ class MinidomTest(unittest.TestCase): def testGetAttrList(self): pass - def testGetAttrValues(self): pass + def testGetAttrValues(self): + pass - def testGetAttrLength(self): pass + def testGetAttrLength(self): + pass - def testGetAttribute(self): pass + def testGetAttribute(self): + dom = Document() + child = dom.appendChild( + dom.createElementNS("http://www.python.org", "python:abc")) + self.assertEqual(child.getAttribute('missing'), '') def testGetAttributeNS(self): dom = Document() @@ -378,6 +384,9 @@ class MinidomTest(unittest.TestCase): 'http://www.python.org') self.assertEqual(child.getAttributeNS("http://www.w3.org", "other"), '') + child2 = child.appendChild(dom.createElement('abc')) + self.assertEqual(child2.getAttributeNS("http://www.python.org", "missing"), + '') def testGetAttributeNode(self): pass diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index f141bd4..0db6352 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1811,6 +1811,84 @@ class _TestListenerClient(BaseTestCase): p.join() l.close() +class _TestPoll(unittest.TestCase): + + ALLOWED_TYPES = ('processes', 'threads') + + def test_empty_string(self): + a, b = self.Pipe() + self.assertEqual(a.poll(), False) + b.send_bytes(b'') + self.assertEqual(a.poll(), True) + self.assertEqual(a.poll(), True) + + @classmethod + def _child_strings(cls, conn, strings): + for s in strings: + time.sleep(0.1) + conn.send_bytes(s) + conn.close() + + def test_strings(self): + strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') + a, b = self.Pipe() + p = self.Process(target=self._child_strings, args=(b, strings)) + p.start() + + for s in strings: + for i in range(200): + if a.poll(0.01): + break + x = a.recv_bytes() + self.assertEqual(s, x) + + p.join() + + @classmethod + def _child_boundaries(cls, r): + # Polling may "pull" a message in to the child process, but we + # don't want it to pull only part of a message, as that would + # corrupt the pipe for any other processes which might later + # read from it. + r.poll(5) + + def test_boundaries(self): + r, w = self.Pipe(False) + p = self.Process(target=self._child_boundaries, args=(r,)) + p.start() + time.sleep(2) + L = [b"first", b"second"] + for obj in L: + w.send_bytes(obj) + w.close() + p.join() + self.assertIn(r.recv_bytes(), L) + + @classmethod + def _child_dont_merge(cls, b): + b.send_bytes(b'a') + b.send_bytes(b'b') + b.send_bytes(b'cd') + + def test_dont_merge(self): + a, b = self.Pipe() + self.assertEqual(a.poll(0.0), False) + self.assertEqual(a.poll(0.1), False) + + p = self.Process(target=self._child_dont_merge, args=(b,)) + p.start() + + self.assertEqual(a.recv_bytes(), b'a') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.recv_bytes(), b'b') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(0.0), True) + self.assertEqual(a.recv_bytes(), b'cd') + + p.join() + # # Test of sending connection and socket objects between processes # @@ -2404,8 +2482,164 @@ class TestStdinBadfiledescriptor(unittest.TestCase): flike.flush() assert sio.getvalue() == 'foo' + +class TestWait(unittest.TestCase): + + @classmethod + def _child_test_wait(cls, w, slow): + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + w.send((i, os.getpid())) + w.close() + + def test_wait(self, slow=False): + from multiprocessing.connection import wait + readers = [] + procs = [] + messages = [] + + for i in range(4): + r, w = multiprocessing.Pipe(duplex=False) + p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) + p.daemon = True + p.start() + w.close() + readers.append(r) + procs.append(p) + self.addCleanup(p.join) + + while readers: + for r in wait(readers): + try: + msg = r.recv() + except EOFError: + readers.remove(r) + r.close() + else: + messages.append(msg) + + messages.sort() + expected = sorted((i, p.pid) for i in range(10) for p in procs) + self.assertEqual(messages, expected) + + @classmethod + def _child_test_wait_socket(cls, address, slow): + s = socket.socket() + s.connect(address) + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + s.sendall(('%s\n' % i).encode('ascii')) + s.close() + + def test_wait_socket(self, slow=False): + from multiprocessing.connection import wait + l = socket.socket() + l.bind(('', 0)) + l.listen(4) + addr = ('localhost', l.getsockname()[1]) + readers = [] + procs = [] + dic = {} + + for i in range(4): + p = multiprocessing.Process(target=self._child_test_wait_socket, + args=(addr, slow)) + p.daemon = True + p.start() + procs.append(p) + self.addCleanup(p.join) + + for i in range(4): + r, _ = l.accept() + readers.append(r) + dic[r] = [] + l.close() + + while readers: + for r in wait(readers): + msg = r.recv(32) + if not msg: + readers.remove(r) + r.close() + else: + dic[r].append(msg) + + expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') + for v in dic.values(): + self.assertEqual(b''.join(v), expected) + + def test_wait_slow(self): + self.test_wait(True) + + def test_wait_socket_slow(self): + self.test_wait(True) + + def test_wait_timeout(self): + from multiprocessing.connection import wait + + expected = 1 + a, b = multiprocessing.Pipe() + + start = time.time() + res = wait([a, b], 1) + delta = time.time() - start + + self.assertEqual(res, []) + self.assertLess(delta, expected + 0.2) + self.assertGreater(delta, expected - 0.2) + + b.send(None) + + start = time.time() + res = wait([a, b], 1) + delta = time.time() - start + + self.assertEqual(res, [a]) + self.assertLess(delta, 0.2) + + def test_wait_integer(self): + from multiprocessing.connection import wait + + expected = 5 + a, b = multiprocessing.Pipe() + p = multiprocessing.Process(target=time.sleep, args=(expected,)) + + p.start() + self.assertIsInstance(p.sentinel, int) + + start = time.time() + res = wait([a, p.sentinel, b], expected + 20) + delta = time.time() - start + + self.assertEqual(res, [p.sentinel]) + self.assertLess(delta, expected + 1) + self.assertGreater(delta, expected - 1) + + a.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + + self.assertEqual(res, [p.sentinel, b]) + self.assertLess(delta, 0.2) + + b.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + + self.assertEqual(res, [a, p.sentinel, b]) + self.assertLess(delta, 0.2) + + p.join() + + testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor] + TestStdinBadfiledescriptor, TestWait] # # diff --git a/Lib/test/test_mutants.py b/Lib/test/test_mutants.py deleted file mode 100644 index b43fa47..0000000 --- a/Lib/test/test_mutants.py +++ /dev/null @@ -1,291 +0,0 @@ -from test.support import verbose, TESTFN -import random -import os - -# From SF bug #422121: Insecurities in dict comparison. - -# Safety of code doing comparisons has been an historical Python weak spot. -# The problem is that comparison of structures written in C *naturally* -# wants to hold on to things like the size of the container, or "the -# biggest" containee so far, across a traversal of the container; but -# code to do containee comparisons can call back into Python and mutate -# the container in arbitrary ways while the C loop is in midstream. If the -# C code isn't extremely paranoid about digging things out of memory on -# each trip, and artificially boosting refcounts for the duration, anything -# from infinite loops to OS crashes can result (yes, I use Windows <wink>). -# -# The other problem is that code designed to provoke a weakness is usually -# white-box code, and so catches only the particular vulnerabilities the -# author knew to protect against. For example, Python's list.sort() code -# went thru many iterations as one "new" vulnerability after another was -# discovered. -# -# So the dict comparison test here uses a black-box approach instead, -# generating dicts of various sizes at random, and performing random -# mutations on them at random times. This proved very effective, -# triggering at least six distinct failure modes the first 20 times I -# ran it. Indeed, at the start, the driver never got beyond 6 iterations -# before the test died. - -# The dicts are global to make it easy to mutate tham from within functions. -dict1 = {} -dict2 = {} - -# The current set of keys in dict1 and dict2. These are materialized as -# lists to make it easy to pick a dict key at random. -dict1keys = [] -dict2keys = [] - -# Global flag telling maybe_mutate() whether to *consider* mutating. -mutate = 0 - -# If global mutate is true, consider mutating a dict. May or may not -# mutate a dict even if mutate is true. If it does decide to mutate a -# dict, it picks one of {dict1, dict2} at random, and deletes a random -# entry from it; or, more rarely, adds a random element. - -def maybe_mutate(): - global mutate - if not mutate: - return - if random.random() < 0.5: - return - - if random.random() < 0.5: - target, keys = dict1, dict1keys - else: - target, keys = dict2, dict2keys - - if random.random() < 0.2: - # Insert a new key. - mutate = 0 # disable mutation until key inserted - while 1: - newkey = Horrid(random.randrange(100)) - if newkey not in target: - break - target[newkey] = Horrid(random.randrange(100)) - keys.append(newkey) - mutate = 1 - - elif keys: - # Delete a key at random. - mutate = 0 # disable mutation until key deleted - i = random.randrange(len(keys)) - key = keys[i] - del target[key] - del keys[i] - mutate = 1 - -# A horrid class that triggers random mutations of dict1 and dict2 when -# instances are compared. - -class Horrid: - def __init__(self, i): - # Comparison outcomes are determined by the value of i. - self.i = i - - # An artificial hashcode is selected at random so that we don't - # have any systematic relationship between comparison outcomes - # (based on self.i and other.i) and relative position within the - # hash vector (based on hashcode). - # XXX This is no longer effective. - ##self.hashcode = random.randrange(1000000000) - - def __hash__(self): - return 42 - return self.hashcode - - def __eq__(self, other): - maybe_mutate() # The point of the test. - return self.i == other.i - - def __ne__(self, other): - raise RuntimeError("I didn't expect some kind of Spanish inquisition!") - - __lt__ = __le__ = __gt__ = __ge__ = __ne__ - - def __repr__(self): - return "Horrid(%d)" % self.i - -# Fill dict d with numentries (Horrid(i), Horrid(j)) key-value pairs, -# where i and j are selected at random from the candidates list. -# Return d.keys() after filling. - -def fill_dict(d, candidates, numentries): - d.clear() - for i in range(numentries): - d[Horrid(random.choice(candidates))] = \ - Horrid(random.choice(candidates)) - return list(d.keys()) - -# Test one pair of randomly generated dicts, each with n entries. -# Note that dict comparison is trivial if they don't have the same number -# of entires (then the "shorter" dict is instantly considered to be the -# smaller one, without even looking at the entries). - -def test_one(n): - global mutate, dict1, dict2, dict1keys, dict2keys - - # Fill the dicts without mutating them. - mutate = 0 - dict1keys = fill_dict(dict1, range(n), n) - dict2keys = fill_dict(dict2, range(n), n) - - # Enable mutation, then compare the dicts so long as they have the - # same size. - mutate = 1 - if verbose: - print("trying w/ lengths", len(dict1), len(dict2), end=' ') - while dict1 and len(dict1) == len(dict2): - if verbose: - print(".", end=' ') - c = dict1 == dict2 - if verbose: - print() - -# Run test_one n times. At the start (before the bugs were fixed), 20 -# consecutive runs of this test each blew up on or before the sixth time -# test_one was run. So n doesn't have to be large to get an interesting -# test. -# OTOH, calling with large n is also interesting, to ensure that the fixed -# code doesn't hold on to refcounts *too* long (in which case memory would -# leak). - -def test(n): - for i in range(n): - test_one(random.randrange(1, 100)) - -# See last comment block for clues about good values for n. -test(100) - -########################################################################## -# Another segfault bug, distilled by Michael Hudson from a c.l.py post. - -class Child: - def __init__(self, parent): - self.__dict__['parent'] = parent - def __getattr__(self, attr): - self.parent.a = 1 - self.parent.b = 1 - self.parent.c = 1 - self.parent.d = 1 - self.parent.e = 1 - self.parent.f = 1 - self.parent.g = 1 - self.parent.h = 1 - self.parent.i = 1 - return getattr(self.parent, attr) - -class Parent: - def __init__(self): - self.a = Child(self) - -# Hard to say what this will print! May vary from time to time. But -# we're specifically trying to test the tp_print slot here, and this is -# the clearest way to do it. We print the result to a temp file so that -# the expected-output file doesn't need to change. - -f = open(TESTFN, "w") -print(Parent().__dict__, file=f) -f.close() -os.unlink(TESTFN) - -########################################################################## -# And another core-dumper from Michael Hudson. - -dict = {} - -# Force dict to malloc its table. -for i in range(1, 10): - dict[i] = i - -f = open(TESTFN, "w") - -class Machiavelli: - def __repr__(self): - dict.clear() - - # Michael sez: "doesn't crash without this. don't know why." - # Tim sez: "luck of the draw; crashes with or without for me." - print(file=f) - - return repr("machiavelli") - - def __hash__(self): - return 0 - -dict[Machiavelli()] = Machiavelli() - -print(str(dict), file=f) -f.close() -os.unlink(TESTFN) -del f, dict - - -########################################################################## -# And another core-dumper from Michael Hudson. - -dict = {} - -# let's force dict to malloc its table -for i in range(1, 10): - dict[i] = i - -class Machiavelli2: - def __eq__(self, other): - dict.clear() - return 1 - - def __hash__(self): - return 0 - -dict[Machiavelli2()] = Machiavelli2() - -try: - dict[Machiavelli2()] -except KeyError: - pass - -del dict - -########################################################################## -# And another core-dumper from Michael Hudson. - -dict = {} - -# let's force dict to malloc its table -for i in range(1, 10): - dict[i] = i - -class Machiavelli3: - def __init__(self, id): - self.id = id - - def __eq__(self, other): - if self.id == other.id: - dict.clear() - return 1 - else: - return 0 - - def __repr__(self): - return "%s(%s)"%(self.__class__.__name__, self.id) - - def __hash__(self): - return 0 - -dict[Machiavelli3(1)] = Machiavelli3(0) -dict[Machiavelli3(2)] = Machiavelli3(0) - -f = open(TESTFN, "w") -try: - try: - print(dict[Machiavelli3(2)], file=f) - except KeyError: - pass -finally: - f.close() - os.unlink(TESTFN) - -del dict -del dict1, dict2, dict1keys, dict2keys diff --git a/Lib/test/test_pickle.py b/Lib/test/test_pickle.py index 9da2cae..f52d4bd 100644 --- a/Lib/test/test_pickle.py +++ b/Lib/test/test_pickle.py @@ -1,5 +1,6 @@ import pickle import io +import collections from test import support @@ -7,6 +8,7 @@ from test.pickletester import AbstractPickleTests from test.pickletester import AbstractPickleModuleTests from test.pickletester import AbstractPersistentPicklerTests from test.pickletester import AbstractPicklerUnpicklerObjectTests +from test.pickletester import AbstractDispatchTableTests from test.pickletester import BigmemPickleTests try: @@ -80,6 +82,18 @@ class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests): unpickler_class = pickle._Unpickler +class PyDispatchTableTests(AbstractDispatchTableTests): + pickler_class = pickle._Pickler + def get_dispatch_table(self): + return pickle.dispatch_table.copy() + + +class PyChainDispatchTableTests(AbstractDispatchTableTests): + pickler_class = pickle._Pickler + def get_dispatch_table(self): + return collections.ChainMap({}, pickle.dispatch_table) + + if has_c_implementation: class CPicklerTests(PyPicklerTests): pickler = _pickle.Pickler @@ -101,14 +115,26 @@ if has_c_implementation: pickler_class = _pickle.Pickler unpickler_class = _pickle.Unpickler + class CDispatchTableTests(AbstractDispatchTableTests): + pickler_class = pickle.Pickler + def get_dispatch_table(self): + return pickle.dispatch_table.copy() + + class CChainDispatchTableTests(AbstractDispatchTableTests): + pickler_class = pickle.Pickler + def get_dispatch_table(self): + return collections.ChainMap({}, pickle.dispatch_table) + def test_main(): - tests = [PickleTests, PyPicklerTests, PyPersPicklerTests] + tests = [PickleTests, PyPicklerTests, PyPersPicklerTests, + PyDispatchTableTests, PyChainDispatchTableTests] if has_c_implementation: tests.extend([CPicklerTests, CPersPicklerTests, CDumpPickle_LoadPickle, DumpPickle_CLoadPickle, PyPicklerUnpicklerObjectTests, CPicklerUnpicklerObjectTests, + CDispatchTableTests, CChainDispatchTableTests, InMemoryPickleTests]) support.run_unittest(*tests) support.run_doctest(pickle) diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index 4f1251c..ef95268 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -205,6 +205,7 @@ class SmallPtyTests(unittest.TestCase): self.orig_stdout_fileno = pty.STDOUT_FILENO self.orig_pty_select = pty.select self.fds = [] # A list of file descriptors to close. + self.files = [] self.select_rfds_lengths = [] self.select_rfds_results = [] @@ -212,10 +213,15 @@ class SmallPtyTests(unittest.TestCase): pty.STDIN_FILENO = self.orig_stdin_fileno pty.STDOUT_FILENO = self.orig_stdout_fileno pty.select = self.orig_pty_select + for file in self.files: + try: + file.close() + except OSError: + pass for fd in self.fds: try: os.close(fd) - except: + except OSError: pass def _pipe(self): @@ -223,6 +229,11 @@ class SmallPtyTests(unittest.TestCase): self.fds.extend(pipe_fds) return pipe_fds + def _socketpair(self): + socketpair = socket.socketpair() + self.files.extend(socketpair) + return socketpair + def _mock_select(self, rfds, wfds, xfds): # This will raise IndexError when no more expected calls exist. self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) @@ -234,9 +245,8 @@ class SmallPtyTests(unittest.TestCase): pty.STDOUT_FILENO = mock_stdout_fd mock_stdin_fd, write_to_stdin_fd = self._pipe() pty.STDIN_FILENO = mock_stdin_fd - socketpair = socket.socketpair() + socketpair = self._socketpair() masters = [s.fileno() for s in socketpair] - self.fds.extend(masters) # Feed data. Smaller than PIPEBUF. These writes will not block. os.write(masters[1], b'from master') @@ -263,9 +273,8 @@ class SmallPtyTests(unittest.TestCase): pty.STDOUT_FILENO = mock_stdout_fd mock_stdin_fd, write_to_stdin_fd = self._pipe() pty.STDIN_FILENO = mock_stdin_fd - socketpair = socket.socketpair() + socketpair = self._socketpair() masters = [s.fileno() for s in socketpair] - self.fds.extend(masters) os.close(masters[1]) socketpair[1].close() diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index fdeb4c2..6be259b 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -662,7 +662,7 @@ class PendingSignalsTests(unittest.TestCase): self.wait_helper(signal.SIGALRM, ''' def test(signum): signal.alarm(1) - info = signal.sigtimedwait([signum], (10, 1000)) + info = signal.sigtimedwait([signum], 10.1000) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) ''') @@ -675,7 +675,7 @@ class PendingSignalsTests(unittest.TestCase): def test(signum): import os os.kill(os.getpid(), signum) - info = signal.sigtimedwait([signum], (0, 0)) + info = signal.sigtimedwait([signum], 0) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) ''') @@ -685,7 +685,7 @@ class PendingSignalsTests(unittest.TestCase): def test_sigtimedwait_timeout(self): self.wait_helper(signal.SIGALRM, ''' def test(signum): - received = signal.sigtimedwait([signum], (1, 0)) + received = signal.sigtimedwait([signum], 1.0) if received is not None: raise Exception("received=%r" % (received,)) ''') @@ -694,9 +694,7 @@ class PendingSignalsTests(unittest.TestCase): 'need signal.sigtimedwait()') def test_sigtimedwait_negative_timeout(self): signum = signal.SIGALRM - self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1)) - self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1)) - self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0)) + self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 'need signal.sigwaitinfo()') diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py index a89c511..26492c1 100644 --- a/Lib/test/test_time.py +++ b/Lib/test/test_time.py @@ -497,12 +497,31 @@ class TestStrftime4dyear(_TestStrftimeYear, _Test4dYear): pass +class TestPytime(unittest.TestCase): + def test_timespec(self): + from _testcapi import pytime_object_to_timespec + for obj, timespec in ( + (0, (0, 0)), + (-1, (-1, 0)), + (-1.0, (-1, 0)), + (-1e-9, (-1, 999999999)), + (-1.2, (-2, 800000000)), + (1.123456789, (1, 123456789)), + ): + self.assertEqual(pytime_object_to_timespec(obj), timespec) + + for invalid in (-(2 ** 100), -(2.0 ** 100.0), 2 ** 100, 2.0 ** 100.0): + self.assertRaises(OverflowError, pytime_object_to_timespec, invalid) + + + def test_main(): support.run_unittest( TimeTestCase, TestLocale, TestAsctime4dyear, - TestStrftime4dyear) + TestStrftime4dyear, + TestPytime) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py index dce3c6e..db87e11 100644 --- a/Lib/test/test_tokenize.py +++ b/Lib/test/test_tokenize.py @@ -563,6 +563,18 @@ Non-ascii identifiers NAME 'grün' (2, 0) (2, 4) OP '=' (2, 5) (2, 6) STRING "'green'" (2, 7) (2, 14) + +Legacy unicode literals: + + >>> dump_tokens("Örter = u'places'\\ngrün = UR'green'") + ENCODING 'utf-8' (0, 0) (0, 0) + NAME 'Örter' (1, 0) (1, 5) + OP '=' (1, 6) (1, 7) + STRING "u'places'" (1, 8) (1, 17) + NEWLINE '\\n' (1, 17) (1, 18) + NAME 'grün' (2, 0) (2, 4) + OP '=' (2, 5) (2, 6) + STRING "UR'green'" (2, 7) (2, 16) """ from test import support diff --git a/Lib/test/test_weakset.py b/Lib/test/test_weakset.py index 35db7a6..4d3878f 100644 --- a/Lib/test/test_weakset.py +++ b/Lib/test/test_weakset.py @@ -28,6 +28,12 @@ class TestWeakSet(unittest.TestCase): # need to keep references to them self.items = [ustr(c) for c in ('a', 'b', 'c')] self.items2 = [ustr(c) for c in ('x', 'y', 'z')] + self.ab_items = [ustr(c) for c in 'ab'] + self.abcde_items = [ustr(c) for c in 'abcde'] + self.def_items = [ustr(c) for c in 'def'] + self.ab_weakset = WeakSet(self.ab_items) + self.abcde_weakset = WeakSet(self.abcde_items) + self.def_weakset = WeakSet(self.def_items) self.letters = [ustr(c) for c in string.ascii_letters] self.s = WeakSet(self.items) self.d = dict.fromkeys(self.items) @@ -71,6 +77,11 @@ class TestWeakSet(unittest.TestCase): x = WeakSet(self.items + self.items2) c = C(self.items2) self.assertEqual(self.s.union(c), x) + del c + self.assertEqual(len(u), len(self.items) + len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(u), len(self.items) + len(self.items2)) def test_or(self): i = self.s.union(self.items2) @@ -78,14 +89,19 @@ class TestWeakSet(unittest.TestCase): self.assertEqual(self.s | frozenset(self.items2), i) def test_intersection(self): - i = self.s.intersection(self.items2) + s = WeakSet(self.letters) + i = s.intersection(self.items2) for c in self.letters: - self.assertEqual(c in i, c in self.d and c in self.items2) - self.assertEqual(self.s, WeakSet(self.items)) + self.assertEqual(c in i, c in self.items2 and c in self.letters) + self.assertEqual(s, WeakSet(self.letters)) self.assertEqual(type(i), WeakSet) for C in set, frozenset, dict.fromkeys, list, tuple: x = WeakSet([]) - self.assertEqual(self.s.intersection(C(self.items2)), x) + self.assertEqual(i.intersection(C(self.items)), x) + self.assertEqual(len(i), len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(i), len(self.items2)) def test_isdisjoint(self): self.assertTrue(self.s.isdisjoint(WeakSet(self.items2))) @@ -116,6 +132,10 @@ class TestWeakSet(unittest.TestCase): self.assertEqual(self.s, WeakSet(self.items)) self.assertEqual(type(i), WeakSet) self.assertRaises(TypeError, self.s.symmetric_difference, [[]]) + self.assertEqual(len(i), len(self.items) + len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(i), len(self.items) + len(self.items2)) def test_xor(self): i = self.s.symmetric_difference(self.items2) @@ -123,22 +143,28 @@ class TestWeakSet(unittest.TestCase): self.assertEqual(self.s ^ frozenset(self.items2), i) def test_sub_and_super(self): - pl, ql, rl = map(lambda s: [ustr(c) for c in s], ['ab', 'abcde', 'def']) - p, q, r = map(WeakSet, (pl, ql, rl)) - self.assertTrue(p < q) - self.assertTrue(p <= q) - self.assertTrue(q <= q) - self.assertTrue(q > p) - self.assertTrue(q >= p) - self.assertFalse(q < r) - self.assertFalse(q <= r) - self.assertFalse(q > r) - self.assertFalse(q >= r) + self.assertTrue(self.ab_weakset <= self.abcde_weakset) + self.assertTrue(self.abcde_weakset <= self.abcde_weakset) + self.assertTrue(self.abcde_weakset >= self.ab_weakset) + self.assertFalse(self.abcde_weakset <= self.def_weakset) + self.assertFalse(self.abcde_weakset >= self.def_weakset) self.assertTrue(set('a').issubset('abc')) self.assertTrue(set('abc').issuperset('a')) self.assertFalse(set('a').issubset('cbs')) self.assertFalse(set('cbs').issuperset('a')) + def test_lt(self): + self.assertTrue(self.ab_weakset < self.abcde_weakset) + self.assertFalse(self.abcde_weakset < self.def_weakset) + self.assertFalse(self.ab_weakset < self.ab_weakset) + self.assertFalse(WeakSet() < WeakSet()) + + def test_gt(self): + self.assertTrue(self.abcde_weakset > self.ab_weakset) + self.assertFalse(self.abcde_weakset > self.def_weakset) + self.assertFalse(self.ab_weakset > self.ab_weakset) + self.assertFalse(WeakSet() > WeakSet()) + def test_gc(self): # Create a nest of cycles to exercise overall ref count check s = WeakSet(Foo() for i in range(1000)) diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py index 58fdcd4..b9230b7 100644 --- a/Lib/test/test_xml_etree.py +++ b/Lib/test/test_xml_etree.py @@ -1855,6 +1855,102 @@ def check_issue10777(): # -------------------------------------------------------------------- +class ElementTreeTest(unittest.TestCase): + + def test_istype(self): + self.assertIsInstance(ET.ParseError, type) + self.assertIsInstance(ET.QName, type) + self.assertIsInstance(ET.ElementTree, type) + self.assertIsInstance(ET.Element, type) + # XXX issue 14128 with C ElementTree + # self.assertIsInstance(ET.TreeBuilder, type) + # self.assertIsInstance(ET.XMLParser, type) + + def test_Element_subclass_trivial(self): + class MyElement(ET.Element): + pass + + mye = MyElement('foo') + self.assertIsInstance(mye, ET.Element) + self.assertIsInstance(mye, MyElement) + self.assertEqual(mye.tag, 'foo') + + def test_Element_subclass_constructor(self): + class MyElement(ET.Element): + def __init__(self, tag, attrib={}, **extra): + super(MyElement, self).__init__(tag + '__', attrib, **extra) + + mye = MyElement('foo', {'a': 1, 'b': 2}, c=3, d=4) + self.assertEqual(mye.tag, 'foo__') + self.assertEqual(sorted(mye.items()), + [('a', 1), ('b', 2), ('c', 3), ('d', 4)]) + + def test_Element_subclass_new_method(self): + class MyElement(ET.Element): + def newmethod(self): + return self.tag + + mye = MyElement('joe') + self.assertEqual(mye.newmethod(), 'joe') + + +class TreeBuilderTest(unittest.TestCase): + + sample1 = ('<!DOCTYPE html PUBLIC' + ' "-//W3C//DTD XHTML 1.0 Transitional//EN"' + ' "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">' + '<html>text</html>') + + def test_dummy_builder(self): + class BaseDummyBuilder: + def close(self): + return 42 + + class DummyBuilder(BaseDummyBuilder): + data = start = end = lambda *a: None + + parser = ET.XMLParser(target=DummyBuilder()) + parser.feed(self.sample1) + self.assertEqual(parser.close(), 42) + + parser = ET.XMLParser(target=BaseDummyBuilder()) + parser.feed(self.sample1) + self.assertEqual(parser.close(), 42) + + parser = ET.XMLParser(target=object()) + parser.feed(self.sample1) + self.assertIsNone(parser.close()) + + + @unittest.expectedFailure # XXX issue 14007 with C ElementTree + def test_doctype(self): + class DoctypeParser: + _doctype = None + + def doctype(self, name, pubid, system): + self._doctype = (name, pubid, system) + + def close(self): + return self._doctype + + parser = ET.XMLParser(target=DoctypeParser()) + parser.feed(self.sample1) + + self.assertEqual(parser.close(), + ('html', '-//W3C//DTD XHTML 1.0 Transitional//EN', + 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd')) + + +class NoAcceleratorTest(unittest.TestCase): + + # Test that the C accelerator was not imported for pyET + def test_correct_import_pyET(self): + self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree') + self.assertEqual(pyET.SubElement.__module__, 'xml.etree.ElementTree') + +# -------------------------------------------------------------------- + + class CleanContext(object): """Provide default namespace mapping and path cache.""" checkwarnings = None @@ -1873,10 +1969,7 @@ class CleanContext(object): ("This method will be removed in future versions. " "Use .+ instead.", DeprecationWarning), ("This method will be removed in future versions. " - "Use .+ instead.", PendingDeprecationWarning), - # XMLParser.doctype() is deprecated. - ("This method of XMLParser is deprecated. Define doctype.. " - "method on the TreeBuilder target.", DeprecationWarning)) + "Use .+ instead.", PendingDeprecationWarning)) self.checkwarnings = support.check_warnings(*deprecations, quiet=quiet) def __enter__(self): @@ -1898,19 +1991,18 @@ class CleanContext(object): self.checkwarnings.__exit__(*args) -class TestAcceleratorNotImported(unittest.TestCase): - # Test that the C accelerator was not imported for pyET - def test_correct_import_pyET(self): - self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree') - - def test_main(module=pyET): from test import test_xml_etree # The same doctests are used for both the Python and the C implementations test_xml_etree.ET = module - support.run_unittest(TestAcceleratorNotImported) + test_classes = [ElementTreeTest, TreeBuilderTest] + if module is pyET: + # Run the tests specific to the Python implementation + test_classes += [NoAcceleratorTest] + + support.run_unittest(*test_classes) # XXX the C module should give the same warnings as the Python module with CleanContext(quiet=(module is not pyET)): diff --git a/Lib/test/test_xml_etree_c.py b/Lib/test/test_xml_etree_c.py index a73d0c4..10416d2 100644 --- a/Lib/test/test_xml_etree_c.py +++ b/Lib/test/test_xml_etree_c.py @@ -47,13 +47,20 @@ class MiscTests(unittest.TestCase): data = None @unittest.skipUnless(cET, 'requires _elementtree') +class TestAliasWorking(unittest.TestCase): + # Test that the cET alias module is alive + def test_alias_working(self): + e = cET_alias.Element('foo') + self.assertEqual(e.tag, 'foo') + +@unittest.skipUnless(cET, 'requires _elementtree') class TestAcceleratorImported(unittest.TestCase): # Test that the C accelerator was imported, as expected def test_correct_import_cET(self): - self.assertEqual(cET.Element.__module__, '_elementtree') + self.assertEqual(cET.SubElement.__module__, '_elementtree') def test_correct_import_cET_alias(self): - self.assertEqual(cET_alias.Element.__module__, '_elementtree') + self.assertEqual(cET_alias.SubElement.__module__, '_elementtree') def test_main(): @@ -61,13 +68,15 @@ def test_main(): # Run the tests specific to the C implementation support.run_doctest(test_xml_etree_c, verbosity=True) - - support.run_unittest(MiscTests, TestAcceleratorImported) + support.run_unittest( + MiscTests, + TestAliasWorking, + TestAcceleratorImported + ) # Run the same test suite as the Python module test_xml_etree.test_main(module=cET) - # Exercise the deprecated alias - test_xml_etree.test_main(module=cET_alias) + if __name__ == '__main__': test_main() |