summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/crashers/losing_mro_ref.py (renamed from Lib/test/crashers/loosing_mro_ref.py)0
-rw-r--r--Lib/test/crashers/nasty_eq_vs_dict.py47
-rw-r--r--Lib/test/pickletester.py99
-rwxr-xr-xLib/test/regrtest.py23
-rw-r--r--Lib/test/test_base64.py5
-rw-r--r--Lib/test/test_buffer.py201
-rw-r--r--Lib/test/test_dict.py22
-rw-r--r--Lib/test/test_exceptions.py2
-rw-r--r--Lib/test/test_logging.py14
-rw-r--r--Lib/test/test_mailbox.py12
-rw-r--r--Lib/test/test_marshal.py30
-rw-r--r--Lib/test/test_minidom.py15
-rw-r--r--Lib/test/test_multiprocessing.py236
-rw-r--r--Lib/test/test_mutants.py291
-rw-r--r--Lib/test/test_pickle.py28
-rw-r--r--Lib/test/test_pty.py19
-rw-r--r--Lib/test/test_signal.py10
-rw-r--r--Lib/test/test_time.py21
-rw-r--r--Lib/test/test_tokenize.py12
-rw-r--r--Lib/test/test_weakset.py56
-rw-r--r--Lib/test/test_xml_etree.py114
-rw-r--r--Lib/test/test_xml_etree_c.py21
22 files changed, 868 insertions, 410 deletions
diff --git a/Lib/test/crashers/loosing_mro_ref.py b/Lib/test/crashers/losing_mro_ref.py
index b3bcd32..b3bcd32 100644
--- a/Lib/test/crashers/loosing_mro_ref.py
+++ b/Lib/test/crashers/losing_mro_ref.py
diff --git a/Lib/test/crashers/nasty_eq_vs_dict.py b/Lib/test/crashers/nasty_eq_vs_dict.py
deleted file mode 100644
index 85f7caf..0000000
--- a/Lib/test/crashers/nasty_eq_vs_dict.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# from http://mail.python.org/pipermail/python-dev/2001-June/015239.html
-
-# if you keep changing a dictionary while looking up a key, you can
-# provoke an infinite recursion in C
-
-# At the time neither Tim nor Michael could be bothered to think of a
-# way to fix it.
-
-class Yuck:
- def __init__(self):
- self.i = 0
-
- def make_dangerous(self):
- self.i = 1
-
- def __hash__(self):
- # direct to slot 4 in table of size 8; slot 12 when size 16
- return 4 + 8
-
- def __eq__(self, other):
- if self.i == 0:
- # leave dict alone
- pass
- elif self.i == 1:
- # fiddle to 16 slots
- self.__fill_dict(6)
- self.i = 2
- else:
- # fiddle to 8 slots
- self.__fill_dict(4)
- self.i = 1
-
- return 1
-
- def __fill_dict(self, n):
- self.i = 0
- dict.clear()
- for i in range(n):
- dict[i] = i
- dict[self] = "OK!"
-
-y = Yuck()
-dict = {y: "OK!"}
-
-z = Yuck()
-y.make_dangerous()
-print(dict[z])
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index 831306f..1a551c8 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -1605,6 +1605,105 @@ class AbstractPicklerUnpicklerObjectTests(unittest.TestCase):
self.assertEqual(unpickler.load(), data)
+# Tests for dispatch_table attribute
+
+REDUCE_A = 'reduce_A'
+
+class AAA(object):
+ def __reduce__(self):
+ return str, (REDUCE_A,)
+
+class BBB(object):
+ pass
+
+class AbstractDispatchTableTests(unittest.TestCase):
+
+ def test_default_dispatch_table(self):
+ # No dispatch_table attribute by default
+ f = io.BytesIO()
+ p = self.pickler_class(f, 0)
+ with self.assertRaises(AttributeError):
+ p.dispatch_table
+ self.assertFalse(hasattr(p, 'dispatch_table'))
+
+ def test_class_dispatch_table(self):
+ # A dispatch_table attribute can be specified class-wide
+ dt = self.get_dispatch_table()
+
+ class MyPickler(self.pickler_class):
+ dispatch_table = dt
+
+ def dumps(obj, protocol=None):
+ f = io.BytesIO()
+ p = MyPickler(f, protocol)
+ self.assertEqual(p.dispatch_table, dt)
+ p.dump(obj)
+ return f.getvalue()
+
+ self._test_dispatch_table(dumps, dt)
+
+ def test_instance_dispatch_table(self):
+ # A dispatch_table attribute can also be specified instance-wide
+ dt = self.get_dispatch_table()
+
+ def dumps(obj, protocol=None):
+ f = io.BytesIO()
+ p = self.pickler_class(f, protocol)
+ p.dispatch_table = dt
+ self.assertEqual(p.dispatch_table, dt)
+ p.dump(obj)
+ return f.getvalue()
+
+ self._test_dispatch_table(dumps, dt)
+
+ def _test_dispatch_table(self, dumps, dispatch_table):
+ def custom_load_dump(obj):
+ return pickle.loads(dumps(obj, 0))
+
+ def default_load_dump(obj):
+ return pickle.loads(pickle.dumps(obj, 0))
+
+ # pickling complex numbers using protocol 0 relies on copyreg
+ # so check pickling a complex number still works
+ z = 1 + 2j
+ self.assertEqual(custom_load_dump(z), z)
+ self.assertEqual(default_load_dump(z), z)
+
+ # modify pickling of complex
+ REDUCE_1 = 'reduce_1'
+ def reduce_1(obj):
+ return str, (REDUCE_1,)
+ dispatch_table[complex] = reduce_1
+ self.assertEqual(custom_load_dump(z), REDUCE_1)
+ self.assertEqual(default_load_dump(z), z)
+
+ # check picklability of AAA and BBB
+ a = AAA()
+ b = BBB()
+ self.assertEqual(custom_load_dump(a), REDUCE_A)
+ self.assertIsInstance(custom_load_dump(b), BBB)
+ self.assertEqual(default_load_dump(a), REDUCE_A)
+ self.assertIsInstance(default_load_dump(b), BBB)
+
+ # modify pickling of BBB
+ dispatch_table[BBB] = reduce_1
+ self.assertEqual(custom_load_dump(a), REDUCE_A)
+ self.assertEqual(custom_load_dump(b), REDUCE_1)
+ self.assertEqual(default_load_dump(a), REDUCE_A)
+ self.assertIsInstance(default_load_dump(b), BBB)
+
+ # revert pickling of BBB and modify pickling of AAA
+ REDUCE_2 = 'reduce_2'
+ def reduce_2(obj):
+ return str, (REDUCE_2,)
+ dispatch_table[AAA] = reduce_2
+ del dispatch_table[BBB]
+ self.assertEqual(custom_load_dump(a), REDUCE_2)
+ self.assertIsInstance(custom_load_dump(b), BBB)
+ self.assertEqual(default_load_dump(a), REDUCE_A)
+ self.assertIsInstance(default_load_dump(b), BBB)
+
+
if __name__ == "__main__":
# Print some stuff that can be used to rewrite DATA{0,1,2}
from pickletools import dis
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 871ae61..44d3426 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -749,10 +749,10 @@ def main(tests=None, testdir=None, verbose=0, quiet=False,
if bad:
print(count(len(bad), "test"), "failed:")
printlist(bad)
- if environment_changed:
- print("{} altered the execution environment:".format(
- count(len(environment_changed), "test")))
- printlist(environment_changed)
+ if environment_changed:
+ print("{} altered the execution environment:".format(
+ count(len(environment_changed), "test")))
+ printlist(environment_changed)
if skipped and not quiet:
print(count(len(skipped), "test"), "skipped:")
printlist(skipped)
@@ -970,6 +970,7 @@ class saved_test_environment:
'multiprocessing.process._dangling',
'sysconfig._CONFIG_VARS', 'sysconfig._SCHEMES',
'packaging.command._COMMANDS', 'packaging.database_caches',
+ 'support.TESTFN',
)
def get_sys_argv(self):
@@ -1163,6 +1164,20 @@ class saved_test_environment:
sysconfig._SCHEMES._sections.clear()
sysconfig._SCHEMES._sections.update(saved[2])
+ def get_support_TESTFN(self):
+ if os.path.isfile(support.TESTFN):
+ result = 'f'
+ elif os.path.isdir(support.TESTFN):
+ result = 'd'
+ else:
+ result = None
+ return result
+ def restore_support_TESTFN(self, saved_value):
+ if saved_value is None:
+ if os.path.isfile(support.TESTFN):
+ os.unlink(support.TESTFN)
+ elif os.path.isdir(support.TESTFN):
+ shutil.rmtree(support.TESTFN)
def resource_info(self):
for name in self.resources:
diff --git a/Lib/test/test_base64.py b/Lib/test/test_base64.py
index b02f86d..2569476 100644
--- a/Lib/test/test_base64.py
+++ b/Lib/test/test_base64.py
@@ -2,6 +2,7 @@ import unittest
from test import support
import base64
import binascii
+import os
import sys
import subprocess
@@ -274,6 +275,10 @@ class BaseXYTestCase(unittest.TestCase):
class TestMain(unittest.TestCase):
+ def tearDown(self):
+ if os.path.exists(support.TESTFN):
+ os.unlink(support.TESTFN)
+
def get_output(self, *args, **options):
args = (sys.executable, '-m', 'base64') + args
return subprocess.check_output(args, **options)
diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py
index 25324ef..e0006f2 100644
--- a/Lib/test/test_buffer.py
+++ b/Lib/test/test_buffer.py
@@ -3373,6 +3373,15 @@ class TestBufferProtocol(unittest.TestCase):
del nd
m.release()
+ a = bytearray([1,2,3])
+ m = memoryview(a)
+ nd1 = ndarray(m, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ nd2 = ndarray(nd1, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ self.assertIs(nd2.obj, m)
+ self.assertRaises(BufferError, m.release)
+ del nd1, nd2
+ m.release()
+
# chained views
a = bytearray([1,2,3])
m1 = memoryview(a)
@@ -3383,6 +3392,17 @@ class TestBufferProtocol(unittest.TestCase):
del nd
m2.release()
+ a = bytearray([1,2,3])
+ m1 = memoryview(a)
+ m2 = memoryview(m1)
+ nd1 = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ nd2 = ndarray(nd1, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ self.assertIs(nd2.obj, m2)
+ m1.release()
+ self.assertRaises(BufferError, m2.release)
+ del nd1, nd2
+ m2.release()
+
# Allow changing layout while buffers are exported.
nd = ndarray([1,2,3], shape=[3], flags=ND_VAREXPORT)
m1 = memoryview(nd)
@@ -3418,11 +3438,182 @@ class TestBufferProtocol(unittest.TestCase):
catch22(m1)
self.assertEqual(m1[0], ord(b'1'))
- # XXX If m1 has exports, raise BufferError.
- # x = bytearray(b'123')
- # with memoryview(x) as m1:
- # ex = ndarray(m1)
- # m1[0] == ord(b'1')
+ x = ndarray(list(range(12)), shape=[2,2,3], format='l')
+ y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ self.assertIs(z.obj, x)
+ with memoryview(z) as m:
+ catch22(m)
+ self.assertEqual(m[0:1].tolist(), [[[0, 1, 2], [3, 4, 5]]])
+
+ # Test garbage collection.
+ for flags in (0, ND_REDIRECT):
+ x = bytearray(b'123')
+ with memoryview(x) as m1:
+ del x
+ y = ndarray(m1, getbuf=PyBUF_FULL_RO, flags=flags)
+ with memoryview(y) as m2:
+ del y
+ z = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=flags)
+ with memoryview(z) as m3:
+ del z
+ catch22(m3)
+ catch22(m2)
+ catch22(m1)
+ self.assertEqual(m1[0], ord(b'1'))
+ self.assertEqual(m2[1], ord(b'2'))
+ self.assertEqual(m3[2], ord(b'3'))
+ del m3
+ del m2
+ del m1
+
+ x = bytearray(b'123')
+ with memoryview(x) as m1:
+ del x
+ y = ndarray(m1, getbuf=PyBUF_FULL_RO, flags=flags)
+ with memoryview(y) as m2:
+ del y
+ z = ndarray(m2, getbuf=PyBUF_FULL_RO, flags=flags)
+ with memoryview(z) as m3:
+ del z
+ catch22(m1)
+ catch22(m2)
+ catch22(m3)
+ self.assertEqual(m1[0], ord(b'1'))
+ self.assertEqual(m2[1], ord(b'2'))
+ self.assertEqual(m3[2], ord(b'3'))
+ del m1, m2, m3
+
+ # memoryview.release() fails if the view has exported buffers.
+ x = bytearray(b'123')
+ with self.assertRaises(BufferError):
+ with memoryview(x) as m:
+ ex = ndarray(m)
+ m[0] == ord(b'1')
+
+ def test_memoryview_redirect(self):
+
+ nd = ndarray([1.0 * x for x in range(12)], shape=[12], format='d')
+ a = array.array('d', [1.0 * x for x in range(12)])
+
+ for x in (nd, a):
+ y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ m = memoryview(z)
+
+ self.assertIs(y.obj, x)
+ self.assertIs(z.obj, x)
+ self.assertIs(m.obj, x)
+
+ self.assertEqual(m, x)
+ self.assertEqual(m, y)
+ self.assertEqual(m, z)
+
+ self.assertEqual(m[1:3], x[1:3])
+ self.assertEqual(m[1:3], y[1:3])
+ self.assertEqual(m[1:3], z[1:3])
+ del y, z
+ self.assertEqual(m[1:3], x[1:3])
+
+ def test_memoryview_from_static_exporter(self):
+
+ fmt = 'B'
+ lst = [0,1,2,3,4,5,6,7,8,9,10,11]
+
+ # exceptions
+ self.assertRaises(TypeError, staticarray, 1, 2, 3)
+
+ # view.obj==x
+ x = staticarray()
+ y = memoryview(x)
+ self.verify(y, obj=x,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ for i in range(12):
+ self.assertEqual(y[i], i)
+ del x
+ del y
+
+ x = staticarray()
+ y = memoryview(x)
+ del y
+ del x
+
+ x = staticarray()
+ y = ndarray(x, getbuf=PyBUF_FULL_RO)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO)
+ m = memoryview(z)
+ self.assertIs(y.obj, x)
+ self.assertIs(m.obj, z)
+ self.verify(m, obj=z,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ del x, y, z, m
+
+ x = staticarray()
+ y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ m = memoryview(z)
+ self.assertIs(y.obj, x)
+ self.assertIs(z.obj, x)
+ self.assertIs(m.obj, x)
+ self.verify(m, obj=x,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ del x, y, z, m
+
+ # view.obj==NULL
+ x = staticarray(legacy_mode=True)
+ y = memoryview(x)
+ self.verify(y, obj=None,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ for i in range(12):
+ self.assertEqual(y[i], i)
+ del x
+ del y
+
+ x = staticarray(legacy_mode=True)
+ y = memoryview(x)
+ del y
+ del x
+
+ x = staticarray(legacy_mode=True)
+ y = ndarray(x, getbuf=PyBUF_FULL_RO)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO)
+ m = memoryview(z)
+ self.assertIs(y.obj, None)
+ self.assertIs(m.obj, z)
+ self.verify(m, obj=z,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ del x, y, z, m
+
+ x = staticarray(legacy_mode=True)
+ y = ndarray(x, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ z = ndarray(y, getbuf=PyBUF_FULL_RO, flags=ND_REDIRECT)
+ m = memoryview(z)
+ # Clearly setting view.obj==NULL is inferior, since it
+ # messes up the redirection chain:
+ self.assertIs(y.obj, None)
+ self.assertIs(z.obj, y)
+ self.assertIs(m.obj, y)
+ self.verify(m, obj=y,
+ itemsize=1, fmt=fmt, readonly=1,
+ ndim=1, shape=[12], strides=[1],
+ lst=lst)
+ del x, y, z, m
+
+ def test_memoryview_getbuffer_undefined(self):
+
+ # getbufferproc does not adhere to the new documentation
+ nd = ndarray([1,2,3], [3], flags=ND_GETBUF_FAIL|ND_GETBUF_UNDEFINED)
+ self.assertRaises(BufferError, memoryview, nd)
def test_issue_7385(self):
x = ndarray([1,2,3], shape=[3], flags=ND_GETBUF_FAIL)
diff --git a/Lib/test/test_dict.py b/Lib/test/test_dict.py
index d2740a3..15db51d 100644
--- a/Lib/test/test_dict.py
+++ b/Lib/test/test_dict.py
@@ -379,7 +379,7 @@ class DictTest(unittest.TestCase):
x.fail = True
self.assertRaises(Exc, d.pop, x)
- def test_mutatingiteration(self):
+ def test_mutating_iteration(self):
# changing dict size during iteration
d = {}
d[1] = 1
@@ -387,6 +387,26 @@ class DictTest(unittest.TestCase):
for i in d:
d[i+1] = 1
+ def test_mutating_lookup(self):
+ # changing dict during a lookup
+ class NastyKey:
+ mutate_dict = None
+
+ def __hash__(self):
+ # hash collision!
+ return 1
+
+ def __eq__(self, other):
+ if self.mutate_dict:
+ self.mutate_dict[self] = 1
+ return self == other
+
+ d = {}
+ d[NastyKey()] = 0
+ NastyKey.mutate_dict = d
+ with self.assertRaises(RuntimeError):
+ d[NastyKey()] = None
+
def test_repr(self):
d = {}
self.assertEqual(repr(d), '{}')
diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py
index 91d85ef..42536d3 100644
--- a/Lib/test/test_exceptions.py
+++ b/Lib/test/test_exceptions.py
@@ -38,7 +38,7 @@ class ExceptionTests(unittest.TestCase):
try:
try:
import marshal
- marshal.loads('')
+ marshal.loads(b'')
except EOFError:
pass
finally:
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index b239f58..98a2819 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -3651,11 +3651,14 @@ class TimedRotatingFileHandlerTest(BaseFileTest):
def test_rollover(self):
fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
backupCount=1)
- r = logging.makeLogRecord({'msg': 'testing'})
- fh.emit(r)
+ fmt = logging.Formatter('%(asctime)s %(message)s')
+ fh.setFormatter(fmt)
+ r1 = logging.makeLogRecord({'msg': 'testing - initial'})
+ fh.emit(r1)
self.assertLogFile(self.fn)
- time.sleep(1.01) # just a little over a second ...
- fh.emit(r)
+ time.sleep(1.1) # a little over a second ...
+ r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
+ fh.emit(r2)
fh.close()
# At this point, we should have a recent rotated file which we
# can test for the existence of. However, in practice, on some
@@ -3682,7 +3685,8 @@ class TimedRotatingFileHandlerTest(BaseFileTest):
print('The only matching files are: %s' % files, file=sys.stderr)
for f in files:
print('Contents of %s:' % f)
- with open(f, 'r') as tf:
+ path = os.path.join(dn, f)
+ with open(path, 'r') as tf:
print(tf.read())
self.assertTrue(found, msg=msg)
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index 4bb05a4..212ceb9 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -7,6 +7,7 @@ import email
import email.message
import re
import io
+import shutil
import tempfile
from test import support
import unittest
@@ -38,12 +39,7 @@ class TestBase(unittest.TestCase):
def _delete_recursively(self, target):
# Delete a file or delete a directory recursively
if os.path.isdir(target):
- for path, dirs, files in os.walk(target, topdown=False):
- for name in files:
- os.remove(os.path.join(path, name))
- for name in dirs:
- os.rmdir(os.path.join(path, name))
- os.rmdir(target)
+ shutil.rmtree(target)
elif os.path.exists(target):
os.remove(target)
@@ -2028,6 +2024,10 @@ class MaildirTestCase(unittest.TestCase):
def setUp(self):
# create a new maildir mailbox to work with:
self._dir = support.TESTFN
+ if os.path.isdir(self._dir):
+ shutil.rmtree(self._dir)
+ elif os.path.isfile(self._dir):
+ os.unlink(self._dir)
os.mkdir(self._dir)
os.mkdir(os.path.join(self._dir, "cur"))
os.mkdir(os.path.join(self._dir, "tmp"))
diff --git a/Lib/test/test_marshal.py b/Lib/test/test_marshal.py
index dec8129..83c348c 100644
--- a/Lib/test/test_marshal.py
+++ b/Lib/test/test_marshal.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
from test import support
+import array
import marshal
import sys
import unittest
@@ -154,6 +155,27 @@ class ContainerTestCase(unittest.TestCase, HelperMixin):
for constructor in (set, frozenset):
self.helper(constructor(self.d.keys()))
+
+class BufferTestCase(unittest.TestCase, HelperMixin):
+
+ def test_bytearray(self):
+ b = bytearray(b"abc")
+ self.helper(b)
+ new = marshal.loads(marshal.dumps(b))
+ self.assertEqual(type(new), bytes)
+
+ def test_memoryview(self):
+ b = memoryview(b"abc")
+ self.helper(b)
+ new = marshal.loads(marshal.dumps(b))
+ self.assertEqual(type(new), bytes)
+
+ def test_array(self):
+ a = array.array('B', b"abc")
+ new = marshal.loads(marshal.dumps(a))
+ self.assertEqual(new, b"abc")
+
+
class BugsTestCase(unittest.TestCase):
def test_bug_5888452(self):
# Simple-minded check for SF 588452: Debug build crashes
@@ -179,7 +201,7 @@ class BugsTestCase(unittest.TestCase):
pass
def test_loads_recursion(self):
- s = 'c' + ('X' * 4*4) + '{' * 2**20
+ s = b'c' + (b'X' * 4*4) + b'{' * 2**20
self.assertRaises(ValueError, marshal.loads, s)
def test_recursion_limit(self):
@@ -252,6 +274,11 @@ class BugsTestCase(unittest.TestCase):
finally:
support.unlink(support.TESTFN)
+ def test_loads_reject_unicode_strings(self):
+ # Issue #14177: marshal.loads() should not accept unicode strings
+ unicode_string = 'T'
+ self.assertRaises(TypeError, marshal.loads, unicode_string)
+
def test_main():
support.run_unittest(IntTestCase,
@@ -260,6 +287,7 @@ def test_main():
CodeTestCase,
ContainerTestCase,
ExceptionTestCase,
+ BufferTestCase,
BugsTestCase)
if __name__ == "__main__":
diff --git a/Lib/test/test_minidom.py b/Lib/test/test_minidom.py
index 752a840..cc4c95b 100644
--- a/Lib/test/test_minidom.py
+++ b/Lib/test/test_minidom.py
@@ -362,11 +362,17 @@ class MinidomTest(unittest.TestCase):
def testGetAttrList(self):
pass
- def testGetAttrValues(self): pass
+ def testGetAttrValues(self):
+ pass
- def testGetAttrLength(self): pass
+ def testGetAttrLength(self):
+ pass
- def testGetAttribute(self): pass
+ def testGetAttribute(self):
+ dom = Document()
+ child = dom.appendChild(
+ dom.createElementNS("http://www.python.org", "python:abc"))
+ self.assertEqual(child.getAttribute('missing'), '')
def testGetAttributeNS(self):
dom = Document()
@@ -378,6 +384,9 @@ class MinidomTest(unittest.TestCase):
'http://www.python.org')
self.assertEqual(child.getAttributeNS("http://www.w3.org", "other"),
'')
+ child2 = child.appendChild(dom.createElement('abc'))
+ self.assertEqual(child2.getAttributeNS("http://www.python.org", "missing"),
+ '')
def testGetAttributeNode(self): pass
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index f141bd4..0db6352 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1811,6 +1811,84 @@ class _TestListenerClient(BaseTestCase):
p.join()
l.close()
+class _TestPoll(unittest.TestCase):
+
+ ALLOWED_TYPES = ('processes', 'threads')
+
+ def test_empty_string(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(), False)
+ b.send_bytes(b'')
+ self.assertEqual(a.poll(), True)
+ self.assertEqual(a.poll(), True)
+
+ @classmethod
+ def _child_strings(cls, conn, strings):
+ for s in strings:
+ time.sleep(0.1)
+ conn.send_bytes(s)
+ conn.close()
+
+ def test_strings(self):
+ strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
+ a, b = self.Pipe()
+ p = self.Process(target=self._child_strings, args=(b, strings))
+ p.start()
+
+ for s in strings:
+ for i in range(200):
+ if a.poll(0.01):
+ break
+ x = a.recv_bytes()
+ self.assertEqual(s, x)
+
+ p.join()
+
+ @classmethod
+ def _child_boundaries(cls, r):
+ # Polling may "pull" a message in to the child process, but we
+ # don't want it to pull only part of a message, as that would
+ # corrupt the pipe for any other processes which might later
+ # read from it.
+ r.poll(5)
+
+ def test_boundaries(self):
+ r, w = self.Pipe(False)
+ p = self.Process(target=self._child_boundaries, args=(r,))
+ p.start()
+ time.sleep(2)
+ L = [b"first", b"second"]
+ for obj in L:
+ w.send_bytes(obj)
+ w.close()
+ p.join()
+ self.assertIn(r.recv_bytes(), L)
+
+ @classmethod
+ def _child_dont_merge(cls, b):
+ b.send_bytes(b'a')
+ b.send_bytes(b'b')
+ b.send_bytes(b'cd')
+
+ def test_dont_merge(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(0.0), False)
+ self.assertEqual(a.poll(0.1), False)
+
+ p = self.Process(target=self._child_dont_merge, args=(b,))
+ p.start()
+
+ self.assertEqual(a.recv_bytes(), b'a')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.recv_bytes(), b'b')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(0.0), True)
+ self.assertEqual(a.recv_bytes(), b'cd')
+
+ p.join()
+
#
# Test of sending connection and socket objects between processes
#
@@ -2404,8 +2482,164 @@ class TestStdinBadfiledescriptor(unittest.TestCase):
flike.flush()
assert sio.getvalue() == 'foo'
+
+class TestWait(unittest.TestCase):
+
+ @classmethod
+ def _child_test_wait(cls, w, slow):
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ w.send((i, os.getpid()))
+ w.close()
+
+ def test_wait(self, slow=False):
+ from multiprocessing.connection import wait
+ readers = []
+ procs = []
+ messages = []
+
+ for i in range(4):
+ r, w = multiprocessing.Pipe(duplex=False)
+ p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
+ p.daemon = True
+ p.start()
+ w.close()
+ readers.append(r)
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ r.close()
+ else:
+ messages.append(msg)
+
+ messages.sort()
+ expected = sorted((i, p.pid) for i in range(10) for p in procs)
+ self.assertEqual(messages, expected)
+
+ @classmethod
+ def _child_test_wait_socket(cls, address, slow):
+ s = socket.socket()
+ s.connect(address)
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ s.sendall(('%s\n' % i).encode('ascii'))
+ s.close()
+
+ def test_wait_socket(self, slow=False):
+ from multiprocessing.connection import wait
+ l = socket.socket()
+ l.bind(('', 0))
+ l.listen(4)
+ addr = ('localhost', l.getsockname()[1])
+ readers = []
+ procs = []
+ dic = {}
+
+ for i in range(4):
+ p = multiprocessing.Process(target=self._child_test_wait_socket,
+ args=(addr, slow))
+ p.daemon = True
+ p.start()
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ for i in range(4):
+ r, _ = l.accept()
+ readers.append(r)
+ dic[r] = []
+ l.close()
+
+ while readers:
+ for r in wait(readers):
+ msg = r.recv(32)
+ if not msg:
+ readers.remove(r)
+ r.close()
+ else:
+ dic[r].append(msg)
+
+ expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
+ for v in dic.values():
+ self.assertEqual(b''.join(v), expected)
+
+ def test_wait_slow(self):
+ self.test_wait(True)
+
+ def test_wait_socket_slow(self):
+ self.test_wait(True)
+
+ def test_wait_timeout(self):
+ from multiprocessing.connection import wait
+
+ expected = 1
+ a, b = multiprocessing.Pipe()
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [])
+ self.assertLess(delta, expected + 0.2)
+ self.assertGreater(delta, expected - 0.2)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a])
+ self.assertLess(delta, 0.2)
+
+ def test_wait_integer(self):
+ from multiprocessing.connection import wait
+
+ expected = 5
+ a, b = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=time.sleep, args=(expected,))
+
+ p.start()
+ self.assertIsInstance(p.sentinel, int)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], expected + 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel])
+ self.assertLess(delta, expected + 1)
+ self.assertGreater(delta, expected - 1)
+
+ a.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel, b])
+ self.assertLess(delta, 0.2)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a, p.sentinel, b])
+ self.assertLess(delta, 0.2)
+
+ p.join()
+
+
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
- TestStdinBadfiledescriptor]
+ TestStdinBadfiledescriptor, TestWait]
#
#
diff --git a/Lib/test/test_mutants.py b/Lib/test/test_mutants.py
deleted file mode 100644
index b43fa47..0000000
--- a/Lib/test/test_mutants.py
+++ /dev/null
@@ -1,291 +0,0 @@
-from test.support import verbose, TESTFN
-import random
-import os
-
-# From SF bug #422121: Insecurities in dict comparison.
-
-# Safety of code doing comparisons has been an historical Python weak spot.
-# The problem is that comparison of structures written in C *naturally*
-# wants to hold on to things like the size of the container, or "the
-# biggest" containee so far, across a traversal of the container; but
-# code to do containee comparisons can call back into Python and mutate
-# the container in arbitrary ways while the C loop is in midstream. If the
-# C code isn't extremely paranoid about digging things out of memory on
-# each trip, and artificially boosting refcounts for the duration, anything
-# from infinite loops to OS crashes can result (yes, I use Windows <wink>).
-#
-# The other problem is that code designed to provoke a weakness is usually
-# white-box code, and so catches only the particular vulnerabilities the
-# author knew to protect against. For example, Python's list.sort() code
-# went thru many iterations as one "new" vulnerability after another was
-# discovered.
-#
-# So the dict comparison test here uses a black-box approach instead,
-# generating dicts of various sizes at random, and performing random
-# mutations on them at random times. This proved very effective,
-# triggering at least six distinct failure modes the first 20 times I
-# ran it. Indeed, at the start, the driver never got beyond 6 iterations
-# before the test died.
-
-# The dicts are global to make it easy to mutate tham from within functions.
-dict1 = {}
-dict2 = {}
-
-# The current set of keys in dict1 and dict2. These are materialized as
-# lists to make it easy to pick a dict key at random.
-dict1keys = []
-dict2keys = []
-
-# Global flag telling maybe_mutate() whether to *consider* mutating.
-mutate = 0
-
-# If global mutate is true, consider mutating a dict. May or may not
-# mutate a dict even if mutate is true. If it does decide to mutate a
-# dict, it picks one of {dict1, dict2} at random, and deletes a random
-# entry from it; or, more rarely, adds a random element.
-
-def maybe_mutate():
- global mutate
- if not mutate:
- return
- if random.random() < 0.5:
- return
-
- if random.random() < 0.5:
- target, keys = dict1, dict1keys
- else:
- target, keys = dict2, dict2keys
-
- if random.random() < 0.2:
- # Insert a new key.
- mutate = 0 # disable mutation until key inserted
- while 1:
- newkey = Horrid(random.randrange(100))
- if newkey not in target:
- break
- target[newkey] = Horrid(random.randrange(100))
- keys.append(newkey)
- mutate = 1
-
- elif keys:
- # Delete a key at random.
- mutate = 0 # disable mutation until key deleted
- i = random.randrange(len(keys))
- key = keys[i]
- del target[key]
- del keys[i]
- mutate = 1
-
-# A horrid class that triggers random mutations of dict1 and dict2 when
-# instances are compared.
-
-class Horrid:
- def __init__(self, i):
- # Comparison outcomes are determined by the value of i.
- self.i = i
-
- # An artificial hashcode is selected at random so that we don't
- # have any systematic relationship between comparison outcomes
- # (based on self.i and other.i) and relative position within the
- # hash vector (based on hashcode).
- # XXX This is no longer effective.
- ##self.hashcode = random.randrange(1000000000)
-
- def __hash__(self):
- return 42
- return self.hashcode
-
- def __eq__(self, other):
- maybe_mutate() # The point of the test.
- return self.i == other.i
-
- def __ne__(self, other):
- raise RuntimeError("I didn't expect some kind of Spanish inquisition!")
-
- __lt__ = __le__ = __gt__ = __ge__ = __ne__
-
- def __repr__(self):
- return "Horrid(%d)" % self.i
-
-# Fill dict d with numentries (Horrid(i), Horrid(j)) key-value pairs,
-# where i and j are selected at random from the candidates list.
-# Return d.keys() after filling.
-
-def fill_dict(d, candidates, numentries):
- d.clear()
- for i in range(numentries):
- d[Horrid(random.choice(candidates))] = \
- Horrid(random.choice(candidates))
- return list(d.keys())
-
-# Test one pair of randomly generated dicts, each with n entries.
-# Note that dict comparison is trivial if they don't have the same number
-# of entires (then the "shorter" dict is instantly considered to be the
-# smaller one, without even looking at the entries).
-
-def test_one(n):
- global mutate, dict1, dict2, dict1keys, dict2keys
-
- # Fill the dicts without mutating them.
- mutate = 0
- dict1keys = fill_dict(dict1, range(n), n)
- dict2keys = fill_dict(dict2, range(n), n)
-
- # Enable mutation, then compare the dicts so long as they have the
- # same size.
- mutate = 1
- if verbose:
- print("trying w/ lengths", len(dict1), len(dict2), end=' ')
- while dict1 and len(dict1) == len(dict2):
- if verbose:
- print(".", end=' ')
- c = dict1 == dict2
- if verbose:
- print()
-
-# Run test_one n times. At the start (before the bugs were fixed), 20
-# consecutive runs of this test each blew up on or before the sixth time
-# test_one was run. So n doesn't have to be large to get an interesting
-# test.
-# OTOH, calling with large n is also interesting, to ensure that the fixed
-# code doesn't hold on to refcounts *too* long (in which case memory would
-# leak).
-
-def test(n):
- for i in range(n):
- test_one(random.randrange(1, 100))
-
-# See last comment block for clues about good values for n.
-test(100)
-
-##########################################################################
-# Another segfault bug, distilled by Michael Hudson from a c.l.py post.
-
-class Child:
- def __init__(self, parent):
- self.__dict__['parent'] = parent
- def __getattr__(self, attr):
- self.parent.a = 1
- self.parent.b = 1
- self.parent.c = 1
- self.parent.d = 1
- self.parent.e = 1
- self.parent.f = 1
- self.parent.g = 1
- self.parent.h = 1
- self.parent.i = 1
- return getattr(self.parent, attr)
-
-class Parent:
- def __init__(self):
- self.a = Child(self)
-
-# Hard to say what this will print! May vary from time to time. But
-# we're specifically trying to test the tp_print slot here, and this is
-# the clearest way to do it. We print the result to a temp file so that
-# the expected-output file doesn't need to change.
-
-f = open(TESTFN, "w")
-print(Parent().__dict__, file=f)
-f.close()
-os.unlink(TESTFN)
-
-##########################################################################
-# And another core-dumper from Michael Hudson.
-
-dict = {}
-
-# Force dict to malloc its table.
-for i in range(1, 10):
- dict[i] = i
-
-f = open(TESTFN, "w")
-
-class Machiavelli:
- def __repr__(self):
- dict.clear()
-
- # Michael sez: "doesn't crash without this. don't know why."
- # Tim sez: "luck of the draw; crashes with or without for me."
- print(file=f)
-
- return repr("machiavelli")
-
- def __hash__(self):
- return 0
-
-dict[Machiavelli()] = Machiavelli()
-
-print(str(dict), file=f)
-f.close()
-os.unlink(TESTFN)
-del f, dict
-
-
-##########################################################################
-# And another core-dumper from Michael Hudson.
-
-dict = {}
-
-# let's force dict to malloc its table
-for i in range(1, 10):
- dict[i] = i
-
-class Machiavelli2:
- def __eq__(self, other):
- dict.clear()
- return 1
-
- def __hash__(self):
- return 0
-
-dict[Machiavelli2()] = Machiavelli2()
-
-try:
- dict[Machiavelli2()]
-except KeyError:
- pass
-
-del dict
-
-##########################################################################
-# And another core-dumper from Michael Hudson.
-
-dict = {}
-
-# let's force dict to malloc its table
-for i in range(1, 10):
- dict[i] = i
-
-class Machiavelli3:
- def __init__(self, id):
- self.id = id
-
- def __eq__(self, other):
- if self.id == other.id:
- dict.clear()
- return 1
- else:
- return 0
-
- def __repr__(self):
- return "%s(%s)"%(self.__class__.__name__, self.id)
-
- def __hash__(self):
- return 0
-
-dict[Machiavelli3(1)] = Machiavelli3(0)
-dict[Machiavelli3(2)] = Machiavelli3(0)
-
-f = open(TESTFN, "w")
-try:
- try:
- print(dict[Machiavelli3(2)], file=f)
- except KeyError:
- pass
-finally:
- f.close()
- os.unlink(TESTFN)
-
-del dict
-del dict1, dict2, dict1keys, dict2keys
diff --git a/Lib/test/test_pickle.py b/Lib/test/test_pickle.py
index 9da2cae..f52d4bd 100644
--- a/Lib/test/test_pickle.py
+++ b/Lib/test/test_pickle.py
@@ -1,5 +1,6 @@
import pickle
import io
+import collections
from test import support
@@ -7,6 +8,7 @@ from test.pickletester import AbstractPickleTests
from test.pickletester import AbstractPickleModuleTests
from test.pickletester import AbstractPersistentPicklerTests
from test.pickletester import AbstractPicklerUnpicklerObjectTests
+from test.pickletester import AbstractDispatchTableTests
from test.pickletester import BigmemPickleTests
try:
@@ -80,6 +82,18 @@ class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
unpickler_class = pickle._Unpickler
+class PyDispatchTableTests(AbstractDispatchTableTests):
+ pickler_class = pickle._Pickler
+ def get_dispatch_table(self):
+ return pickle.dispatch_table.copy()
+
+
+class PyChainDispatchTableTests(AbstractDispatchTableTests):
+ pickler_class = pickle._Pickler
+ def get_dispatch_table(self):
+ return collections.ChainMap({}, pickle.dispatch_table)
+
+
if has_c_implementation:
class CPicklerTests(PyPicklerTests):
pickler = _pickle.Pickler
@@ -101,14 +115,26 @@ if has_c_implementation:
pickler_class = _pickle.Pickler
unpickler_class = _pickle.Unpickler
+ class CDispatchTableTests(AbstractDispatchTableTests):
+ pickler_class = pickle.Pickler
+ def get_dispatch_table(self):
+ return pickle.dispatch_table.copy()
+
+ class CChainDispatchTableTests(AbstractDispatchTableTests):
+ pickler_class = pickle.Pickler
+ def get_dispatch_table(self):
+ return collections.ChainMap({}, pickle.dispatch_table)
+
def test_main():
- tests = [PickleTests, PyPicklerTests, PyPersPicklerTests]
+ tests = [PickleTests, PyPicklerTests, PyPersPicklerTests,
+ PyDispatchTableTests, PyChainDispatchTableTests]
if has_c_implementation:
tests.extend([CPicklerTests, CPersPicklerTests,
CDumpPickle_LoadPickle, DumpPickle_CLoadPickle,
PyPicklerUnpicklerObjectTests,
CPicklerUnpicklerObjectTests,
+ CDispatchTableTests, CChainDispatchTableTests,
InMemoryPickleTests])
support.run_unittest(*tests)
support.run_doctest(pickle)
diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py
index 4f1251c..ef95268 100644
--- a/Lib/test/test_pty.py
+++ b/Lib/test/test_pty.py
@@ -205,6 +205,7 @@ class SmallPtyTests(unittest.TestCase):
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
+ self.files = []
self.select_rfds_lengths = []
self.select_rfds_results = []
@@ -212,10 +213,15 @@ class SmallPtyTests(unittest.TestCase):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
+ for file in self.files:
+ try:
+ file.close()
+ except OSError:
+ pass
for fd in self.fds:
try:
os.close(fd)
- except:
+ except OSError:
pass
def _pipe(self):
@@ -223,6 +229,11 @@ class SmallPtyTests(unittest.TestCase):
self.fds.extend(pipe_fds)
return pipe_fds
+ def _socketpair(self):
+ socketpair = socket.socketpair()
+ self.files.extend(socketpair)
+ return socketpair
+
def _mock_select(self, rfds, wfds, xfds):
# This will raise IndexError when no more expected calls exist.
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
@@ -234,9 +245,8 @@ class SmallPtyTests(unittest.TestCase):
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
- socketpair = socket.socketpair()
+ socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
- self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
@@ -263,9 +273,8 @@ class SmallPtyTests(unittest.TestCase):
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
- socketpair = socket.socketpair()
+ socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
- self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index fdeb4c2..6be259b 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -662,7 +662,7 @@ class PendingSignalsTests(unittest.TestCase):
self.wait_helper(signal.SIGALRM, '''
def test(signum):
signal.alarm(1)
- info = signal.sigtimedwait([signum], (10, 1000))
+ info = signal.sigtimedwait([signum], 10.1000)
if info.si_signo != signum:
raise Exception('info.si_signo != %s' % signum)
''')
@@ -675,7 +675,7 @@ class PendingSignalsTests(unittest.TestCase):
def test(signum):
import os
os.kill(os.getpid(), signum)
- info = signal.sigtimedwait([signum], (0, 0))
+ info = signal.sigtimedwait([signum], 0)
if info.si_signo != signum:
raise Exception('info.si_signo != %s' % signum)
''')
@@ -685,7 +685,7 @@ class PendingSignalsTests(unittest.TestCase):
def test_sigtimedwait_timeout(self):
self.wait_helper(signal.SIGALRM, '''
def test(signum):
- received = signal.sigtimedwait([signum], (1, 0))
+ received = signal.sigtimedwait([signum], 1.0)
if received is not None:
raise Exception("received=%r" % (received,))
''')
@@ -694,9 +694,7 @@ class PendingSignalsTests(unittest.TestCase):
'need signal.sigtimedwait()')
def test_sigtimedwait_negative_timeout(self):
signum = signal.SIGALRM
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1))
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
+ self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()')
diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py
index a89c511..26492c1 100644
--- a/Lib/test/test_time.py
+++ b/Lib/test/test_time.py
@@ -497,12 +497,31 @@ class TestStrftime4dyear(_TestStrftimeYear, _Test4dYear):
pass
+class TestPytime(unittest.TestCase):
+ def test_timespec(self):
+ from _testcapi import pytime_object_to_timespec
+ for obj, timespec in (
+ (0, (0, 0)),
+ (-1, (-1, 0)),
+ (-1.0, (-1, 0)),
+ (-1e-9, (-1, 999999999)),
+ (-1.2, (-2, 800000000)),
+ (1.123456789, (1, 123456789)),
+ ):
+ self.assertEqual(pytime_object_to_timespec(obj), timespec)
+
+ for invalid in (-(2 ** 100), -(2.0 ** 100.0), 2 ** 100, 2.0 ** 100.0):
+ self.assertRaises(OverflowError, pytime_object_to_timespec, invalid)
+
+
+
def test_main():
support.run_unittest(
TimeTestCase,
TestLocale,
TestAsctime4dyear,
- TestStrftime4dyear)
+ TestStrftime4dyear,
+ TestPytime)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py
index dce3c6e..db87e11 100644
--- a/Lib/test/test_tokenize.py
+++ b/Lib/test/test_tokenize.py
@@ -563,6 +563,18 @@ Non-ascii identifiers
NAME 'grün' (2, 0) (2, 4)
OP '=' (2, 5) (2, 6)
STRING "'green'" (2, 7) (2, 14)
+
+Legacy unicode literals:
+
+ >>> dump_tokens("Örter = u'places'\\ngrün = UR'green'")
+ ENCODING 'utf-8' (0, 0) (0, 0)
+ NAME 'Örter' (1, 0) (1, 5)
+ OP '=' (1, 6) (1, 7)
+ STRING "u'places'" (1, 8) (1, 17)
+ NEWLINE '\\n' (1, 17) (1, 18)
+ NAME 'grün' (2, 0) (2, 4)
+ OP '=' (2, 5) (2, 6)
+ STRING "UR'green'" (2, 7) (2, 16)
"""
from test import support
diff --git a/Lib/test/test_weakset.py b/Lib/test/test_weakset.py
index 35db7a6..4d3878f 100644
--- a/Lib/test/test_weakset.py
+++ b/Lib/test/test_weakset.py
@@ -28,6 +28,12 @@ class TestWeakSet(unittest.TestCase):
# need to keep references to them
self.items = [ustr(c) for c in ('a', 'b', 'c')]
self.items2 = [ustr(c) for c in ('x', 'y', 'z')]
+ self.ab_items = [ustr(c) for c in 'ab']
+ self.abcde_items = [ustr(c) for c in 'abcde']
+ self.def_items = [ustr(c) for c in 'def']
+ self.ab_weakset = WeakSet(self.ab_items)
+ self.abcde_weakset = WeakSet(self.abcde_items)
+ self.def_weakset = WeakSet(self.def_items)
self.letters = [ustr(c) for c in string.ascii_letters]
self.s = WeakSet(self.items)
self.d = dict.fromkeys(self.items)
@@ -71,6 +77,11 @@ class TestWeakSet(unittest.TestCase):
x = WeakSet(self.items + self.items2)
c = C(self.items2)
self.assertEqual(self.s.union(c), x)
+ del c
+ self.assertEqual(len(u), len(self.items) + len(self.items2))
+ self.items2.pop()
+ gc.collect()
+ self.assertEqual(len(u), len(self.items) + len(self.items2))
def test_or(self):
i = self.s.union(self.items2)
@@ -78,14 +89,19 @@ class TestWeakSet(unittest.TestCase):
self.assertEqual(self.s | frozenset(self.items2), i)
def test_intersection(self):
- i = self.s.intersection(self.items2)
+ s = WeakSet(self.letters)
+ i = s.intersection(self.items2)
for c in self.letters:
- self.assertEqual(c in i, c in self.d and c in self.items2)
- self.assertEqual(self.s, WeakSet(self.items))
+ self.assertEqual(c in i, c in self.items2 and c in self.letters)
+ self.assertEqual(s, WeakSet(self.letters))
self.assertEqual(type(i), WeakSet)
for C in set, frozenset, dict.fromkeys, list, tuple:
x = WeakSet([])
- self.assertEqual(self.s.intersection(C(self.items2)), x)
+ self.assertEqual(i.intersection(C(self.items)), x)
+ self.assertEqual(len(i), len(self.items2))
+ self.items2.pop()
+ gc.collect()
+ self.assertEqual(len(i), len(self.items2))
def test_isdisjoint(self):
self.assertTrue(self.s.isdisjoint(WeakSet(self.items2)))
@@ -116,6 +132,10 @@ class TestWeakSet(unittest.TestCase):
self.assertEqual(self.s, WeakSet(self.items))
self.assertEqual(type(i), WeakSet)
self.assertRaises(TypeError, self.s.symmetric_difference, [[]])
+ self.assertEqual(len(i), len(self.items) + len(self.items2))
+ self.items2.pop()
+ gc.collect()
+ self.assertEqual(len(i), len(self.items) + len(self.items2))
def test_xor(self):
i = self.s.symmetric_difference(self.items2)
@@ -123,22 +143,28 @@ class TestWeakSet(unittest.TestCase):
self.assertEqual(self.s ^ frozenset(self.items2), i)
def test_sub_and_super(self):
- pl, ql, rl = map(lambda s: [ustr(c) for c in s], ['ab', 'abcde', 'def'])
- p, q, r = map(WeakSet, (pl, ql, rl))
- self.assertTrue(p < q)
- self.assertTrue(p <= q)
- self.assertTrue(q <= q)
- self.assertTrue(q > p)
- self.assertTrue(q >= p)
- self.assertFalse(q < r)
- self.assertFalse(q <= r)
- self.assertFalse(q > r)
- self.assertFalse(q >= r)
+ self.assertTrue(self.ab_weakset <= self.abcde_weakset)
+ self.assertTrue(self.abcde_weakset <= self.abcde_weakset)
+ self.assertTrue(self.abcde_weakset >= self.ab_weakset)
+ self.assertFalse(self.abcde_weakset <= self.def_weakset)
+ self.assertFalse(self.abcde_weakset >= self.def_weakset)
self.assertTrue(set('a').issubset('abc'))
self.assertTrue(set('abc').issuperset('a'))
self.assertFalse(set('a').issubset('cbs'))
self.assertFalse(set('cbs').issuperset('a'))
+ def test_lt(self):
+ self.assertTrue(self.ab_weakset < self.abcde_weakset)
+ self.assertFalse(self.abcde_weakset < self.def_weakset)
+ self.assertFalse(self.ab_weakset < self.ab_weakset)
+ self.assertFalse(WeakSet() < WeakSet())
+
+ def test_gt(self):
+ self.assertTrue(self.abcde_weakset > self.ab_weakset)
+ self.assertFalse(self.abcde_weakset > self.def_weakset)
+ self.assertFalse(self.ab_weakset > self.ab_weakset)
+ self.assertFalse(WeakSet() > WeakSet())
+
def test_gc(self):
# Create a nest of cycles to exercise overall ref count check
s = WeakSet(Foo() for i in range(1000))
diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py
index 58fdcd4..b9230b7 100644
--- a/Lib/test/test_xml_etree.py
+++ b/Lib/test/test_xml_etree.py
@@ -1855,6 +1855,102 @@ def check_issue10777():
# --------------------------------------------------------------------
+class ElementTreeTest(unittest.TestCase):
+
+ def test_istype(self):
+ self.assertIsInstance(ET.ParseError, type)
+ self.assertIsInstance(ET.QName, type)
+ self.assertIsInstance(ET.ElementTree, type)
+ self.assertIsInstance(ET.Element, type)
+ # XXX issue 14128 with C ElementTree
+ # self.assertIsInstance(ET.TreeBuilder, type)
+ # self.assertIsInstance(ET.XMLParser, type)
+
+ def test_Element_subclass_trivial(self):
+ class MyElement(ET.Element):
+ pass
+
+ mye = MyElement('foo')
+ self.assertIsInstance(mye, ET.Element)
+ self.assertIsInstance(mye, MyElement)
+ self.assertEqual(mye.tag, 'foo')
+
+ def test_Element_subclass_constructor(self):
+ class MyElement(ET.Element):
+ def __init__(self, tag, attrib={}, **extra):
+ super(MyElement, self).__init__(tag + '__', attrib, **extra)
+
+ mye = MyElement('foo', {'a': 1, 'b': 2}, c=3, d=4)
+ self.assertEqual(mye.tag, 'foo__')
+ self.assertEqual(sorted(mye.items()),
+ [('a', 1), ('b', 2), ('c', 3), ('d', 4)])
+
+ def test_Element_subclass_new_method(self):
+ class MyElement(ET.Element):
+ def newmethod(self):
+ return self.tag
+
+ mye = MyElement('joe')
+ self.assertEqual(mye.newmethod(), 'joe')
+
+
+class TreeBuilderTest(unittest.TestCase):
+
+ sample1 = ('<!DOCTYPE html PUBLIC'
+ ' "-//W3C//DTD XHTML 1.0 Transitional//EN"'
+ ' "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">'
+ '<html>text</html>')
+
+ def test_dummy_builder(self):
+ class BaseDummyBuilder:
+ def close(self):
+ return 42
+
+ class DummyBuilder(BaseDummyBuilder):
+ data = start = end = lambda *a: None
+
+ parser = ET.XMLParser(target=DummyBuilder())
+ parser.feed(self.sample1)
+ self.assertEqual(parser.close(), 42)
+
+ parser = ET.XMLParser(target=BaseDummyBuilder())
+ parser.feed(self.sample1)
+ self.assertEqual(parser.close(), 42)
+
+ parser = ET.XMLParser(target=object())
+ parser.feed(self.sample1)
+ self.assertIsNone(parser.close())
+
+
+ @unittest.expectedFailure # XXX issue 14007 with C ElementTree
+ def test_doctype(self):
+ class DoctypeParser:
+ _doctype = None
+
+ def doctype(self, name, pubid, system):
+ self._doctype = (name, pubid, system)
+
+ def close(self):
+ return self._doctype
+
+ parser = ET.XMLParser(target=DoctypeParser())
+ parser.feed(self.sample1)
+
+ self.assertEqual(parser.close(),
+ ('html', '-//W3C//DTD XHTML 1.0 Transitional//EN',
+ 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd'))
+
+
+class NoAcceleratorTest(unittest.TestCase):
+
+ # Test that the C accelerator was not imported for pyET
+ def test_correct_import_pyET(self):
+ self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree')
+ self.assertEqual(pyET.SubElement.__module__, 'xml.etree.ElementTree')
+
+# --------------------------------------------------------------------
+
+
class CleanContext(object):
"""Provide default namespace mapping and path cache."""
checkwarnings = None
@@ -1873,10 +1969,7 @@ class CleanContext(object):
("This method will be removed in future versions. "
"Use .+ instead.", DeprecationWarning),
("This method will be removed in future versions. "
- "Use .+ instead.", PendingDeprecationWarning),
- # XMLParser.doctype() is deprecated.
- ("This method of XMLParser is deprecated. Define doctype.. "
- "method on the TreeBuilder target.", DeprecationWarning))
+ "Use .+ instead.", PendingDeprecationWarning))
self.checkwarnings = support.check_warnings(*deprecations, quiet=quiet)
def __enter__(self):
@@ -1898,19 +1991,18 @@ class CleanContext(object):
self.checkwarnings.__exit__(*args)
-class TestAcceleratorNotImported(unittest.TestCase):
- # Test that the C accelerator was not imported for pyET
- def test_correct_import_pyET(self):
- self.assertEqual(pyET.Element.__module__, 'xml.etree.ElementTree')
-
-
def test_main(module=pyET):
from test import test_xml_etree
# The same doctests are used for both the Python and the C implementations
test_xml_etree.ET = module
- support.run_unittest(TestAcceleratorNotImported)
+ test_classes = [ElementTreeTest, TreeBuilderTest]
+ if module is pyET:
+ # Run the tests specific to the Python implementation
+ test_classes += [NoAcceleratorTest]
+
+ support.run_unittest(*test_classes)
# XXX the C module should give the same warnings as the Python module
with CleanContext(quiet=(module is not pyET)):
diff --git a/Lib/test/test_xml_etree_c.py b/Lib/test/test_xml_etree_c.py
index a73d0c4..10416d2 100644
--- a/Lib/test/test_xml_etree_c.py
+++ b/Lib/test/test_xml_etree_c.py
@@ -47,13 +47,20 @@ class MiscTests(unittest.TestCase):
data = None
@unittest.skipUnless(cET, 'requires _elementtree')
+class TestAliasWorking(unittest.TestCase):
+ # Test that the cET alias module is alive
+ def test_alias_working(self):
+ e = cET_alias.Element('foo')
+ self.assertEqual(e.tag, 'foo')
+
+@unittest.skipUnless(cET, 'requires _elementtree')
class TestAcceleratorImported(unittest.TestCase):
# Test that the C accelerator was imported, as expected
def test_correct_import_cET(self):
- self.assertEqual(cET.Element.__module__, '_elementtree')
+ self.assertEqual(cET.SubElement.__module__, '_elementtree')
def test_correct_import_cET_alias(self):
- self.assertEqual(cET_alias.Element.__module__, '_elementtree')
+ self.assertEqual(cET_alias.SubElement.__module__, '_elementtree')
def test_main():
@@ -61,13 +68,15 @@ def test_main():
# Run the tests specific to the C implementation
support.run_doctest(test_xml_etree_c, verbosity=True)
-
- support.run_unittest(MiscTests, TestAcceleratorImported)
+ support.run_unittest(
+ MiscTests,
+ TestAliasWorking,
+ TestAcceleratorImported
+ )
# Run the same test suite as the Python module
test_xml_etree.test_main(module=cET)
- # Exercise the deprecated alias
- test_xml_etree.test_main(module=cET_alias)
+
if __name__ == '__main__':
test_main()