summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/json_tests/test_indent.py4
-rw-r--r--Lib/test/mock_socket.py8
-rwxr-xr-xLib/test/regrtest.py364
-rw-r--r--Lib/test/support.py20
-rw-r--r--Lib/test/test_abc.py13
-rw-r--r--Lib/test/test_argparse.py32
-rw-r--r--Lib/test/test_ast.py13
-rw-r--r--Lib/test/test_builtin.py5
-rw-r--r--Lib/test/test_bytes.py23
-rw-r--r--Lib/test/test_bz2.py136
-rw-r--r--Lib/test/test_cmd_line.py19
-rw-r--r--Lib/test/test_cmd_line_script.py9
-rw-r--r--Lib/test/test_complex.py2
-rw-r--r--Lib/test/test_concurrent_futures.py22
-rw-r--r--Lib/test/test_devpoll.py2
-rw-r--r--Lib/test/test_doctest.py47
-rw-r--r--Lib/test/test_enumerate.py10
-rw-r--r--Lib/test/test_epoll.py7
-rw-r--r--Lib/test/test_exceptions.py17
-rw-r--r--Lib/test/test_fcntl.py15
-rw-r--r--Lib/test/test_fileio.py2
-rw-r--r--Lib/test/test_format.py16
-rw-r--r--Lib/test/test_fractions.py32
-rw-r--r--Lib/test/test_functools.py208
-rw-r--r--Lib/test/test_gc.py26
-rw-r--r--Lib/test/test_generators.py21
-rw-r--r--Lib/test/test_genericpath.py24
-rw-r--r--Lib/test/test_hashlib.py127
-rw-r--r--Lib/test/test_httpservers.py11
-rw-r--r--Lib/test/test_imp.py14
-rw-r--r--Lib/test/test_importlib/import_/test_fromlist.py2
-rw-r--r--Lib/test/test_importlib/source/test_abc_loader.py518
-rw-r--r--Lib/test/test_importlib/test_abc.py18
-rw-r--r--Lib/test/test_io.py10
-rw-r--r--Lib/test/test_iterlen.py62
-rw-r--r--Lib/test/test_itertools.py5
-rw-r--r--Lib/test/test_logging.py165
-rw-r--r--Lib/test/test_mailbox.py2
-rw-r--r--Lib/test/test_mimetypes.py2
-rw-r--r--Lib/test/test_mmap.py4
-rw-r--r--Lib/test/test_multiprocessing.py172
-rw-r--r--Lib/test/test_openpty.py2
-rw-r--r--Lib/test/test_operator.py25
-rw-r--r--Lib/test/test_os.py98
-rw-r--r--Lib/test/test_pep277.py4
-rw-r--r--Lib/test/test_poll.py21
-rw-r--r--Lib/test/test_poplib.py157
-rw-r--r--Lib/test/test_random.py33
-rw-r--r--Lib/test/test_range.py2
-rw-r--r--Lib/test/test_select.py4
-rw-r--r--Lib/test/test_set.py2
-rw-r--r--Lib/test/test_shelve.py13
-rw-r--r--Lib/test/test_shutil.py17
-rw-r--r--Lib/test/test_signal.py9
-rw-r--r--Lib/test/test_site.py6
-rw-r--r--Lib/test/test_slice.py114
-rw-r--r--Lib/test/test_smtplib.py10
-rw-r--r--Lib/test/test_socketserver.py16
-rw-r--r--Lib/test/test_subprocess.py18
-rw-r--r--Lib/test/test_sundry.py34
-rw-r--r--Lib/test/test_syntax.py4
-rw-r--r--Lib/test/test_sys.py34
-rw-r--r--Lib/test/test_sysconfig.py44
-rw-r--r--Lib/test/test_tarfile.py4
-rw-r--r--Lib/test/test_tempfile.py8
-rw-r--r--Lib/test/test_thread.py2
-rw-r--r--Lib/test/test_threading.py6
-rw-r--r--Lib/test/test_threadsignals.py2
-rw-r--r--Lib/test/test_unicode.py30
-rw-r--r--Lib/test/test_unicodedata.py2
-rw-r--r--Lib/test/test_urllib.py92
-rw-r--r--Lib/test/test_urllib2.py44
-rw-r--r--Lib/test/test_urllib2net.py2
-rw-r--r--Lib/test/test_venv.py59
-rw-r--r--Lib/test/test_wait3.py12
-rw-r--r--Lib/test/test_weakref.py140
-rw-r--r--Lib/test/test_winreg.py12
77 files changed, 1880 insertions, 1411 deletions
diff --git a/Lib/test/json_tests/test_indent.py b/Lib/test/json_tests/test_indent.py
index 4c70646..4eb4f89 100644
--- a/Lib/test/json_tests/test_indent.py
+++ b/Lib/test/json_tests/test_indent.py
@@ -32,6 +32,8 @@ class TestIndent:
d1 = self.dumps(h)
d2 = self.dumps(h, indent=2, sort_keys=True, separators=(',', ': '))
d3 = self.dumps(h, indent='\t', sort_keys=True, separators=(',', ': '))
+ d4 = self.dumps(h, indent=2, sort_keys=True)
+ d5 = self.dumps(h, indent='\t', sort_keys=True)
h1 = self.loads(d1)
h2 = self.loads(d2)
@@ -42,6 +44,8 @@ class TestIndent:
self.assertEqual(h3, h)
self.assertEqual(d2, expect.expandtabs(2))
self.assertEqual(d3, expect)
+ self.assertEqual(d4, d2)
+ self.assertEqual(d5, d3)
def test_indent0(self):
h = {3: 1}
diff --git a/Lib/test/mock_socket.py b/Lib/test/mock_socket.py
index d09e78c..8ef0ec8 100644
--- a/Lib/test/mock_socket.py
+++ b/Lib/test/mock_socket.py
@@ -140,12 +140,8 @@ def gethostbyname(name):
return ""
-class gaierror(Exception):
- pass
-
-
-class error(Exception):
- pass
+gaierror = socket_module.gaierror
+error = socket_module.error
# Constants
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 8f3b689..5d9d82d 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -615,7 +615,7 @@ def main(tests=None, testdir=None, verbose=0, quiet=False,
sys.exit(2)
from queue import Queue
from subprocess import Popen, PIPE
- debug_output_pat = re.compile(r"\[\d+ refs\]$")
+ debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
output = Queue()
pending = MultiprocessTests(tests)
opt_args = support.args_from_interpreter_flags()
@@ -763,20 +763,6 @@ def main(tests=None, testdir=None, verbose=0, quiet=False,
print(count(len(skipped), "test"), "skipped:")
printlist(skipped)
- e = _ExpectedSkips()
- plat = sys.platform
- if e.isvalid():
- surprise = set(skipped) - e.getexpected() - set(resource_denieds)
- if surprise:
- print(count(len(surprise), "skip"), \
- "unexpected on", plat + ":")
- printlist(surprise)
- else:
- print("Those skips are all expected on", plat + ".")
- else:
- print("Ask someone to teach regrtest.py about which tests are")
- print("expected to get skipped on", plat + ".")
-
if verbose2 and bad:
print("Re-running failed tests in verbose mode")
for test in bad:
@@ -1210,8 +1196,7 @@ def runtest_inner(test, verbose, quiet,
abstest = 'test.' + test
with saved_test_environment(test, verbose, quiet) as environment:
start_time = time.time()
- the_package = __import__(abstest, globals(), locals(), [])
- the_module = getattr(the_package, test)
+ the_module = importlib.import_module(abstest)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
@@ -1335,33 +1320,50 @@ def dash_R(the_module, test, indirect_test, huntrleaks):
del sys.modules[the_module.__name__]
exec('import ' + the_module.__name__)
- deltas = []
nwarmup, ntracked, fname = huntrleaks
fname = os.path.join(support.SAVEDCWD, fname)
repcount = nwarmup + ntracked
+ rc_deltas = [0] * repcount
+ alloc_deltas = [0] * repcount
+
print("beginning", repcount, "repetitions", file=sys.stderr)
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr)
sys.stderr.flush()
- dash_R_cleanup(fs, ps, pic, zdc, abcs)
for i in range(repcount):
- rc_before = sys.gettotalrefcount()
run_the_test()
+ alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs)
sys.stderr.write('.')
sys.stderr.flush()
- dash_R_cleanup(fs, ps, pic, zdc, abcs)
- rc_after = sys.gettotalrefcount()
if i >= nwarmup:
- deltas.append(rc_after - rc_before)
+ rc_deltas[i] = rc_after - rc_before
+ alloc_deltas[i] = alloc_after - alloc_before
+ alloc_before, rc_before = alloc_after, rc_after
print(file=sys.stderr)
- if any(deltas):
- msg = '%s leaked %s references, sum=%s' % (test, deltas, sum(deltas))
- print(msg, file=sys.stderr)
- sys.stderr.flush()
- with open(fname, "a") as refrep:
- print(msg, file=refrep)
- refrep.flush()
- return True
- return False
+ # These checkers return False on success, True on failure
+ def check_rc_deltas(deltas):
+ return any(deltas)
+ def check_alloc_deltas(deltas):
+ # At least 1/3rd of 0s
+ if 3 * deltas.count(0) < len(deltas):
+ return True
+ # Nothing else than 1s, 0s and -1s
+ if not set(deltas) <= {1,0,-1}:
+ return True
+ return False
+ failed = False
+ for deltas, item_name, checker in [
+ (rc_deltas, 'references', check_rc_deltas),
+ (alloc_deltas, 'memory blocks', check_alloc_deltas)]:
+ if checker(deltas):
+ msg = '%s leaked %s %s, sum=%s' % (
+ test, deltas[nwarmup:], item_name, sum(deltas))
+ print(msg, file=sys.stderr)
+ sys.stderr.flush()
+ with open(fname, "a") as refrep:
+ print(msg, file=refrep)
+ refrep.flush()
+ failed = True
+ return failed
def dash_R_cleanup(fs, ps, pic, zdc, abcs):
import gc, copyreg
@@ -1427,8 +1429,11 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs):
else:
ctypes._reset_cache()
- # Collect cyclic trash.
+ # Collect cyclic trash and read memory statistics immediately after.
+ func1 = sys.getallocatedblocks
+ func2 = sys.gettotalrefcount
gc.collect()
+ return func1(), func2()
def warm_caches():
# char cache
@@ -1471,299 +1476,6 @@ def printlist(x, width=70, indent=4):
print(fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks))
-# Map sys.platform to a string containing the basenames of tests
-# expected to be skipped on that platform.
-#
-# Special cases:
-# test_pep277
-# The _ExpectedSkips constructor adds this to the set of expected
-# skips if not os.path.supports_unicode_filenames.
-# test_timeout
-# Controlled by test_timeout.skip_expected. Requires the network
-# resource and a socket module.
-#
-# Tests that are expected to be skipped everywhere except on one platform
-# are also handled separately.
-
-_expectations = (
- ('win32',
- """
- test__locale
- test_crypt
- test_curses
- test_dbm
- test_devpoll
- test_fcntl
- test_fork1
- test_epoll
- test_dbm_gnu
- test_dbm_ndbm
- test_grp
- test_ioctl
- test_largefile
- test_kqueue
- test_openpty
- test_ossaudiodev
- test_pipes
- test_poll
- test_posix
- test_pty
- test_pwd
- test_resource
- test_signal
- test_syslog
- test_threadsignals
- test_wait3
- test_wait4
- """),
- ('linux',
- """
- test_curses
- test_devpoll
- test_largefile
- test_kqueue
- test_ossaudiodev
- """),
- ('unixware',
- """
- test_epoll
- test_largefile
- test_kqueue
- test_minidom
- test_openpty
- test_pyexpat
- test_sax
- test_sundry
- """),
- ('openunix',
- """
- test_epoll
- test_largefile
- test_kqueue
- test_minidom
- test_openpty
- test_pyexpat
- test_sax
- test_sundry
- """),
- ('sco_sv',
- """
- test_asynchat
- test_fork1
- test_epoll
- test_gettext
- test_largefile
- test_locale
- test_kqueue
- test_minidom
- test_openpty
- test_pyexpat
- test_queue
- test_sax
- test_sundry
- test_thread
- test_threaded_import
- test_threadedtempfile
- test_threading
- """),
- ('darwin',
- """
- test__locale
- test_curses
- test_devpoll
- test_epoll
- test_dbm_gnu
- test_gdb
- test_largefile
- test_locale
- test_minidom
- test_ossaudiodev
- test_poll
- """),
- ('sunos',
- """
- test_curses
- test_dbm
- test_epoll
- test_kqueue
- test_dbm_gnu
- test_gzip
- test_openpty
- test_zipfile
- test_zlib
- """),
- ('hp-ux',
- """
- test_curses
- test_epoll
- test_dbm_gnu
- test_gzip
- test_largefile
- test_locale
- test_kqueue
- test_minidom
- test_openpty
- test_pyexpat
- test_sax
- test_zipfile
- test_zlib
- """),
- ('cygwin',
- """
- test_curses
- test_dbm
- test_devpoll
- test_epoll
- test_ioctl
- test_kqueue
- test_largefile
- test_locale
- test_ossaudiodev
- test_socketserver
- """),
- ('os2emx',
- """
- test_audioop
- test_curses
- test_epoll
- test_kqueue
- test_largefile
- test_mmap
- test_openpty
- test_ossaudiodev
- test_pty
- test_resource
- test_signal
- """),
- ('freebsd',
- """
- test_devpoll
- test_epoll
- test_dbm_gnu
- test_locale
- test_ossaudiodev
- test_pep277
- test_pty
- test_socketserver
- test_tcl
- test_tk
- test_ttk_guionly
- test_ttk_textonly
- test_timeout
- test_urllibnet
- test_multiprocessing
- """),
- ('aix',
- """
- test_bz2
- test_epoll
- test_dbm_gnu
- test_gzip
- test_kqueue
- test_ossaudiodev
- test_tcl
- test_tk
- test_ttk_guionly
- test_ttk_textonly
- test_zipimport
- test_zlib
- """),
- ('openbsd',
- """
- test_ctypes
- test_devpoll
- test_epoll
- test_dbm_gnu
- test_locale
- test_normalization
- test_ossaudiodev
- test_pep277
- test_tcl
- test_tk
- test_ttk_guionly
- test_ttk_textonly
- test_multiprocessing
- """),
- ('netbsd',
- """
- test_ctypes
- test_curses
- test_devpoll
- test_epoll
- test_dbm_gnu
- test_locale
- test_ossaudiodev
- test_pep277
- test_tcl
- test_tk
- test_ttk_guionly
- test_ttk_textonly
- test_multiprocessing
- """),
-)
-
-class _ExpectedSkips:
- def __init__(self):
- import os.path
- from test import test_timeout
-
- self.valid = False
- expected = None
- for item in _expectations:
- if sys.platform.startswith(item[0]):
- expected = item[1]
- break
- if expected is not None:
- self.expected = set(expected.split())
-
- # These are broken tests, for now skipped on every platform.
- # XXX Fix these!
- self.expected.add('test_nis')
-
- # expected to be skipped on every platform, even Linux
- if not os.path.supports_unicode_filenames:
- self.expected.add('test_pep277')
-
- # doctest, profile and cProfile tests fail when the codec for the
- # fs encoding isn't built in because PyUnicode_Decode() adds two
- # calls into Python.
- encs = ("utf-8", "latin-1", "ascii", "mbcs", "utf-16", "utf-32")
- if sys.getfilesystemencoding().lower() not in encs:
- self.expected.add('test_profile')
- self.expected.add('test_cProfile')
- self.expected.add('test_doctest')
-
- if test_timeout.skip_expected:
- self.expected.add('test_timeout')
-
- if sys.platform != "win32":
- # test_sqlite is only reliable on Windows where the library
- # is distributed with Python
- WIN_ONLY = {"test_unicode_file", "test_winreg",
- "test_winsound", "test_startfile",
- "test_sqlite", "test_msilib"}
- self.expected |= WIN_ONLY
-
- if sys.platform != 'sunos5':
- self.expected.add('test_nis')
-
- if support.python_is_optimized():
- self.expected.add("test_gdb")
-
- self.valid = True
-
- def isvalid(self):
- "Return true iff _ExpectedSkips knows about the current platform."
- return self.valid
-
- def getexpected(self):
- """Return set of test names we expect to skip on current platform.
-
- self.isvalid() must be true.
- """
-
- assert self.isvalid()
- return self.expected
def _make_temp_dir_for_build(TEMPDIR):
# When tests are run from the Python build directory, it is best practice
diff --git a/Lib/test/support.py b/Lib/test/support.py
index d0a37ea..b98f2bf 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -93,7 +93,8 @@ def _ignore_deprecated_imports(ignore=True):
"""Context manager to suppress package and module deprecation
warnings when importing them.
- If ignore is False, this context manager has no effect."""
+ If ignore is False, this context manager has no effect.
+ """
if ignore:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", ".+ (module|package)",
@@ -103,23 +104,29 @@ def _ignore_deprecated_imports(ignore=True):
yield
-def import_module(name, deprecated=False):
+def import_module(name, deprecated=False, *, required_on=()):
"""Import and return the module to be tested, raising SkipTest if
it is not available.
If deprecated is True, any module or package deprecation messages
- will be suppressed."""
+ will be suppressed. If a module is required on a platform but optional for
+ others, set required_on to an iterable of platform prefixes which will be
+ compared against sys.platform.
+ """
with _ignore_deprecated_imports(deprecated):
try:
return importlib.import_module(name)
except ImportError as msg:
+ if sys.platform.startswith(tuple(required_on)):
+ raise
raise unittest.SkipTest(str(msg))
def _save_and_remove_module(name, orig_modules):
"""Helper function to save and remove a module from sys.modules
- Raise ImportError if the module can't be imported."""
+ Raise ImportError if the module can't be imported.
+ """
# try to import the module and raise an error if it can't be imported
if name not in sys.modules:
__import__(name)
@@ -710,6 +717,9 @@ for name in (
b'\xae\xd5'
# undecodable from UTF-8 (UNIX and Mac OS X)
b'\xed\xb2\x80', b'\xed\xb4\x80',
+ # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
+ # cp1253, cp1254, cp1255, cp1257, cp1258
+ b'\x81\x98',
):
try:
name.decode(TESTFN_ENCODING)
@@ -1762,7 +1772,7 @@ def strip_python_stderr(stderr):
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
- stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
+ stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
return stderr
def args_from_interpreter_flags():
diff --git a/Lib/test/test_abc.py b/Lib/test/test_abc.py
index 653c957..3498524 100644
--- a/Lib/test/test_abc.py
+++ b/Lib/test/test_abc.py
@@ -96,6 +96,19 @@ class TestLegacyAPI(unittest.TestCase):
class TestABC(unittest.TestCase):
+ def test_ABC_helper(self):
+ # create an ABC using the helper class and perform basic checks
+ class C(abc.ABC):
+ @classmethod
+ @abc.abstractmethod
+ def foo(cls): return cls.__name__
+ self.assertEqual(type(C), abc.ABCMeta)
+ self.assertRaises(TypeError, C)
+ class D(C):
+ @classmethod
+ def foo(cls): return super().foo()
+ self.assertEqual(D.foo(), 'D')
+
def test_abstractmethod_basics(self):
@abc.abstractmethod
def foo(self): pass
diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py
index c06c940..00cde2e 100644
--- a/Lib/test/test_argparse.py
+++ b/Lib/test/test_argparse.py
@@ -14,6 +14,7 @@ import argparse
from io import StringIO
from test import support
+from unittest import mock
class StdIOBuffer(StringIO):
pass
@@ -1421,6 +1422,19 @@ class TestFileTypeRepr(TestCase):
type = argparse.FileType('wb', 1)
self.assertEqual("FileType('wb', 1)", repr(type))
+ def test_r_latin(self):
+ type = argparse.FileType('r', encoding='latin_1')
+ self.assertEqual("FileType('r', encoding='latin_1')", repr(type))
+
+ def test_w_big5_ignore(self):
+ type = argparse.FileType('w', encoding='big5', errors='ignore')
+ self.assertEqual("FileType('w', encoding='big5', errors='ignore')",
+ repr(type))
+
+ def test_r_1_replace(self):
+ type = argparse.FileType('r', 1, errors='replace')
+ self.assertEqual("FileType('r', 1, errors='replace')", repr(type))
+
class RFile(object):
seen = {}
@@ -1557,6 +1571,24 @@ class TestFileTypeWB(TempDirMixin, ParserTestCase):
]
+class TestFileTypeOpenArgs(TestCase):
+ """Test that open (the builtin) is correctly called"""
+
+ def test_open_args(self):
+ FT = argparse.FileType
+ cases = [
+ (FT('rb'), ('rb', -1, None, None)),
+ (FT('w', 1), ('w', 1, None, None)),
+ (FT('w', errors='replace'), ('w', -1, None, 'replace')),
+ (FT('wb', encoding='big5'), ('wb', -1, 'big5', None)),
+ (FT('w', 0, 'l1', 'strict'), ('w', 0, 'l1', 'strict')),
+ ]
+ with mock.patch('builtins.open') as m:
+ for type, args in cases:
+ type('foo')
+ m.assert_called_with('foo', *args)
+
+
class TestTypeCallable(ParserTestCase):
"""Test some callables as option/argument types"""
diff --git a/Lib/test/test_ast.py b/Lib/test/test_ast.py
index dc24126..36907b4 100644
--- a/Lib/test/test_ast.py
+++ b/Lib/test/test_ast.py
@@ -928,6 +928,9 @@ class ASTValidatorTests(unittest.TestCase):
def test_tuple(self):
self._sequence(ast.Tuple)
+ def test_nameconstant(self):
+ self.expr(ast.NameConstant(4), "singleton must be True, False, or None")
+
def test_stdlib_validates(self):
stdlib = os.path.dirname(ast.__file__)
tests = [fn for fn in os.listdir(stdlib) if fn.endswith(".py")]
@@ -959,13 +962,13 @@ def main():
#### EVERYTHING BELOW IS GENERATED #####
exec_results = [
-('Module', [('Expr', (1, 0), ('Name', (1, 0), 'None', ('Load',)))]),
+('Module', [('Expr', (1, 0), ('NameConstant', (1, 0), None))]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [], None, None, [], None, None, [], []), [('Pass', (1, 9))], [], None)]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [('arg', 'a', None)], None, None, [], None, None, [], []), [('Pass', (1, 10))], [], None)]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [('arg', 'a', None)], None, None, [], None, None, [('Num', (1, 8), 0)], []), [('Pass', (1, 12))], [], None)]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [], 'args', None, [], None, None, [], []), [('Pass', (1, 14))], [], None)]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [], None, None, [], 'kwargs', None, [], []), [('Pass', (1, 17))], [], None)]),
-('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [('arg', 'a', None), ('arg', 'b', None), ('arg', 'c', None), ('arg', 'd', None), ('arg', 'e', None)], 'args', None, [], 'kwargs', None, [('Num', (1, 11), 1), ('Name', (1, 16), 'None', ('Load',)), ('List', (1, 24), [], ('Load',)), ('Dict', (1, 30), [], [])], []), [('Pass', (1, 52))], [], None)]),
+('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [('arg', 'a', None), ('arg', 'b', None), ('arg', 'c', None), ('arg', 'd', None), ('arg', 'e', None)], 'args', None, [], 'kwargs', None, [('Num', (1, 11), 1), ('NameConstant', (1, 16), None), ('List', (1, 24), [], ('Load',)), ('Dict', (1, 30), [], [])], []), [('Pass', (1, 52))], [], None)]),
('Module', [('ClassDef', (1, 0), 'C', [], [], None, None, [('Pass', (1, 8))], [])]),
('Module', [('ClassDef', (1, 0), 'C', [('Name', (1, 8), 'object', ('Load',))], [], None, None, [('Pass', (1, 17))], [])]),
('Module', [('FunctionDef', (1, 0), 'f', ('arguments', [], None, None, [], None, None, [], []), [('Return', (1, 8), ('Num', (1, 15), 1))], [], None)]),
@@ -1002,14 +1005,14 @@ single_results = [
('Interactive', [('Expr', (1, 0), ('BinOp', (1, 0), ('Num', (1, 0), 1), ('Add',), ('Num', (1, 2), 2)))]),
]
eval_results = [
-('Expression', ('Name', (1, 0), 'None', ('Load',))),
+('Expression', ('NameConstant', (1, 0), None)),
('Expression', ('BoolOp', (1, 0), ('And',), [('Name', (1, 0), 'a', ('Load',)), ('Name', (1, 6), 'b', ('Load',))])),
('Expression', ('BinOp', (1, 0), ('Name', (1, 0), 'a', ('Load',)), ('Add',), ('Name', (1, 4), 'b', ('Load',)))),
('Expression', ('UnaryOp', (1, 0), ('Not',), ('Name', (1, 4), 'v', ('Load',)))),
-('Expression', ('Lambda', (1, 0), ('arguments', [], None, None, [], None, None, [], []), ('Name', (1, 7), 'None', ('Load',)))),
+('Expression', ('Lambda', (1, 0), ('arguments', [], None, None, [], None, None, [], []), ('NameConstant', (1, 7), None))),
('Expression', ('Dict', (1, 0), [('Num', (1, 2), 1)], [('Num', (1, 4), 2)])),
('Expression', ('Dict', (1, 0), [], [])),
-('Expression', ('Set', (1, 0), [('Name', (1, 1), 'None', ('Load',))])),
+('Expression', ('Set', (1, 0), [('NameConstant', (1, 1), None)])),
('Expression', ('Dict', (1, 0), [('Num', (2, 6), 1)], [('Num', (4, 10), 2)])),
('Expression', ('ListComp', (1, 1), ('Name', (1, 1), 'a', ('Load',)), [('comprehension', ('Name', (1, 7), 'b', ('Store',)), ('Name', (1, 12), 'c', ('Load',)), [('Name', (1, 17), 'd', ('Load',))])])),
('Expression', ('GeneratorExp', (1, 1), ('Name', (1, 1), 'a', ('Load',)), [('comprehension', ('Name', (1, 7), 'b', ('Store',)), ('Name', (1, 12), 'c', ('Load',)), [('Name', (1, 17), 'd', ('Load',))])])),
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index 19d7c70..2c5201e 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -462,6 +462,11 @@ class BuiltinTest(unittest.TestCase):
self.assertRaises(TypeError, eval, ())
self.assertRaises(SyntaxError, eval, bom[:2] + b'a')
+ class X:
+ def __getitem__(self, key):
+ raise ValueError
+ self.assertRaises(ValueError, eval, "foo", {}, X())
+
def test_general_eval(self):
# Tests that general mappings can be used for the locals argument
diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py
index fe6e939..8ae90e4 100644
--- a/Lib/test/test_bytes.py
+++ b/Lib/test/test_bytes.py
@@ -288,8 +288,22 @@ class BaseBytesTest(unittest.TestCase):
self.assertEqual(self.type2test(b"").join(lst), b"abc")
self.assertEqual(self.type2test(b"").join(tuple(lst)), b"abc")
self.assertEqual(self.type2test(b"").join(iter(lst)), b"abc")
- self.assertEqual(self.type2test(b".").join([b"ab", b"cd"]), b"ab.cd")
- # XXX more...
+ dot_join = self.type2test(b".:").join
+ self.assertEqual(dot_join([b"ab", b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([memoryview(b"ab"), b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([b"ab", memoryview(b"cd")]), b"ab.:cd")
+ self.assertEqual(dot_join([bytearray(b"ab"), b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([b"ab", bytearray(b"cd")]), b"ab.:cd")
+ # Stress it with many items
+ seq = [b"abc"] * 1000
+ expected = b"abc" + b".:abc" * 999
+ self.assertEqual(dot_join(seq), expected)
+ # Error handling and cleanup when some item in the middle of the
+ # sequence has the wrong type.
+ with self.assertRaises(TypeError):
+ dot_join([bytearray(b"ab"), "cd", b"ef"])
+ with self.assertRaises(TypeError):
+ dot_join([memoryview(b"ab"), "cd", b"ef"])
def test_count(self):
b = self.type2test(b'mississippi')
@@ -1267,6 +1281,11 @@ class BytearrayPEP3137Test(unittest.TestCase,
self.assertEqual(val, newval)
self.assertTrue(val is not newval,
expr+' returned val on a mutable object')
+ sep = self.marshal(b'')
+ newval = sep.join([val])
+ self.assertEqual(val, newval)
+ self.assertIsNot(val, newval)
+
class FixedStringTest(test.string_tests.BaseTest):
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index f4e81db..24a1813 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from test import support
-from test.support import TESTFN, bigmemtest, _4G
+from test.support import bigmemtest, _4G
import unittest
from io import BytesIO
@@ -18,10 +18,10 @@ except ImportError:
bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
-has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
class BaseTest(unittest.TestCase):
"Base for other testcases."
+
TEXT_LINES = [
b'root:x:0:0:root:/root:/bin/bash\n',
b'bin:x:1:1:bin:/bin:\n',
@@ -49,13 +49,17 @@ class BaseTest(unittest.TestCase):
DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
def setUp(self):
- self.filename = TESTFN
+ self.filename = support.TESTFN
def tearDown(self):
if os.path.isfile(self.filename):
os.unlink(self.filename)
- if has_cmdline_bunzip2:
+ if sys.platform == "win32":
+ # bunzip2 isn't available to run on Windows.
+ def decompress(self, data):
+ return bz2.decompress(data)
+ else:
def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True,
stdin=subprocess.PIPE,
@@ -69,31 +73,21 @@ class BaseTest(unittest.TestCase):
ret = bz2.decompress(data)
return ret
- else:
- # bunzip2 isn't available to run on Windows.
- def decompress(self, data):
- return bz2.decompress(data)
class BZ2FileTest(BaseTest):
- "Test BZ2File type miscellaneous methods."
+ "Test the BZ2File class."
def createTempFile(self, streams=1):
with open(self.filename, "wb") as f:
f.write(self.DATA * streams)
def testBadArgs(self):
- with self.assertRaises(TypeError):
- BZ2File(123.456)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "z")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rx")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rbt")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=0)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=10)
+ self.assertRaises(TypeError, BZ2File, 123.456)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
def testRead(self):
self.createTempFile()
@@ -214,9 +208,8 @@ class BZ2FileTest(BaseTest):
self.createTempFile()
bz2f = BZ2File(self.filename)
bz2f.close()
- self.assertRaises(ValueError, bz2f.__next__)
- # This call will deadlock if the above .__next__ call failed to
- # release the lock.
+ self.assertRaises(ValueError, next, bz2f)
+ # This call will deadlock if the above call failed to release the lock.
self.assertRaises(ValueError, bz2f.readlines)
def testWrite(self):
@@ -379,7 +372,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.seekable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.seekable())
finally:
@@ -405,7 +398,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.readable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.readable())
finally:
@@ -422,7 +415,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.writable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertTrue(bz2f.writable())
finally:
@@ -476,7 +469,7 @@ class BZ2FileTest(BaseTest):
# Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
data = b"1" * 2**20
nthreads = 10
- with bz2.BZ2File(self.filename, 'wb') as f:
+ with BZ2File(self.filename, 'wb') as f:
def comp():
for i in range(5):
f.write(data)
@@ -487,28 +480,27 @@ class BZ2FileTest(BaseTest):
t.join()
def testWithoutThreading(self):
- bz2 = support.import_fresh_module("bz2", blocked=("threading",))
- with bz2.BZ2File(self.filename, "wb") as f:
+ module = support.import_fresh_module("bz2", blocked=("threading",))
+ with module.BZ2File(self.filename, "wb") as f:
f.write(b"abc")
- with bz2.BZ2File(self.filename, "rb") as f:
+ with module.BZ2File(self.filename, "rb") as f:
self.assertEqual(f.read(), b"abc")
def testMixedIterationAndReads(self):
self.createTempFile()
linelen = len(self.TEXT_LINES[0])
halflen = linelen // 2
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.read(halflen)
self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
self.assertEqual(bz2f.read(), self.TEXT[linelen:])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readline()
self.assertEqual(next(bz2f), self.TEXT_LINES[1])
self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readlines()
- with self.assertRaises(StopIteration):
- next(bz2f)
+ self.assertRaises(StopIteration, next, bz2f)
self.assertEqual(bz2f.readlines(), [])
def testMultiStreamOrdering(self):
@@ -576,6 +568,7 @@ class BZ2FileTest(BaseTest):
bz2f.seek(-150, 1)
self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
class BZ2CompressorTest(BaseTest):
def testCompress(self):
bz2c = BZ2Compressor()
@@ -688,97 +681,102 @@ class CompressDecompressTest(BaseTest):
class OpenTest(BaseTest):
+ "Test the open function."
+
+ def open(self, *args, **kwargs):
+ return bz2.open(*args, **kwargs)
+
def test_binary_modes(self):
- with bz2.open(self.filename, "wb") as f:
+ with self.open(self.filename, "wb") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "rb") as f:
+ with self.open(self.filename, "rb") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "ab") as f:
+ with self.open(self.filename, "ab") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2)
def test_implicit_binary_modes(self):
# Test implicit binary modes (no "b" or "t" in mode string).
- with bz2.open(self.filename, "w") as f:
+ with self.open(self.filename, "w") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "r") as f:
+ with self.open(self.filename, "r") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "a") as f:
+ with self.open(self.filename, "a") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2)
def test_text_modes(self):
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt") as f:
+ with self.open(self.filename, "wt") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
+ file_data = self.decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt") as f:
+ with self.open(self.filename, "rt") as f:
self.assertEqual(f.read(), text)
- with bz2.open(self.filename, "at") as f:
+ with self.open(self.filename, "at") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
+ file_data = self.decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol * 2)
def test_fileobj(self):
- with bz2.open(BytesIO(self.DATA), "r") as f:
+ with self.open(BytesIO(self.DATA), "r") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(BytesIO(self.DATA), "rb") as f:
+ with self.open(BytesIO(self.DATA), "rb") as f:
self.assertEqual(f.read(), self.TEXT)
text = self.TEXT.decode("ascii")
- with bz2.open(BytesIO(self.DATA), "rt") as f:
+ with self.open(BytesIO(self.DATA), "rt") as f:
self.assertEqual(f.read(), text)
def test_bad_params(self):
# Test invalid parameter combinations.
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "wbt")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", encoding="utf-8")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", errors="ignore")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", newline="\n")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "wbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", encoding="utf-8")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", errors="ignore")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", newline="\n")
def test_encoding(self):
# Test non-default encoding.
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "wt", encoding="utf-16-le") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("utf-16-le")
+ file_data = self.decompress(f.read()).decode("utf-16-le")
self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "rt", encoding="utf-16-le") as f:
self.assertEqual(f.read(), text)
def test_encoding_error_handler(self):
# Test with non-default encoding error handler.
- with bz2.open(self.filename, "wb") as f:
+ with self.open(self.filename, "wb") as f:
f.write(b"foo\xffbar")
- with bz2.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+ with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
as f:
self.assertEqual(f.read(), "foobar")
def test_newline(self):
# Test with explicit newline (universal newline mode disabled).
text = self.TEXT.decode("ascii")
- with bz2.open(self.filename, "wt", newline="\n") as f:
+ with self.open(self.filename, "wt", newline="\n") as f:
f.write(text)
- with bz2.open(self.filename, "rt", newline="\r") as f:
+ with self.open(self.filename, "rt", newline="\r") as f:
self.assertEqual(f.readlines(), [text])
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index a89d7e4..6684e51 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -213,6 +213,23 @@ class CmdLineTest(unittest.TestCase):
self.assertIn(path1.encode('ascii'), out)
self.assertIn(path2.encode('ascii'), out)
+ def test_empty_PYTHONPATH_issue16309(self):
+ # On Posix, it is documented that setting PATH to the
+ # empty string is equivalent to not setting PATH at all,
+ # which is an exception to the rule that in a string like
+ # "/bin::/usr/bin" the empty string in the middle gets
+ # interpreted as '.'
+ code = """if 1:
+ import sys
+ path = ":".join(sys.path)
+ path = path.encode("ascii", "backslashreplace")
+ sys.stdout.buffer.write(path)"""
+ rc1, out1, err1 = assert_python_ok('-c', code, PYTHONPATH="")
+ rc2, out2, err2 = assert_python_ok('-c', code)
+ # regarding to Posix specification, outputs should be equal
+ # for empty and unset PYTHONPATH
+ self.assertEqual(out1, out2)
+
def test_displayhook_unencodable(self):
for encoding in ('ascii', 'latin-1', 'utf-8'):
env = os.environ.copy()
@@ -290,7 +307,7 @@ class CmdLineTest(unittest.TestCase):
rc, out, err = assert_python_ok('-c', code)
self.assertEqual(b'', out)
self.assertRegex(err.decode('ascii', 'ignore'),
- 'Exception OSError: .* ignored')
+ 'Exception ignored in.*\nOSError: .*')
def test_closed_stdout(self):
# Issue #13444: if stdout has been explicitly closed, we should
diff --git a/Lib/test/test_cmd_line_script.py b/Lib/test/test_cmd_line_script.py
index f066204..6051e18 100644
--- a/Lib/test/test_cmd_line_script.py
+++ b/Lib/test/test_cmd_line_script.py
@@ -367,11 +367,10 @@ class CmdLineTest(unittest.TestCase):
# Mac OS X denies the creation of a file with an invalid UTF-8 name.
# Windows allows to create a name with an arbitrary bytes name, but
# Python cannot a undecodable bytes argument to a subprocess.
- #if (support.TESTFN_UNDECODABLE
- #and sys.platform not in ('win32', 'darwin')):
- # name = os.fsdecode(support.TESTFN_UNDECODABLE)
- #elif support.TESTFN_NONASCII:
- if support.TESTFN_NONASCII:
+ if (support.TESTFN_UNDECODABLE
+ and sys.platform not in ('win32', 'darwin')):
+ name = os.fsdecode(support.TESTFN_UNDECODABLE)
+ elif support.TESTFN_NONASCII:
name = support.TESTFN_NONASCII
else:
self.skipTest("need support.TESTFN_NONASCII")
diff --git a/Lib/test/test_complex.py b/Lib/test/test_complex.py
index 6b34ddc..2a85bf4 100644
--- a/Lib/test/test_complex.py
+++ b/Lib/test/test_complex.py
@@ -221,6 +221,8 @@ class ComplexTest(unittest.TestCase):
self.assertRaises(TypeError, complex, OS(None))
self.assertRaises(TypeError, complex, NS(None))
self.assertRaises(TypeError, complex, {})
+ self.assertRaises(TypeError, complex, NS(1.5))
+ self.assertRaises(TypeError, complex, NS(1))
self.assertAlmostEqual(complex("1+10j"), 1+10j)
self.assertAlmostEqual(complex(10), 10+0j)
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6ae450d..4ad3309 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -15,6 +15,7 @@ import sys
import threading
import time
import unittest
+import weakref
from concurrent import futures
from concurrent.futures._base import (
@@ -52,6 +53,11 @@ def sleep_and_print(t, msg):
sys.stdout.flush()
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
class ExecutorMixin:
worker_count = 5
@@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
+ @test.support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=5.0)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):
diff --git a/Lib/test/test_devpoll.py b/Lib/test/test_devpoll.py
index bef4e18..ec350cd 100644
--- a/Lib/test/test_devpoll.py
+++ b/Lib/test/test_devpoll.py
@@ -8,7 +8,7 @@ from test.support import TESTFN, run_unittest
try:
select.devpoll
except AttributeError:
- raise unittest.SkipTest("select.devpoll not defined -- skipping test_devpoll")
+ raise unittest.SkipTest("select.devpoll not defined")
def find_ready_matching(ready, flag):
diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py
index 8f8c7c7..3e36f19 100644
--- a/Lib/test/test_doctest.py
+++ b/Lib/test/test_doctest.py
@@ -1409,8 +1409,40 @@ However, output from `report_start` is not suppressed:
2
TestResults(failed=3, attempted=5)
-For the purposes of REPORT_ONLY_FIRST_FAILURE, unexpected exceptions
-count as failures:
+The FAIL_FAST flag causes the runner to exit after the first failing example,
+so subsequent examples are not even attempted:
+
+ >>> flags = doctest.FAIL_FAST
+ >>> doctest.DocTestRunner(verbose=False, optionflags=flags).run(test)
+ ... # doctest: +ELLIPSIS
+ **********************************************************************
+ File ..., line 5, in f
+ Failed example:
+ print(2) # first failure
+ Expected:
+ 200
+ Got:
+ 2
+ TestResults(failed=1, attempted=2)
+
+Specifying both FAIL_FAST and REPORT_ONLY_FIRST_FAILURE is equivalent to
+FAIL_FAST only:
+
+ >>> flags = doctest.FAIL_FAST | doctest.REPORT_ONLY_FIRST_FAILURE
+ >>> doctest.DocTestRunner(verbose=False, optionflags=flags).run(test)
+ ... # doctest: +ELLIPSIS
+ **********************************************************************
+ File ..., line 5, in f
+ Failed example:
+ print(2) # first failure
+ Expected:
+ 200
+ Got:
+ 2
+ TestResults(failed=1, attempted=2)
+
+For the purposes of both REPORT_ONLY_FIRST_FAILURE and FAIL_FAST, unexpected
+exceptions count as failures:
>>> def f(x):
... r'''
@@ -1437,6 +1469,17 @@ count as failures:
...
ValueError: 2
TestResults(failed=3, attempted=5)
+ >>> flags = doctest.FAIL_FAST
+ >>> doctest.DocTestRunner(verbose=False, optionflags=flags).run(test)
+ ... # doctest: +ELLIPSIS
+ **********************************************************************
+ File ..., line 5, in f
+ Failed example:
+ raise ValueError(2) # first failure
+ Exception raised:
+ ...
+ ValueError: 2
+ TestResults(failed=1, attempted=2)
New option flags can also be registered, via register_optionflag(). Here
we reach into doctest's internals a bit.
diff --git a/Lib/test/test_enumerate.py b/Lib/test/test_enumerate.py
index 2e904cf..a2d18d0 100644
--- a/Lib/test/test_enumerate.py
+++ b/Lib/test/test_enumerate.py
@@ -1,4 +1,5 @@
import unittest
+import operator
import sys
import pickle
@@ -168,15 +169,12 @@ class TestReversed(unittest.TestCase, PickleTest):
x = range(1)
self.assertEqual(type(reversed(x)), type(iter(x)))
- @support.cpython_only
def test_len(self):
- # This is an implementation detail, not an interface requirement
- from test.test_iterlen import len
for s in ('hello', tuple('hello'), list('hello'), range(5)):
- self.assertEqual(len(reversed(s)), len(s))
+ self.assertEqual(operator.length_hint(reversed(s)), len(s))
r = reversed(s)
list(r)
- self.assertEqual(len(r), 0)
+ self.assertEqual(operator.length_hint(r), 0)
class SeqWithWeirdLen:
called = False
def __len__(self):
@@ -187,7 +185,7 @@ class TestReversed(unittest.TestCase, PickleTest):
def __getitem__(self, index):
return index
r = reversed(SeqWithWeirdLen())
- self.assertRaises(ZeroDivisionError, len, r)
+ self.assertRaises(ZeroDivisionError, operator.length_hint, r)
def test_gc(self):
diff --git a/Lib/test/test_epoll.py b/Lib/test/test_epoll.py
index 7f9547f..b9b1492 100644
--- a/Lib/test/test_epoll.py
+++ b/Lib/test/test_epoll.py
@@ -87,6 +87,13 @@ class TestEPoll(unittest.TestCase):
self.assertRaises(TypeError, select.epoll, ['foo'])
self.assertRaises(TypeError, select.epoll, {})
+ def test_context_manager(self):
+ with select.epoll(16) as ep:
+ self.assertGreater(ep.fileno(), 0)
+ self.assertFalse(ep.closed)
+ self.assertTrue(ep.closed)
+ self.assertRaises(ValueError, ep.fileno)
+
def test_add(self):
server, client = self._connected_pair()
diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py
index 1ad7f97..6fa1f5b 100644
--- a/Lib/test/test_exceptions.py
+++ b/Lib/test/test_exceptions.py
@@ -8,8 +8,8 @@ import weakref
import errno
from test.support import (TESTFN, captured_output, check_impl_detail,
- cpython_only, gc_collect, run_unittest, no_tracing,
- unlink)
+ check_warnings, cpython_only, gc_collect, run_unittest,
+ no_tracing, unlink)
class NaiveException(Exception):
def __init__(self, x):
@@ -255,11 +255,11 @@ class ExceptionTests(unittest.TestCase):
'errno' : 'foo', 'strerror' : 'bar'}),
(IOError, ('foo', 'bar', 'baz', 'quux'),
{'args' : ('foo', 'bar', 'baz', 'quux')}),
- (EnvironmentError, ('errnoStr', 'strErrorStr', 'filenameStr'),
+ (OSError, ('errnoStr', 'strErrorStr', 'filenameStr'),
{'args' : ('errnoStr', 'strErrorStr'),
'strerror' : 'strErrorStr', 'errno' : 'errnoStr',
'filename' : 'filenameStr'}),
- (EnvironmentError, (1, 'strErrorStr', 'filenameStr'),
+ (OSError, (1, 'strErrorStr', 'filenameStr'),
{'args' : (1, 'strErrorStr'), 'errno' : 1,
'strerror' : 'strErrorStr', 'filename' : 'filenameStr'}),
(SyntaxError, (), {'msg' : None, 'text' : None,
@@ -409,7 +409,7 @@ class ExceptionTests(unittest.TestCase):
self.assertIsNone(e.__context__)
self.assertIsNone(e.__cause__)
- class MyException(EnvironmentError):
+ class MyException(OSError):
pass
e = MyException()
@@ -947,9 +947,10 @@ class ImportErrorTests(unittest.TestCase):
def test_non_str_argument(self):
# Issue #15778
- arg = b'abc'
- exc = ImportError(arg)
- self.assertEqual(str(arg), str(exc))
+ with check_warnings(('', BytesWarning), quiet=True):
+ arg = b'abc'
+ exc = ImportError(arg)
+ self.assertEqual(str(arg), str(exc))
def test_main():
diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py
index 29af99c..6d9ee0b 100644
--- a/Lib/test/test_fcntl.py
+++ b/Lib/test/test_fcntl.py
@@ -1,7 +1,4 @@
"""Test program for the fcntl C module.
-
-OS/2+EMX doesn't support the file locking operations.
-
"""
import os
import struct
@@ -35,8 +32,6 @@ def get_lockdata():
fcntl.F_WRLCK, 0)
elif sys.platform in ['aix3', 'aix4', 'hp-uxB', 'unixware7']:
lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
- elif sys.platform in ['os2emx']:
- lockdata = None
else:
lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
if lockdata:
@@ -62,18 +57,16 @@ class TestFcntl(unittest.TestCase):
rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
if verbose:
print('Status from fcntl with O_NONBLOCK: ', rv)
- if sys.platform not in ['os2emx']:
- rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
- if verbose:
- print('String from fcntl with F_SETLKW: ', repr(rv))
+ rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
+ if verbose:
+ print('String from fcntl with F_SETLKW: ', repr(rv))
self.f.close()
def test_fcntl_file_descriptor(self):
# again, but pass the file rather than numeric descriptor
self.f = open(TESTFN, 'wb')
rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
- if sys.platform not in ['os2emx']:
- rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
+ rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
self.f.close()
def test_fcntl_64_bit(self):
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
index 0847961..9615c3a 100644
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -285,7 +285,7 @@ class OtherFileTests(unittest.TestCase):
if sys.platform != "win32":
try:
f = _FileIO("/dev/tty", "a")
- except EnvironmentError:
+ except OSError:
# When run in a cron job there just aren't any
# ttys, so skip the test. This also handles other
# OS'es that don't support /dev/tty.
diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py
index b6e2540..e6b0d20 100644
--- a/Lib/test/test_format.py
+++ b/Lib/test/test_format.py
@@ -307,6 +307,22 @@ class FormatTest(unittest.TestCase):
finally:
locale.setlocale(locale.LC_ALL, oldloc)
+ @support.cpython_only
+ def test_optimisations(self):
+ text = "abcde" # 5 characters
+
+ self.assertIs("%s" % text, text)
+ self.assertIs("%.5s" % text, text)
+ self.assertIs("%.10s" % text, text)
+ self.assertIs("%1s" % text, text)
+ self.assertIs("%5s" % text, text)
+
+ self.assertIs("{0}".format(text), text)
+ self.assertIs("{0:s}".format(text), text)
+ self.assertIs("{0:.5s}".format(text), text)
+ self.assertIs("{0:.10s}".format(text), text)
+ self.assertIs("{0:1s}".format(text), text)
+ self.assertIs("{0:5s}".format(text), text)
def test_main():
diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py
index 1fad921..3336532 100644
--- a/Lib/test/test_fractions.py
+++ b/Lib/test/test_fractions.py
@@ -146,9 +146,10 @@ class FractionTest(unittest.TestCase):
self.assertEqual((0, 1), _components(F(-0.0)))
self.assertEqual((3602879701896397, 36028797018963968),
_components(F(0.1)))
- self.assertRaises(TypeError, F, float('nan'))
- self.assertRaises(TypeError, F, float('inf'))
- self.assertRaises(TypeError, F, float('-inf'))
+ # bug 16469: error types should be consistent with float -> int
+ self.assertRaises(ValueError, F, float('nan'))
+ self.assertRaises(OverflowError, F, float('inf'))
+ self.assertRaises(OverflowError, F, float('-inf'))
def testInitFromDecimal(self):
self.assertEqual((11, 10),
@@ -157,10 +158,11 @@ class FractionTest(unittest.TestCase):
_components(F(Decimal('3.5e-2'))))
self.assertEqual((0, 1),
_components(F(Decimal('.000e20'))))
- self.assertRaises(TypeError, F, Decimal('nan'))
- self.assertRaises(TypeError, F, Decimal('snan'))
- self.assertRaises(TypeError, F, Decimal('inf'))
- self.assertRaises(TypeError, F, Decimal('-inf'))
+ # bug 16469: error types should be consistent with decimal -> int
+ self.assertRaises(ValueError, F, Decimal('nan'))
+ self.assertRaises(ValueError, F, Decimal('snan'))
+ self.assertRaises(OverflowError, F, Decimal('inf'))
+ self.assertRaises(OverflowError, F, Decimal('-inf'))
def testFromString(self):
self.assertEqual((5, 1), _components(F("5")))
@@ -248,14 +250,15 @@ class FractionTest(unittest.TestCase):
inf = 1e1000
nan = inf - inf
+ # bug 16469: error types should be consistent with float -> int
self.assertRaisesMessage(
- TypeError, "Cannot convert inf to Fraction.",
+ OverflowError, "Cannot convert inf to Fraction.",
F.from_float, inf)
self.assertRaisesMessage(
- TypeError, "Cannot convert -inf to Fraction.",
+ OverflowError, "Cannot convert -inf to Fraction.",
F.from_float, -inf)
self.assertRaisesMessage(
- TypeError, "Cannot convert nan to Fraction.",
+ ValueError, "Cannot convert nan to Fraction.",
F.from_float, nan)
def testFromDecimal(self):
@@ -268,17 +271,18 @@ class FractionTest(unittest.TestCase):
self.assertEqual(1 - F(1, 10**30),
F.from_decimal(Decimal("0." + "9" * 30)))
+ # bug 16469: error types should be consistent with decimal -> int
self.assertRaisesMessage(
- TypeError, "Cannot convert Infinity to Fraction.",
+ OverflowError, "Cannot convert Infinity to Fraction.",
F.from_decimal, Decimal("inf"))
self.assertRaisesMessage(
- TypeError, "Cannot convert -Infinity to Fraction.",
+ OverflowError, "Cannot convert -Infinity to Fraction.",
F.from_decimal, Decimal("-inf"))
self.assertRaisesMessage(
- TypeError, "Cannot convert NaN to Fraction.",
+ ValueError, "Cannot convert NaN to Fraction.",
F.from_decimal, Decimal("nan"))
self.assertRaisesMessage(
- TypeError, "Cannot convert sNaN to Fraction.",
+ ValueError, "Cannot convert sNaN to Fraction.",
F.from_decimal, Decimal("snan"))
def testLimitDenominator(self):
diff --git a/Lib/test/test_functools.py b/Lib/test/test_functools.py
index c4910a7..ee1f5db 100644
--- a/Lib/test/test_functools.py
+++ b/Lib/test/test_functools.py
@@ -1,4 +1,3 @@
-import functools
import collections
import sys
import unittest
@@ -7,17 +6,31 @@ from weakref import proxy
import pickle
from random import choice
-@staticmethod
-def PythonPartial(func, *args, **keywords):
- 'Pure Python approximation of partial()'
- def newfunc(*fargs, **fkeywords):
- newkeywords = keywords.copy()
- newkeywords.update(fkeywords)
- return func(*(args + fargs), **newkeywords)
- newfunc.func = func
- newfunc.args = args
- newfunc.keywords = keywords
- return newfunc
+import functools
+
+original_functools = functools
+py_functools = support.import_fresh_module('functools', blocked=['_functools'])
+c_functools = support.import_fresh_module('functools', fresh=['_functools'])
+
+class BaseTest(unittest.TestCase):
+
+ """Base class required for testing C and Py implementations."""
+
+ def setUp(self):
+
+ # The module must be explicitly set so that the proper
+ # interaction between the c module and the python module
+ # can be controlled.
+ self.partial = self.module.partial
+ super(BaseTest, self).setUp()
+
+class BaseTestC(BaseTest):
+ module = c_functools
+
+class BaseTestPy(BaseTest):
+ module = py_functools
+
+PythonPartial = py_functools.partial
def capture(*args, **kw):
"""capture all positional and keyword arguments"""
@@ -27,31 +40,32 @@ def signature(part):
""" return the signature of a partial object """
return (part.func, part.args, part.keywords, part.__dict__)
-class TestPartial(unittest.TestCase):
+class TestPartial(object):
- thetype = functools.partial
+ partial = functools.partial
def test_basic_examples(self):
- p = self.thetype(capture, 1, 2, a=10, b=20)
+ p = self.partial(capture, 1, 2, a=10, b=20)
+ self.assertTrue(callable(p))
self.assertEqual(p(3, 4, b=30, c=40),
((1, 2, 3, 4), dict(a=10, b=30, c=40)))
- p = self.thetype(map, lambda x: x*10)
+ p = self.partial(map, lambda x: x*10)
self.assertEqual(list(p([1,2,3,4])), [10, 20, 30, 40])
def test_attributes(self):
- p = self.thetype(capture, 1, 2, a=10, b=20)
+ p = self.partial(capture, 1, 2, a=10, b=20)
# attributes should be readable
self.assertEqual(p.func, capture)
self.assertEqual(p.args, (1, 2))
self.assertEqual(p.keywords, dict(a=10, b=20))
# attributes should not be writable
- if not isinstance(self.thetype, type):
+ if not isinstance(self.partial, type):
return
self.assertRaises(AttributeError, setattr, p, 'func', map)
self.assertRaises(AttributeError, setattr, p, 'args', (1, 2))
self.assertRaises(AttributeError, setattr, p, 'keywords', dict(a=1, b=2))
- p = self.thetype(hex)
+ p = self.partial(hex)
try:
del p.__dict__
except TypeError:
@@ -60,9 +74,9 @@ class TestPartial(unittest.TestCase):
self.fail('partial object allowed __dict__ to be deleted')
def test_argument_checking(self):
- self.assertRaises(TypeError, self.thetype) # need at least a func arg
+ self.assertRaises(TypeError, self.partial) # need at least a func arg
try:
- self.thetype(2)()
+ self.partial(2)()
except TypeError:
pass
else:
@@ -73,7 +87,7 @@ class TestPartial(unittest.TestCase):
def func(a=10, b=20):
return a
d = {'a':3}
- p = self.thetype(func, a=5)
+ p = self.partial(func, a=5)
self.assertEqual(p(**d), 3)
self.assertEqual(d, {'a':3})
p(b=7)
@@ -82,20 +96,20 @@ class TestPartial(unittest.TestCase):
def test_arg_combinations(self):
# exercise special code paths for zero args in either partial
# object or the caller
- p = self.thetype(capture)
+ p = self.partial(capture)
self.assertEqual(p(), ((), {}))
self.assertEqual(p(1,2), ((1,2), {}))
- p = self.thetype(capture, 1, 2)
+ p = self.partial(capture, 1, 2)
self.assertEqual(p(), ((1,2), {}))
self.assertEqual(p(3,4), ((1,2,3,4), {}))
def test_kw_combinations(self):
# exercise special code paths for no keyword args in
# either the partial object or the caller
- p = self.thetype(capture)
+ p = self.partial(capture)
self.assertEqual(p(), ((), {}))
self.assertEqual(p(a=1), ((), {'a':1}))
- p = self.thetype(capture, a=1)
+ p = self.partial(capture, a=1)
self.assertEqual(p(), ((), {'a':1}))
self.assertEqual(p(b=2), ((), {'a':1, 'b':2}))
# keyword args in the call override those in the partial object
@@ -104,7 +118,7 @@ class TestPartial(unittest.TestCase):
def test_positional(self):
# make sure positional arguments are captured correctly
for args in [(), (0,), (0,1), (0,1,2), (0,1,2,3)]:
- p = self.thetype(capture, *args)
+ p = self.partial(capture, *args)
expected = args + ('x',)
got, empty = p('x')
self.assertTrue(expected == got and empty == {})
@@ -112,14 +126,14 @@ class TestPartial(unittest.TestCase):
def test_keyword(self):
# make sure keyword arguments are captured correctly
for a in ['a', 0, None, 3.5]:
- p = self.thetype(capture, a=a)
+ p = self.partial(capture, a=a)
expected = {'a':a,'x':None}
empty, got = p(x=None)
self.assertTrue(expected == got and empty == ())
def test_no_side_effects(self):
# make sure there are no side effects that affect subsequent calls
- p = self.thetype(capture, 0, a=1)
+ p = self.partial(capture, 0, a=1)
args1, kw1 = p(1, b=2)
self.assertTrue(args1 == (0,1) and kw1 == {'a':1,'b':2})
args2, kw2 = p()
@@ -128,13 +142,13 @@ class TestPartial(unittest.TestCase):
def test_error_propagation(self):
def f(x, y):
x / y
- self.assertRaises(ZeroDivisionError, self.thetype(f, 1, 0))
- self.assertRaises(ZeroDivisionError, self.thetype(f, 1), 0)
- self.assertRaises(ZeroDivisionError, self.thetype(f), 1, 0)
- self.assertRaises(ZeroDivisionError, self.thetype(f, y=0), 1)
+ self.assertRaises(ZeroDivisionError, self.partial(f, 1, 0))
+ self.assertRaises(ZeroDivisionError, self.partial(f, 1), 0)
+ self.assertRaises(ZeroDivisionError, self.partial(f), 1, 0)
+ self.assertRaises(ZeroDivisionError, self.partial(f, y=0), 1)
def test_weakref(self):
- f = self.thetype(int, base=16)
+ f = self.partial(int, base=16)
p = proxy(f)
self.assertEqual(f.func, p.func)
f = None
@@ -142,9 +156,9 @@ class TestPartial(unittest.TestCase):
def test_with_bound_and_unbound_methods(self):
data = list(map(str, range(10)))
- join = self.thetype(str.join, '')
+ join = self.partial(str.join, '')
self.assertEqual(join(data), '0123456789')
- join = self.thetype(''.join)
+ join = self.partial(''.join)
self.assertEqual(join(data), '0123456789')
def test_repr(self):
@@ -152,49 +166,57 @@ class TestPartial(unittest.TestCase):
args_repr = ', '.join(repr(a) for a in args)
kwargs = {'a': object(), 'b': object()}
kwargs_repr = ', '.join("%s=%r" % (k, v) for k, v in kwargs.items())
- if self.thetype is functools.partial:
+ if self.partial is functools.partial:
name = 'functools.partial'
else:
- name = self.thetype.__name__
+ name = self.partial.__name__
- f = self.thetype(capture)
+ f = self.partial(capture)
self.assertEqual('{}({!r})'.format(name, capture),
repr(f))
- f = self.thetype(capture, *args)
+ f = self.partial(capture, *args)
self.assertEqual('{}({!r}, {})'.format(name, capture, args_repr),
repr(f))
- f = self.thetype(capture, **kwargs)
+ f = self.partial(capture, **kwargs)
self.assertEqual('{}({!r}, {})'.format(name, capture, kwargs_repr),
repr(f))
- f = self.thetype(capture, *args, **kwargs)
+ f = self.partial(capture, *args, **kwargs)
self.assertEqual('{}({!r}, {}, {})'.format(name, capture, args_repr, kwargs_repr),
repr(f))
def test_pickle(self):
- f = self.thetype(signature, 'asdf', bar=True)
+ f = self.partial(signature, 'asdf', bar=True)
f.add_something_to__dict__ = True
f_copy = pickle.loads(pickle.dumps(f))
self.assertEqual(signature(f), signature(f_copy))
-class PartialSubclass(functools.partial):
+class TestPartialC(BaseTestC, TestPartial):
pass
-class TestPartialSubclass(TestPartial):
+class TestPartialPy(BaseTestPy, TestPartial):
- thetype = PartialSubclass
+ def test_pickle(self):
+ raise unittest.SkipTest("Python implementation of partial isn't picklable")
+
+ def test_repr(self):
+ raise unittest.SkipTest("Python implementation of partial uses own repr")
+
+class TestPartialCSubclass(BaseTestC, TestPartial):
+
+ class PartialSubclass(c_functools.partial):
+ pass
-class TestPythonPartial(TestPartial):
+ partial = staticmethod(PartialSubclass)
- thetype = PythonPartial
+class TestPartialPySubclass(TestPartialPy):
- # the python version hasn't a nice repr
- def test_repr(self): pass
+ class PartialSubclass(c_functools.partial):
+ pass
- # the python version isn't picklable
- def test_pickle(self): pass
+ partial = staticmethod(PartialSubclass)
class TestUpdateWrapper(unittest.TestCase):
@@ -320,7 +342,7 @@ class TestWraps(TestUpdateWrapper):
self.assertEqual(wrapper.__qualname__, f.__qualname__)
self.assertEqual(wrapper.attr, 'This is also a test')
- @unittest.skipIf(not sys.flags.optimize <= 1,
+ @unittest.skipIf(sys.flags.optimize >= 2,
"Docstrings are omitted with -O2 and above")
def test_default_update_doc(self):
wrapper, _ = self._default_update()
@@ -441,24 +463,28 @@ class TestReduce(unittest.TestCase):
d = {"one": 1, "two": 2, "three": 3}
self.assertEqual(self.func(add, d), "".join(d.keys()))
-class TestCmpToKey(unittest.TestCase):
+class TestCmpToKey(object):
def test_cmp_to_key(self):
def cmp1(x, y):
return (x > y) - (x < y)
- key = functools.cmp_to_key(cmp1)
+ key = self.cmp_to_key(cmp1)
self.assertEqual(key(3), key(3))
self.assertGreater(key(3), key(1))
+ self.assertGreaterEqual(key(3), key(3))
+
def cmp2(x, y):
return int(x) - int(y)
- key = functools.cmp_to_key(cmp2)
+ key = self.cmp_to_key(cmp2)
self.assertEqual(key(4.0), key('4'))
self.assertLess(key(2), key('35'))
+ self.assertLessEqual(key(2), key('35'))
+ self.assertNotEqual(key(2), key('35'))
def test_cmp_to_key_arguments(self):
def cmp1(x, y):
return (x > y) - (x < y)
- key = functools.cmp_to_key(mycmp=cmp1)
+ key = self.cmp_to_key(mycmp=cmp1)
self.assertEqual(key(obj=3), key(obj=3))
self.assertGreater(key(obj=3), key(obj=1))
with self.assertRaises((TypeError, AttributeError)):
@@ -466,10 +492,10 @@ class TestCmpToKey(unittest.TestCase):
with self.assertRaises((TypeError, AttributeError)):
1 < key(3) # lhs is not a K object
with self.assertRaises(TypeError):
- key = functools.cmp_to_key() # too few args
+ key = self.cmp_to_key() # too few args
with self.assertRaises(TypeError):
- key = functools.cmp_to_key(cmp1, None) # too many args
- key = functools.cmp_to_key(cmp1)
+ key = self.module.cmp_to_key(cmp1, None) # too many args
+ key = self.cmp_to_key(cmp1)
with self.assertRaises(TypeError):
key() # too few args
with self.assertRaises(TypeError):
@@ -478,7 +504,7 @@ class TestCmpToKey(unittest.TestCase):
def test_bad_cmp(self):
def cmp1(x, y):
raise ZeroDivisionError
- key = functools.cmp_to_key(cmp1)
+ key = self.cmp_to_key(cmp1)
with self.assertRaises(ZeroDivisionError):
key(3) > key(1)
@@ -493,13 +519,13 @@ class TestCmpToKey(unittest.TestCase):
def test_obj_field(self):
def cmp1(x, y):
return (x > y) - (x < y)
- key = functools.cmp_to_key(mycmp=cmp1)
+ key = self.cmp_to_key(mycmp=cmp1)
self.assertEqual(key(50).obj, 50)
def test_sort_int(self):
def mycmp(x, y):
return y - x
- self.assertEqual(sorted(range(5), key=functools.cmp_to_key(mycmp)),
+ self.assertEqual(sorted(range(5), key=self.cmp_to_key(mycmp)),
[4, 3, 2, 1, 0])
def test_sort_int_str(self):
@@ -507,18 +533,24 @@ class TestCmpToKey(unittest.TestCase):
x, y = int(x), int(y)
return (x > y) - (x < y)
values = [5, '3', 7, 2, '0', '1', 4, '10', 1]
- values = sorted(values, key=functools.cmp_to_key(mycmp))
+ values = sorted(values, key=self.cmp_to_key(mycmp))
self.assertEqual([int(value) for value in values],
[0, 1, 1, 2, 3, 4, 5, 7, 10])
def test_hash(self):
def mycmp(x, y):
return y - x
- key = functools.cmp_to_key(mycmp)
+ key = self.cmp_to_key(mycmp)
k = key(10)
self.assertRaises(TypeError, hash, k)
self.assertNotIsInstance(k, collections.Hashable)
+class TestCmpToKeyC(BaseTestC, TestCmpToKey):
+ cmp_to_key = c_functools.cmp_to_key
+
+class TestCmpToKeyPy(BaseTestPy, TestCmpToKey):
+ cmp_to_key = staticmethod(py_functools.cmp_to_key)
+
class TestTotalOrdering(unittest.TestCase):
def test_total_ordering_lt(self):
@@ -623,7 +655,7 @@ class TestLRU(unittest.TestCase):
def test_lru(self):
def orig(x, y):
- return 3*x+y
+ return 3 * x + y
f = functools.lru_cache(maxsize=20)(orig)
hits, misses, maxsize, currsize = f.cache_info()
self.assertEqual(maxsize, 20)
@@ -728,7 +760,7 @@ class TestLRU(unittest.TestCase):
# Verify that user_function exceptions get passed through without
# creating a hard-to-read chained exception.
# http://bugs.python.org/issue13177
- for maxsize in (None, 100):
+ for maxsize in (None, 128):
@functools.lru_cache(maxsize)
def func(i):
return 'abc'[i]
@@ -741,7 +773,7 @@ class TestLRU(unittest.TestCase):
func(15)
def test_lru_with_types(self):
- for maxsize in (None, 100):
+ for maxsize in (None, 128):
@functools.lru_cache(maxsize=maxsize, typed=True)
def square(x):
return x * x
@@ -756,14 +788,46 @@ class TestLRU(unittest.TestCase):
self.assertEqual(square.cache_info().hits, 4)
self.assertEqual(square.cache_info().misses, 4)
+ def test_lru_with_keyword_args(self):
+ @functools.lru_cache()
+ def fib(n):
+ if n < 2:
+ return n
+ return fib(n=n-1) + fib(n=n-2)
+ self.assertEqual(
+ [fib(n=number) for number in range(16)],
+ [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610]
+ )
+ self.assertEqual(fib.cache_info(),
+ functools._CacheInfo(hits=28, misses=16, maxsize=128, currsize=16))
+ fib.cache_clear()
+ self.assertEqual(fib.cache_info(),
+ functools._CacheInfo(hits=0, misses=0, maxsize=128, currsize=0))
+
+ def test_lru_with_keyword_args_maxsize_none(self):
+ @functools.lru_cache(maxsize=None)
+ def fib(n):
+ if n < 2:
+ return n
+ return fib(n=n-1) + fib(n=n-2)
+ self.assertEqual([fib(n=number) for number in range(16)],
+ [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
+ self.assertEqual(fib.cache_info(),
+ functools._CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
+ fib.cache_clear()
+ self.assertEqual(fib.cache_info(),
+ functools._CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
+
def test_main(verbose=None):
test_classes = (
- TestPartial,
- TestPartialSubclass,
- TestPythonPartial,
+ TestPartialC,
+ TestPartialPy,
+ TestPartialCSubclass,
+ TestPartialPySubclass,
TestUpdateWrapper,
TestTotalOrdering,
- TestCmpToKey,
+ TestCmpToKeyC,
+ TestCmpToKeyPy,
TestWraps,
TestReduce,
TestLRU,
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index c59b72e..85dbc97 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -610,6 +610,32 @@ class GCTests(unittest.TestCase):
stderr = run_command(code % "gc.DEBUG_SAVEALL")
self.assertNotIn(b"uncollectable objects at shutdown", stderr)
+ def test_get_stats(self):
+ stats = gc.get_stats()
+ self.assertEqual(len(stats), 3)
+ for st in stats:
+ self.assertIsInstance(st, dict)
+ self.assertEqual(set(st),
+ {"collected", "collections", "uncollectable"})
+ self.assertGreaterEqual(st["collected"], 0)
+ self.assertGreaterEqual(st["collections"], 0)
+ self.assertGreaterEqual(st["uncollectable"], 0)
+ # Check that collection counts are incremented correctly
+ if gc.isenabled():
+ self.addCleanup(gc.enable)
+ gc.disable()
+ old = gc.get_stats()
+ gc.collect(0)
+ new = gc.get_stats()
+ self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
+ self.assertEqual(new[1]["collections"], old[1]["collections"])
+ self.assertEqual(new[2]["collections"], old[2]["collections"])
+ gc.collect(2)
+ new = gc.get_stats()
+ self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
+ self.assertEqual(new[1]["collections"], old[1]["collections"])
+ self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
+
class GCCallbackTests(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_generators.py b/Lib/test/test_generators.py
index 06f67c2..f5c1a7d 100644
--- a/Lib/test/test_generators.py
+++ b/Lib/test/test_generators.py
@@ -1728,9 +1728,7 @@ Our ill-behaved code should be invoked during GC:
>>> g = f()
>>> next(g)
>>> del g
->>> sys.stderr.getvalue().startswith(
-... "Exception RuntimeError: 'generator ignored GeneratorExit' in "
-... )
+>>> "RuntimeError: generator ignored GeneratorExit" in sys.stderr.getvalue()
True
>>> sys.stderr = old
@@ -1840,22 +1838,23 @@ to test.
... sys.stderr = io.StringIO()
... class Leaker:
... def __del__(self):
-... raise RuntimeError
+... def invoke(message):
+... raise RuntimeError(message)
+... invoke("test")
...
... l = Leaker()
... del l
... err = sys.stderr.getvalue().strip()
-... err.startswith(
-... "Exception RuntimeError: RuntimeError() in <"
-... )
-... err.endswith("> ignored")
-... len(err.splitlines())
+... "Exception ignored in" in err
+... "RuntimeError: test" in err
+... "Traceback" in err
+... "in invoke" in err
... finally:
... sys.stderr = old
True
True
-1
-
+True
+True
These refleak tests should perhaps be in a testfile of their own,
diff --git a/Lib/test/test_genericpath.py b/Lib/test/test_genericpath.py
index 3eadd58..084dbb2 100644
--- a/Lib/test/test_genericpath.py
+++ b/Lib/test/test_genericpath.py
@@ -308,19 +308,19 @@ class CommonTest(GenericTest):
for path in ('', 'fuu', 'f\xf9\xf9', '/fuu', 'U:\\'):
self.assertIsInstance(abspath(path), str)
- @unittest.skipIf(sys.platform == 'darwin',
- "Mac OS X denies the creation of a directory with an invalid utf8 name")
def test_nonascii_abspath(self):
- name = b'\xe7w\xf0'
- if sys.platform == 'win32':
- try:
- os.fsdecode(name)
- except UnicodeDecodeError:
- self.skipTest("the filename %a is not decodable "
- "from the ANSI code page %s"
- % (name, sys.getfilesystemencoding()))
-
- # Test non-ASCII, non-UTF8 bytes in the path.
+ if (support.TESTFN_UNDECODABLE
+ # Mac OS X denies the creation of a directory with an invalid
+ # UTF-8 name. Windows allows to create a directory with an
+ # arbitrary bytes name, but fails to enter this directory
+ # (when the bytes name is used).
+ and sys.platform not in ('win32', 'darwin')):
+ name = support.TESTFN_UNDECODABLE
+ elif support.TESTFN_NONASCII:
+ name = support.TESTFN_NONASCII
+ else:
+ self.skipTest("need support.TESTFN_NONASCII")
+
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
with support.temp_cwd(name):
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 32f85e9..54201a1 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -36,7 +36,10 @@ def hexstr(s):
class HashLibTestCase(unittest.TestCase):
supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1',
'sha224', 'SHA224', 'sha256', 'SHA256',
- 'sha384', 'SHA384', 'sha512', 'SHA512' )
+ 'sha384', 'SHA384', 'sha512', 'SHA512',
+ 'sha3_224', 'sha3_256', 'sha3_384',
+ 'sha3_512', 'SHA3_224', 'SHA3_256',
+ 'SHA3_384', 'SHA3_512' )
# Issue #14693: fallback modules are always compiled under POSIX
_warn_on_extension_import = os.name == 'posix' or COMPILED_WITH_PYDEBUG
@@ -93,6 +96,12 @@ class HashLibTestCase(unittest.TestCase):
if _sha512:
self.constructors_to_test['sha384'].add(_sha512.sha384)
self.constructors_to_test['sha512'].add(_sha512.sha512)
+ _sha3 = self._conditional_import_module('_sha3')
+ if _sha3:
+ self.constructors_to_test['sha3_224'].add(_sha3.sha3_224)
+ self.constructors_to_test['sha3_256'].add(_sha3.sha3_256)
+ self.constructors_to_test['sha3_384'].add(_sha3.sha3_384)
+ self.constructors_to_test['sha3_512'].add(_sha3.sha3_512)
super(HashLibTestCase, self).__init__(*args, **kwargs)
@@ -158,6 +167,7 @@ class HashLibTestCase(unittest.TestCase):
self.assertEqual(m1.digest(), m2.digest())
def check(self, name, data, digest):
+ digest = digest.lower()
constructors = self.constructors_to_test[name]
# 2 is for hashlib.name(...) and hashlib.new(name, ...)
self.assertGreaterEqual(len(constructors), 2)
@@ -183,6 +193,10 @@ class HashLibTestCase(unittest.TestCase):
self.check_no_unicode('sha256')
self.check_no_unicode('sha384')
self.check_no_unicode('sha512')
+ self.check_no_unicode('sha3_224')
+ self.check_no_unicode('sha3_256')
+ self.check_no_unicode('sha3_384')
+ self.check_no_unicode('sha3_512')
def test_case_md5_0(self):
self.check('md5', b'', 'd41d8cd98f00b204e9800998ecf8427e')
@@ -318,11 +332,122 @@ class HashLibTestCase(unittest.TestCase):
"e718483d0ce769644e2e42c7bc15b4638e1f98b13b2044285632a803afa973eb"+
"de0ff244877ea60a4cb0432ce577c31beb009c5c2c49aa2e4eadb217ad8cc09b")
+ # SHA-3 family
+ def test_case_sha3_224_0(self):
+ self.check('sha3_224', b"",
+ "F71837502BA8E10837BDD8D365ADB85591895602FC552B48B7390ABD")
+
+ def test_case_sha3_224_1(self):
+ self.check('sha3_224', bytes.fromhex("CC"),
+ "A9CAB59EB40A10B246290F2D6086E32E3689FAF1D26B470C899F2802")
+
+ def test_case_sha3_224_2(self):
+ self.check('sha3_224', bytes.fromhex("41FB"),
+ "615BA367AFDC35AAC397BC7EB5D58D106A734B24986D5D978FEFD62C")
+
+ def test_case_sha3_224_3(self):
+ self.check('sha3_224', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "62B10F1B6236EBC2DA72957742A8D4E48E213B5F8934604BFD4D2C3A")
+
+ @bigmemtest(size=_4G + 5, memuse=1)
+ def test_case_sha3_224_huge(self, size):
+ if size == _4G + 5:
+ try:
+ self.check('sha3_224', b'A'*size,
+ '58ef60057c9dddb6a87477e9ace5a26f0d9db01881cf9b10a9f8c224')
+ except OverflowError:
+ pass # 32-bit arch
+
+
+ def test_case_sha3_256_0(self):
+ self.check('sha3_256', b"",
+ "C5D2460186F7233C927E7DB2DCC703C0E500B653CA82273B7BFAD8045D85A470")
+
+ def test_case_sha3_256_1(self):
+ self.check('sha3_256', bytes.fromhex("CC"),
+ "EEAD6DBFC7340A56CAEDC044696A168870549A6A7F6F56961E84A54BD9970B8A")
+
+ def test_case_sha3_256_2(self):
+ self.check('sha3_256', bytes.fromhex("41FB"),
+ "A8EACEDA4D47B3281A795AD9E1EA2122B407BAF9AABCB9E18B5717B7873537D2")
+
+ def test_case_sha3_256_3(self):
+ self.check('sha3_256', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "CE87A5173BFFD92399221658F801D45C294D9006EE9F3F9D419C8D427748DC41")
+
+
+ def test_case_sha3_384_0(self):
+ self.check('sha3_384', b"",
+ "2C23146A63A29ACF99E73B88F8C24EAA7DC60AA771780CCC006AFBFA8FE2479B"+
+ "2DD2B21362337441AC12B515911957FF")
+
+ def test_case_sha3_384_1(self):
+ self.check('sha3_384', bytes.fromhex("CC"),
+ "1B84E62A46E5A201861754AF5DC95C4A1A69CAF4A796AE405680161E29572641"+
+ "F5FA1E8641D7958336EE7B11C58F73E9")
+
+ def test_case_sha3_384_2(self):
+ self.check('sha3_384', bytes.fromhex("41FB"),
+ "495CCE2714CD72C8C53C3363D22C58B55960FE26BE0BF3BBC7A3316DD563AD1D"+
+ "B8410E75EEFEA655E39D4670EC0B1792")
+
+ def test_case_sha3_384_3(self):
+ self.check('sha3_384', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "135114508DD63E279E709C26F7817C0482766CDE49132E3EDF2EEDD8996F4E35"+
+ "96D184100B384868249F1D8B8FDAA2C9")
+
+
+ def test_case_sha3_512_0(self):
+ self.check('sha3_512', b"",
+ "0EAB42DE4C3CEB9235FC91ACFFE746B29C29A8C366B7C60E4E67C466F36A4304"+
+ "C00FA9CAF9D87976BA469BCBE06713B435F091EF2769FB160CDAB33D3670680E")
+
+ def test_case_sha3_512_1(self):
+ self.check('sha3_512', bytes.fromhex("CC"),
+ "8630C13CBD066EA74BBE7FE468FEC1DEE10EDC1254FB4C1B7C5FD69B646E4416"+
+ "0B8CE01D05A0908CA790DFB080F4B513BC3B6225ECE7A810371441A5AC666EB9")
+
+ def test_case_sha3_512_2(self):
+ self.check('sha3_512', bytes.fromhex("41FB"),
+ "551DA6236F8B96FCE9F97F1190E901324F0B45E06DBBB5CDB8355D6ED1DC34B3"+
+ "F0EAE7DCB68622FF232FA3CECE0D4616CDEB3931F93803662A28DF1CD535B731")
+
+ def test_case_sha3_512_3(self):
+ self.check('sha3_512', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "527D28E341E6B14F4684ADB4B824C496C6482E51149565D3D17226828884306B"+
+ "51D6148A72622C2B75F5D3510B799D8BDC03EAEDE453676A6EC8FE03A1AD0EAB")
+
+
def test_gil(self):
# Check things work fine with an input larger than the size required
# for multithreaded operation (which is hardwired to 2048).
gil_minsize = 2048
+ for name in self.supported_hash_names:
+ m = hashlib.new(name)
+ m.update(b'1')
+ m.update(b'#' * gil_minsize)
+ m.update(b'1')
+
+ m = hashlib.new(name, b'x' * gil_minsize)
+ m.update(b'1')
+
m = hashlib.md5()
m.update(b'1')
m.update(b'#' * gil_minsize)
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py
index 75133c9..c5db620 100644
--- a/Lib/test/test_httpservers.py
+++ b/Lib/test/test_httpservers.py
@@ -92,6 +92,9 @@ class BaseHTTPServerTestCase(BaseTestCase):
def do_KEYERROR(self):
self.send_error(999)
+ def do_NOTFOUND(self):
+ self.send_error(404)
+
def do_CUSTOM(self):
self.send_response(999)
self.send_header('Content-Type', 'text/html')
@@ -211,6 +214,14 @@ class BaseHTTPServerTestCase(BaseTestCase):
self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
+ def test_error_content_length(self):
+ # Issue #16088: standard error responses should have a content-length
+ self.con.request('NOTFOUND', '/')
+ res = self.con.getresponse()
+ self.assertEqual(res.status, 404)
+ data = res.read()
+ self.assertEqual(int(res.getheader('Content-Length')), len(data))
+
class SimpleHTTPServerTestCase(BaseTestCase):
class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py
index 65c9f25..52c4399 100644
--- a/Lib/test/test_imp.py
+++ b/Lib/test/test_imp.py
@@ -217,6 +217,20 @@ class ImportTests(unittest.TestCase):
mod = imp.load_module(example, *x)
self.assertEqual(mod.__name__, example)
+ def test_issue16421_multiple_modules_in_one_dll(self):
+ # Issue 16421: loading several modules from the same compiled file fails
+ m = '_testimportmultiple'
+ fileobj, pathname, description = imp.find_module(m)
+ fileobj.close()
+ mod0 = imp.load_dynamic(m, pathname)
+ mod1 = imp.load_dynamic('_testimportmultiple_foo', pathname)
+ mod2 = imp.load_dynamic('_testimportmultiple_bar', pathname)
+ self.assertEqual(mod0.__name__, m)
+ self.assertEqual(mod1.__name__, '_testimportmultiple_foo')
+ self.assertEqual(mod2.__name__, '_testimportmultiple_bar')
+ with self.assertRaises(ImportError):
+ imp.load_dynamic('nonexistent', pathname)
+
def test_load_dynamic_ImportError_path(self):
# Issue #1559549 added `name` and `path` attributes to ImportError
# in order to provide better detail. Issue #10854 implemented those
diff --git a/Lib/test/test_importlib/import_/test_fromlist.py b/Lib/test/test_importlib/import_/test_fromlist.py
index 27b2270..c16c337 100644
--- a/Lib/test/test_importlib/import_/test_fromlist.py
+++ b/Lib/test/test_importlib/import_/test_fromlist.py
@@ -80,7 +80,7 @@ class HandlingFromlist(unittest.TestCase):
with util.import_state(meta_path=[importer]):
with self.assertRaises(ImportError) as exc:
import_util.import_('pkg', fromlist=['mod'])
- self.assertEquals('i_do_not_exist', exc.exception.name)
+ self.assertEqual('i_do_not_exist', exc.exception.name)
def test_empty_string(self):
with util.mock_modules('pkg.__init__', 'pkg.mod') as importer:
diff --git a/Lib/test/test_importlib/source/test_abc_loader.py b/Lib/test/test_importlib/source/test_abc_loader.py
index 0d912b6..cce7b07 100644
--- a/Lib/test/test_importlib/source/test_abc_loader.py
+++ b/Lib/test/test_importlib/source/test_abc_loader.py
@@ -70,483 +70,9 @@ class SourceLoaderMock(SourceOnlyLoaderMock):
return path == self.bytecode_path
-class PyLoaderMock(abc.PyLoader):
-
- # Globals that should be defined for all modules.
- source = (b"_ = '::'.join([__name__, __file__, __package__, "
- b"repr(__loader__)])")
-
- def __init__(self, data):
- """Take a dict of 'module_name: path' pairings.
-
- Paths should have no file extension, allowing packages to be denoted by
- ending in '__init__'.
-
- """
- self.module_paths = data
- self.path_to_module = {val:key for key,val in data.items()}
-
- def get_data(self, path):
- if path not in self.path_to_module:
- raise IOError
- return self.source
-
- def is_package(self, name):
- filename = os.path.basename(self.get_filename(name))
- return os.path.splitext(filename)[0] == '__init__'
-
- def source_path(self, name):
- try:
- return self.module_paths[name]
- except KeyError:
- raise ImportError
-
- def get_filename(self, name):
- """Silence deprecation warning."""
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- path = super().get_filename(name)
- assert len(w) == 1
- assert issubclass(w[0].category, DeprecationWarning)
- return path
-
- def module_repr(self):
- return '<module>'
-
-
-class PyLoaderCompatMock(PyLoaderMock):
-
- """Mock that matches what is suggested to have a loader that is compatible
- from Python 3.1 onwards."""
-
- def get_filename(self, fullname):
- try:
- return self.module_paths[fullname]
- except KeyError:
- raise ImportError
-
- def source_path(self, fullname):
- try:
- return self.get_filename(fullname)
- except ImportError:
- return None
-
-
-class PyPycLoaderMock(abc.PyPycLoader, PyLoaderMock):
-
- default_mtime = 1
-
- def __init__(self, source, bc={}):
- """Initialize mock.
-
- 'bc' is a dict keyed on a module's name. The value is dict with
- possible keys of 'path', 'mtime', 'magic', and 'bc'. Except for 'path',
- each of those keys control if any part of created bytecode is to
- deviate from default values.
-
- """
- super().__init__(source)
- self.module_bytecode = {}
- self.path_to_bytecode = {}
- self.bytecode_to_path = {}
- for name, data in bc.items():
- self.path_to_bytecode[data['path']] = name
- self.bytecode_to_path[name] = data['path']
- magic = data.get('magic', imp.get_magic())
- mtime = importlib._w_long(data.get('mtime', self.default_mtime))
- source_size = importlib._w_long(len(self.source) & 0xFFFFFFFF)
- if 'bc' in data:
- bc = data['bc']
- else:
- bc = self.compile_bc(name)
- self.module_bytecode[name] = magic + mtime + source_size + bc
-
- def compile_bc(self, name):
- source_path = self.module_paths.get(name, '<test>') or '<test>'
- code = compile(self.source, source_path, 'exec')
- return marshal.dumps(code)
-
- def source_mtime(self, name):
- if name in self.module_paths:
- return self.default_mtime
- elif name in self.module_bytecode:
- return None
- else:
- raise ImportError
-
- def bytecode_path(self, name):
- try:
- return self.bytecode_to_path[name]
- except KeyError:
- if name in self.module_paths:
- return None
- else:
- raise ImportError
-
- def write_bytecode(self, name, bytecode):
- self.module_bytecode[name] = bytecode
- return True
-
- def get_data(self, path):
- if path in self.path_to_module:
- return super().get_data(path)
- elif path in self.path_to_bytecode:
- name = self.path_to_bytecode[path]
- return self.module_bytecode[name]
- else:
- raise IOError
-
- def is_package(self, name):
- try:
- return super().is_package(name)
- except TypeError:
- return '__init__' in self.bytecode_to_path[name]
-
- def get_code(self, name):
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- code_object = super().get_code(name)
- assert len(w) == 1
- assert issubclass(w[0].category, DeprecationWarning)
- return code_object
-
-class PyLoaderTests(testing_abc.LoaderTests):
-
- """Tests for importlib.abc.PyLoader."""
-
- mocker = PyLoaderMock
-
- def eq_attrs(self, ob, **kwargs):
- for attr, val in kwargs.items():
- found = getattr(ob, attr)
- self.assertEqual(found, val,
- "{} attribute: {} != {}".format(attr, found, val))
-
- def test_module(self):
- name = '<module>'
- path = os.path.join('', 'path', 'to', 'module')
- mock = self.mocker({name: path})
- with util.uncache(name):
- module = mock.load_module(name)
- self.assertIn(name, sys.modules)
- self.eq_attrs(module, __name__=name, __file__=path, __package__='',
- __loader__=mock)
- self.assertTrue(not hasattr(module, '__path__'))
- return mock, name
-
- def test_package(self):
- name = '<pkg>'
- path = os.path.join('path', 'to', name, '__init__')
- mock = self.mocker({name: path})
- with util.uncache(name):
- module = mock.load_module(name)
- self.assertIn(name, sys.modules)
- self.eq_attrs(module, __name__=name, __file__=path,
- __path__=[os.path.dirname(path)], __package__=name,
- __loader__=mock)
- return mock, name
-
- def test_lacking_parent(self):
- name = 'pkg.mod'
- path = os.path.join('path', 'to', 'pkg', 'mod')
- mock = self.mocker({name: path})
- with util.uncache(name):
- module = mock.load_module(name)
- self.assertIn(name, sys.modules)
- self.eq_attrs(module, __name__=name, __file__=path, __package__='pkg',
- __loader__=mock)
- self.assertFalse(hasattr(module, '__path__'))
- return mock, name
-
- def test_module_reuse(self):
- name = 'mod'
- path = os.path.join('path', 'to', 'mod')
- module = imp.new_module(name)
- mock = self.mocker({name: path})
- with util.uncache(name):
- sys.modules[name] = module
- loaded_module = mock.load_module(name)
- self.assertIs(loaded_module, module)
- self.assertIs(sys.modules[name], module)
- return mock, name
-
- def test_state_after_failure(self):
- name = "mod"
- module = imp.new_module(name)
- module.blah = None
- mock = self.mocker({name: os.path.join('path', 'to', 'mod')})
- mock.source = b"1/0"
- with util.uncache(name):
- sys.modules[name] = module
- with self.assertRaises(ZeroDivisionError):
- mock.load_module(name)
- self.assertIs(sys.modules[name], module)
- self.assertTrue(hasattr(module, 'blah'))
- return mock
-
- def test_unloadable(self):
- name = "mod"
- mock = self.mocker({name: os.path.join('path', 'to', 'mod')})
- mock.source = b"1/0"
- with util.uncache(name):
- with self.assertRaises(ZeroDivisionError):
- mock.load_module(name)
- self.assertNotIn(name, sys.modules)
- return mock
-
-
-class PyLoaderCompatTests(PyLoaderTests):
-
- """Test that the suggested code to make a loader that is compatible from
- Python 3.1 forward works."""
-
- mocker = PyLoaderCompatMock
-
-
-class PyLoaderInterfaceTests(unittest.TestCase):
-
- """Tests for importlib.abc.PyLoader to make sure that when source_path()
- doesn't return a path everything works as expected."""
-
- def test_no_source_path(self):
- # No source path should lead to ImportError.
- name = 'mod'
- mock = PyLoaderMock({})
- with util.uncache(name), self.assertRaises(ImportError):
- mock.load_module(name)
-
- def test_source_path_is_None(self):
- name = 'mod'
- mock = PyLoaderMock({name: None})
- with util.uncache(name), self.assertRaises(ImportError):
- mock.load_module(name)
-
- def test_get_filename_with_source_path(self):
- # get_filename() should return what source_path() returns.
- name = 'mod'
- path = os.path.join('path', 'to', 'source')
- mock = PyLoaderMock({name: path})
- with util.uncache(name):
- self.assertEqual(mock.get_filename(name), path)
-
- def test_get_filename_no_source_path(self):
- # get_filename() should raise ImportError if source_path returns None.
- name = 'mod'
- mock = PyLoaderMock({name: None})
- with util.uncache(name), self.assertRaises(ImportError):
- mock.get_filename(name)
-
-
-class PyPycLoaderTests(PyLoaderTests):
-
- """Tests for importlib.abc.PyPycLoader."""
-
- mocker = PyPycLoaderMock
-
- @source_util.writes_bytecode_files
- def verify_bytecode(self, mock, name):
- assert name in mock.module_paths
- self.assertIn(name, mock.module_bytecode)
- magic = mock.module_bytecode[name][:4]
- self.assertEqual(magic, imp.get_magic())
- mtime = importlib._r_long(mock.module_bytecode[name][4:8])
- self.assertEqual(mtime, 1)
- source_size = mock.module_bytecode[name][8:12]
- self.assertEqual(len(mock.source) & 0xFFFFFFFF,
- importlib._r_long(source_size))
- bc = mock.module_bytecode[name][12:]
- self.assertEqual(bc, mock.compile_bc(name))
-
- def test_module(self):
- mock, name = super().test_module()
- self.verify_bytecode(mock, name)
-
- def test_package(self):
- mock, name = super().test_package()
- self.verify_bytecode(mock, name)
-
- def test_lacking_parent(self):
- mock, name = super().test_lacking_parent()
- self.verify_bytecode(mock, name)
-
- def test_module_reuse(self):
- mock, name = super().test_module_reuse()
- self.verify_bytecode(mock, name)
-
- def test_state_after_failure(self):
- super().test_state_after_failure()
-
- def test_unloadable(self):
- super().test_unloadable()
-
-
-class PyPycLoaderInterfaceTests(unittest.TestCase):
-
- """Test for the interface of importlib.abc.PyPycLoader."""
-
- def get_filename_check(self, src_path, bc_path, expect):
- name = 'mod'
- mock = PyPycLoaderMock({name: src_path}, {name: {'path': bc_path}})
- with util.uncache(name):
- assert mock.source_path(name) == src_path
- assert mock.bytecode_path(name) == bc_path
- self.assertEqual(mock.get_filename(name), expect)
-
- def test_filename_with_source_bc(self):
- # When source and bytecode paths present, return the source path.
- self.get_filename_check('source_path', 'bc_path', 'source_path')
-
- def test_filename_with_source_no_bc(self):
- # With source but no bc, return source path.
- self.get_filename_check('source_path', None, 'source_path')
-
- def test_filename_with_no_source_bc(self):
- # With not source but bc, return the bc path.
- self.get_filename_check(None, 'bc_path', 'bc_path')
-
- def test_filename_with_no_source_or_bc(self):
- # With no source or bc, raise ImportError.
- name = 'mod'
- mock = PyPycLoaderMock({name: None}, {name: {'path': None}})
- with util.uncache(name), self.assertRaises(ImportError):
- mock.get_filename(name)
-
-
-class SkipWritingBytecodeTests(unittest.TestCase):
-
- """Test that bytecode is properly handled based on
- sys.dont_write_bytecode."""
-
- @source_util.writes_bytecode_files
- def run_test(self, dont_write_bytecode):
- name = 'mod'
- mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')})
- sys.dont_write_bytecode = dont_write_bytecode
- with util.uncache(name):
- mock.load_module(name)
- self.assertIsNot(name in mock.module_bytecode, dont_write_bytecode)
-
- def test_no_bytecode_written(self):
- self.run_test(True)
-
- def test_bytecode_written(self):
- self.run_test(False)
-
-
-class RegeneratedBytecodeTests(unittest.TestCase):
-
- """Test that bytecode is regenerated as expected."""
-
- @source_util.writes_bytecode_files
- def test_different_magic(self):
- # A different magic number should lead to new bytecode.
- name = 'mod'
- bad_magic = b'\x00\x00\x00\x00'
- assert bad_magic != imp.get_magic()
- mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')},
- {name: {'path': os.path.join('path', 'to',
- 'mod.bytecode'),
- 'magic': bad_magic}})
- with util.uncache(name):
- mock.load_module(name)
- self.assertIn(name, mock.module_bytecode)
- magic = mock.module_bytecode[name][:4]
- self.assertEqual(magic, imp.get_magic())
-
- @source_util.writes_bytecode_files
- def test_old_mtime(self):
- # Bytecode with an older mtime should be regenerated.
- name = 'mod'
- old_mtime = PyPycLoaderMock.default_mtime - 1
- mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')},
- {name: {'path': 'path/to/mod.bytecode', 'mtime': old_mtime}})
- with util.uncache(name):
- mock.load_module(name)
- self.assertIn(name, mock.module_bytecode)
- mtime = importlib._r_long(mock.module_bytecode[name][4:8])
- self.assertEqual(mtime, PyPycLoaderMock.default_mtime)
-
-
-class BadBytecodeFailureTests(unittest.TestCase):
-
- """Test import failures when there is no source and parts of the bytecode
- is bad."""
-
- def test_bad_magic(self):
- # A bad magic number should lead to an ImportError.
- name = 'mod'
- bad_magic = b'\x00\x00\x00\x00'
- bc = {name:
- {'path': os.path.join('path', 'to', 'mod'),
- 'magic': bad_magic}}
- mock = PyPycLoaderMock({name: None}, bc)
- with util.uncache(name), self.assertRaises(ImportError) as cm:
- mock.load_module(name)
- self.assertEqual(cm.exception.name, name)
-
- def test_no_bytecode(self):
- # Missing code object bytecode should lead to an EOFError.
- name = 'mod'
- bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b''}}
- mock = PyPycLoaderMock({name: None}, bc)
- with util.uncache(name), self.assertRaises(EOFError):
- mock.load_module(name)
-
- def test_bad_bytecode(self):
- # Malformed code object bytecode should lead to a ValueError.
- name = 'mod'
- bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b'1234'}}
- mock = PyPycLoaderMock({name: None}, bc)
- with util.uncache(name), self.assertRaises(ValueError):
- mock.load_module(name)
-
-
def raise_ImportError(*args, **kwargs):
raise ImportError
-class MissingPathsTests(unittest.TestCase):
-
- """Test what happens when a source or bytecode path does not exist (either
- from *_path returning None or raising ImportError)."""
-
- def test_source_path_None(self):
- # Bytecode should be used when source_path returns None, along with
- # __file__ being set to the bytecode path.
- name = 'mod'
- bytecode_path = 'path/to/mod'
- mock = PyPycLoaderMock({name: None}, {name: {'path': bytecode_path}})
- with util.uncache(name):
- module = mock.load_module(name)
- self.assertEqual(module.__file__, bytecode_path)
-
- # Testing for bytecode_path returning None handled by all tests where no
- # bytecode initially exists.
-
- def test_all_paths_None(self):
- # If all *_path methods return None, raise ImportError.
- name = 'mod'
- mock = PyPycLoaderMock({name: None})
- with util.uncache(name), self.assertRaises(ImportError) as cm:
- mock.load_module(name)
- self.assertEqual(cm.exception.name, name)
-
- def test_source_path_ImportError(self):
- # An ImportError from source_path should trigger an ImportError.
- name = 'mod'
- mock = PyPycLoaderMock({}, {name: {'path': os.path.join('path', 'to',
- 'mod')}})
- with util.uncache(name), self.assertRaises(ImportError):
- mock.load_module(name)
-
- def test_bytecode_path_ImportError(self):
- # An ImportError from bytecode_path should trigger an ImportError.
- name = 'mod'
- mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')})
- bad_meth = types.MethodType(raise_ImportError, mock)
- mock.bytecode_path = bad_meth
- with util.uncache(name), self.assertRaises(ImportError) as cm:
- mock.load_module(name)
-
class SourceLoaderTestHarness(unittest.TestCase):
@@ -622,6 +148,11 @@ class SourceOnlyLoaderTests(SourceLoaderTestHarness):
code_object = self.loader.get_code(self.name)
self.verify_code(code_object)
+ def test_source_to_code(self):
+ # Verify the compiled code object.
+ code = self.loader.source_to_code(self.loader.source, self.path)
+ self.verify_code(code)
+
def test_load_module(self):
# Loading a module should set __name__, __loader__, __package__,
# __path__ (for packages), __file__, and __cached__.
@@ -801,6 +332,7 @@ class AbstractMethodImplTests(unittest.TestCase):
class Loader(abc.Loader):
def load_module(self, fullname):
super().load_module(fullname)
+
def module_repr(self, module):
super().module_repr(module)
@@ -825,20 +357,6 @@ class AbstractMethodImplTests(unittest.TestCase):
class SourceLoader(ResourceLoader, ExecutionLoader, abc.SourceLoader):
pass
- class PyLoader(ResourceLoader, InspectLoader, abc.PyLoader):
- def source_path(self, _):
- super().source_path(_)
-
- class PyPycLoader(PyLoader, abc.PyPycLoader):
- def bytecode_path(self, _):
- super().bytecode_path(_)
-
- def source_mtime(self, _):
- super().source_mtime(_)
-
- def write_bytecode(self, _, _2):
- super().write_bytecode(_, _2)
-
def raises_NotImplementedError(self, ins, *args):
for method_name in args:
method = getattr(ins, method_name)
@@ -877,29 +395,15 @@ class AbstractMethodImplTests(unittest.TestCase):
# Required abstractmethods.
self.raises_NotImplementedError(ins, 'get_filename', 'get_data')
# Optional abstractmethods.
- self.raises_NotImplementedError(ins,'path_stats', 'set_data')
-
- def test_PyLoader(self):
- self.raises_NotImplementedError(self.PyLoader(), 'source_path',
- 'get_data', 'is_package')
-
- def test_PyPycLoader(self):
- self.raises_NotImplementedError(self.PyPycLoader(), 'source_path',
- 'source_mtime', 'bytecode_path',
- 'write_bytecode')
+ self.raises_NotImplementedError(ins, 'path_stats', 'set_data')
def test_main():
from test.support import run_unittest
- run_unittest(PyLoaderTests, PyLoaderCompatTests,
- PyLoaderInterfaceTests,
- PyPycLoaderTests, PyPycLoaderInterfaceTests,
- SkipWritingBytecodeTests, RegeneratedBytecodeTests,
- BadBytecodeFailureTests, MissingPathsTests,
- SourceOnlyLoaderTests,
- SourceLoaderBytecodeTests,
- SourceLoaderGetSourceTests,
- AbstractMethodImplTests)
+ run_unittest(SourceOnlyLoaderTests,
+ SourceLoaderBytecodeTests,
+ SourceLoaderGetSourceTests,
+ AbstractMethodImplTests)
if __name__ == '__main__':
diff --git a/Lib/test/test_importlib/test_abc.py b/Lib/test/test_importlib/test_abc.py
index c620c37..a8d8c2e 100644
--- a/Lib/test/test_importlib/test_abc.py
+++ b/Lib/test/test_importlib/test_abc.py
@@ -43,11 +43,6 @@ class PathEntryFinder(InheritanceTests, unittest.TestCase):
subclasses = [machinery.FileFinder]
-class Loader(InheritanceTests, unittest.TestCase):
-
- subclasses = [abc.PyLoader]
-
-
class ResourceLoader(InheritanceTests, unittest.TestCase):
superclasses = [abc.Loader]
@@ -56,14 +51,13 @@ class ResourceLoader(InheritanceTests, unittest.TestCase):
class InspectLoader(InheritanceTests, unittest.TestCase):
superclasses = [abc.Loader]
- subclasses = [abc.PyLoader, machinery.BuiltinImporter,
+ subclasses = [machinery.BuiltinImporter,
machinery.FrozenImporter, machinery.ExtensionFileLoader]
class ExecutionLoader(InheritanceTests, unittest.TestCase):
superclasses = [abc.InspectLoader]
- subclasses = [abc.PyLoader]
class FileLoader(InheritanceTests, unittest.TestCase):
@@ -78,16 +72,6 @@ class SourceLoader(InheritanceTests, unittest.TestCase):
subclasses = [machinery.SourceFileLoader]
-class PyLoader(InheritanceTests, unittest.TestCase):
-
- superclasses = [abc.Loader, abc.ResourceLoader, abc.ExecutionLoader]
-
-
-class PyPycLoader(InheritanceTests, unittest.TestCase):
-
- superclasses = [abc.PyLoader]
-
-
def test_main():
from test.support import run_unittest
classes = []
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 555ad74..c99ba34 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -815,6 +815,14 @@ class SizeofTest:
bufio = self.tp(rawio, buffer_size=bufsize2)
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
@@ -2817,7 +2825,7 @@ class MiscIOTest(unittest.TestCase):
for fd in fds:
try:
os.close(fd)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
self.addCleanup(cleanup_fds)
diff --git a/Lib/test/test_iterlen.py b/Lib/test/test_iterlen.py
index 7469a31..57f7101 100644
--- a/Lib/test/test_iterlen.py
+++ b/Lib/test/test_iterlen.py
@@ -45,31 +45,21 @@ import unittest
from test import support
from itertools import repeat
from collections import deque
-from builtins import len as _len
+from operator import length_hint
n = 10
-def len(obj):
- try:
- return _len(obj)
- except TypeError:
- try:
- # note: this is an internal undocumented API,
- # don't rely on it in your own programs
- return obj.__length_hint__()
- except AttributeError:
- raise TypeError
class TestInvariantWithoutMutations(unittest.TestCase):
def test_invariant(self):
it = self.it
for i in reversed(range(1, n+1)):
- self.assertEqual(len(it), i)
+ self.assertEqual(length_hint(it), i)
next(it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertRaises(StopIteration, next, it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
class TestTemporarilyImmutable(TestInvariantWithoutMutations):
@@ -78,12 +68,12 @@ class TestTemporarilyImmutable(TestInvariantWithoutMutations):
# length immutability during iteration
it = self.it
- self.assertEqual(len(it), n)
+ self.assertEqual(length_hint(it), n)
next(it)
- self.assertEqual(len(it), n-1)
+ self.assertEqual(length_hint(it), n-1)
self.mutate()
self.assertRaises(RuntimeError, next, it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
## ------- Concrete Type Tests -------
@@ -92,10 +82,6 @@ class TestRepeat(TestInvariantWithoutMutations):
def setUp(self):
self.it = repeat(None, n)
- def test_no_len_for_infinite_repeat(self):
- # The repeat() object can also be infinite
- self.assertRaises(TypeError, len, repeat(None))
-
class TestXrange(TestInvariantWithoutMutations):
def setUp(self):
@@ -167,14 +153,15 @@ class TestList(TestInvariantWithoutMutations):
it = iter(d)
next(it)
next(it)
- self.assertEqual(len(it), n-2)
+ self.assertEqual(length_hint(it), n - 2)
d.append(n)
- self.assertEqual(len(it), n-1) # grow with append
+ self.assertEqual(length_hint(it), n - 1) # grow with append
d[1:] = []
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertEqual(list(it), [])
d.extend(range(20))
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
+
class TestListReversed(TestInvariantWithoutMutations):
@@ -186,32 +173,41 @@ class TestListReversed(TestInvariantWithoutMutations):
it = reversed(d)
next(it)
next(it)
- self.assertEqual(len(it), n-2)
+ self.assertEqual(length_hint(it), n - 2)
d.append(n)
- self.assertEqual(len(it), n-2) # ignore append
+ self.assertEqual(length_hint(it), n - 2) # ignore append
d[1:] = []
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertEqual(list(it), []) # confirm invariant
d.extend(range(20))
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
## -- Check to make sure exceptions are not suppressed by __length_hint__()
class BadLen(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __len__(self):
raise RuntimeError('hello')
+
class BadLengthHint(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __length_hint__(self):
raise RuntimeError('hello')
+
class NoneLengthHint(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __length_hint__(self):
- return None
+ return NotImplemented
+
class TestLengthHintExceptions(unittest.TestCase):
diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py
index 90e85a9..ece7f99 100644
--- a/Lib/test/test_itertools.py
+++ b/Lib/test/test_itertools.py
@@ -1723,9 +1723,8 @@ class TestVariousIteratorArgs(unittest.TestCase):
class LengthTransparency(unittest.TestCase):
def test_repeat(self):
- from test.test_iterlen import len
- self.assertEqual(len(repeat(None, 50)), 50)
- self.assertRaises(TypeError, len, repeat(None))
+ self.assertEqual(operator.length_hint(repeat(None, 50)), 50)
+ self.assertEqual(operator.length_hint(repeat(None), 12), 12)
class RegressionTests(unittest.TestCase):
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index cb908fb..9b76055 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -26,6 +26,7 @@ import logging.handlers
import logging.config
import codecs
+import configparser
import datetime
import pickle
import io
@@ -81,7 +82,7 @@ class BaseTest(unittest.TestCase):
"""Base class for logging tests."""
log_format = "%(name)s -> %(levelname)s: %(message)s"
- expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
message_num = 0
def setUp(self):
@@ -150,14 +151,17 @@ class BaseTest(unittest.TestCase):
finally:
logging._releaseLock()
- def assert_log_lines(self, expected_values, stream=None):
+ def assert_log_lines(self, expected_values, stream=None, pat=None):
"""Match the collected log lines against the regular expression
self.expected_log_pat, and compare the extracted group values to
the expected_values list of tuples."""
stream = stream or self.stream
- pat = re.compile(self.expected_log_pat)
+ pat = re.compile(pat or self.expected_log_pat)
try:
- stream.reset()
+ if hasattr(stream, 'reset'):
+ stream.reset()
+ elif hasattr(stream, 'seek'):
+ stream.seek(0)
actual_lines = stream.readlines()
except AttributeError:
# StringIO.StringIO lacks a reset() method.
@@ -435,7 +439,7 @@ class CustomLevelsAndFiltersTest(BaseTest):
"""Test various filtering possibilities with custom logging levels."""
# Skip the logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -769,7 +773,7 @@ if threading:
"""
try:
asyncore.loop(poll_interval, map=self.sockmap)
- except select.error:
+ except OSError:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
@@ -996,7 +1000,7 @@ class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -1049,7 +1053,7 @@ class ConfigFileTest(BaseTest):
"""Reading logging config from a .ini-style config file."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = """
@@ -1276,6 +1280,24 @@ class ConfigFileTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ def test_config0_using_cp_ok(self):
+ # A simple config file which overrides the default settings.
+ with captured_stdout() as output:
+ file = io.StringIO(textwrap.dedent(self.config0))
+ cp = configparser.ConfigParser()
+ cp.read_file(file)
+ logging.config.fileConfig(cp)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
def test_config1_ok(self, config=config1):
# A config file defining a sub-parser as well.
with captured_stdout() as output:
@@ -1419,16 +1441,19 @@ class SocketHandlerTest(BaseTest):
self.assertEqual(self.log_output, "spam\neggs\n")
def test_noserver(self):
+ # Avoid timing-related failures due to SocketHandler's own hard-wired
+ # one-second timeout on socket.create_connection() (issue #16264).
+ self.sock_hdlr.retryStart = 2.5
# Kill the server
self.server.stop(2.0)
- #The logging call should try to connect, which should fail
+ # The logging call should try to connect, which should fail
try:
raise RuntimeError('Deliberate mistake')
except RuntimeError:
self.root_logger.exception('Never sent')
self.root_logger.error('Never sent, either')
now = time.time()
- self.assertTrue(self.sock_hdlr.retryTime > now)
+ self.assertGreater(self.sock_hdlr.retryTime, now)
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
self.root_logger.error('Nor this')
@@ -1754,7 +1779,7 @@ class WarningsTest(BaseTest):
logger.removeHandler(h)
s = stream.getvalue()
h.close()
- self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+ self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
#See if an explicit file uses the original implementation
a_file = io.StringIO()
@@ -1792,7 +1817,7 @@ class ConfigDictTest(BaseTest):
"""Reading logging config from a dictionary."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = {
@@ -2364,6 +2389,32 @@ class ConfigDictTest(BaseTest):
},
}
+ # As config0, but with properties
+ config14 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ '.': {
+ 'foo': 'bar',
+ 'terminator': '!\n',
+ }
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
def apply_config(self, conf):
logging.config.dictConfig(conf)
@@ -2600,11 +2651,20 @@ class ConfigDictTest(BaseTest):
def test_config13_failure(self):
self.assertRaises(Exception, self.apply_config, self.config13)
+ def test_config14_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config14)
+ h = logging._handlers['hand1']
+ self.assertEqual(h.foo, 'bar')
+ self.assertEqual(h.terminator, '!\n')
+ logging.warning('Exclamation')
+ self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
+
@unittest.skipUnless(threading, 'listen() needs threading to work')
- def setup_via_listener(self, text):
+ def setup_via_listener(self, text, verify=None):
text = text.encode("utf-8")
# Ask for a randomly assigned port (by using port 0)
- t = logging.config.listen(0)
+ t = logging.config.listen(0, verify)
t.start()
t.ready.wait()
# Now get the port allocated
@@ -2664,6 +2724,69 @@ class ConfigDictTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_listen_verify(self):
+
+ def verify_fail(stuff):
+ return None
+
+ def verify_reverse(stuff):
+ return stuff[::-1]
+
+ logger = logging.getLogger("compiler.parser")
+ to_send = textwrap.dedent(ConfigFileTest.config1)
+ # First, specify a verification function that will fail.
+ # We expect to see no output, since our configuration
+ # never took effect.
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send, verify_fail)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([], stream=output)
+ # Original logger output has the stuff we logged.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform no verification. Our configuration
+ # should take effect.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send) # no verify callable specified
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform verification which transforms the bytes.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send[::-1], verify_reverse)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '5'),
+ ('ERROR', '6'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
def test_baseconfig(self):
d = {
'atuple': (1, 2, 3),
@@ -2716,14 +2839,14 @@ class ChildLoggerTest(BaseTest):
l2 = logging.getLogger('def.ghi')
c1 = r.getChild('xyz')
c2 = r.getChild('uvw.xyz')
- self.assertTrue(c1 is logging.getLogger('xyz'))
- self.assertTrue(c2 is logging.getLogger('uvw.xyz'))
+ self.assertIs(c1, logging.getLogger('xyz'))
+ self.assertIs(c2, logging.getLogger('uvw.xyz'))
c1 = l1.getChild('def')
c2 = c1.getChild('ghi')
c3 = l1.getChild('def.ghi')
- self.assertTrue(c1 is logging.getLogger('abc.def'))
- self.assertTrue(c2 is logging.getLogger('abc.def.ghi'))
- self.assertTrue(c2 is c3)
+ self.assertIs(c1, logging.getLogger('abc.def'))
+ self.assertIs(c2, logging.getLogger('abc.def.ghi'))
+ self.assertIs(c2, c3)
class DerivedLogRecord(logging.LogRecord):
@@ -2766,7 +2889,7 @@ class LogRecordFactoryTest(BaseTest):
class QueueHandlerTest(BaseTest):
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -3804,7 +3927,7 @@ class NTEventLogHandlerTest(BaseTest):
h.handle(r)
h.close()
# Now see if the event is recorded
- self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
+ self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
win32evtlog.EVENTLOG_SEQUENTIAL_READ
found = False
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index 6b1e933..d1cc92e 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -596,7 +596,7 @@ class TestMaildir(TestMailbox, unittest.TestCase):
def setUp(self):
TestMailbox.setUp(self)
- if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
+ if (os.name == 'nt') or (sys.platform == 'cygwin'):
self._box.colon = '!'
def assertMailboxEmpty(self):
diff --git a/Lib/test/test_mimetypes.py b/Lib/test/test_mimetypes.py
index 91da289..593fdb0 100644
--- a/Lib/test/test_mimetypes.py
+++ b/Lib/test/test_mimetypes.py
@@ -22,6 +22,8 @@ class MimeTypesTestCase(unittest.TestCase):
eq(self.db.guess_type("foo.tgz"), ("application/x-tar", "gzip"))
eq(self.db.guess_type("foo.tar.gz"), ("application/x-tar", "gzip"))
eq(self.db.guess_type("foo.tar.Z"), ("application/x-tar", "compress"))
+ eq(self.db.guess_type("foo.tar.bz2"), ("application/x-tar", "bzip2"))
+ eq(self.db.guess_type("foo.tar.xz"), ("application/x-tar", "xz"))
def test_data_urls(self):
eq = self.assertEqual
diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py
index a3de398..e3b7909 100644
--- a/Lib/test/test_mmap.py
+++ b/Lib/test/test_mmap.py
@@ -245,7 +245,7 @@ class MmapTests(unittest.TestCase):
def test_bad_file_desc(self):
# Try opening a bad file descriptor...
- self.assertRaises(mmap.error, mmap.mmap, -2, 4096)
+ self.assertRaises(OSError, mmap.mmap, -2, 4096)
def test_tougher_find(self):
# Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
@@ -673,7 +673,7 @@ class MmapTests(unittest.TestCase):
# parameters to _get_osfhandle.
s = socket.socket()
try:
- with self.assertRaises(mmap.error):
+ with self.assertRaises(OSError):
m = mmap.mmap(s.fileno(), 10)
finally:
s.close()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index b2a964c..9c7a202 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -19,6 +19,7 @@ import socket
import random
import logging
import struct
+import operator
import test.support
import test.script_helper
@@ -1617,6 +1618,18 @@ def mul(x, y):
class _TestPool(BaseTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.pool = cls.Pool(4)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.pool.terminate()
+ cls.pool.join()
+ cls.pool = None
+ super().tearDownClass()
+
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
@@ -1708,15 +1721,6 @@ class _TestPool(BaseTestCase):
p.join()
def test_terminate(self):
- if self.TYPE == 'manager':
- # On Unix a forked process increfs each shared object to
- # which its parent process held a reference. If the
- # forked process gets terminated then there is likely to
- # be a reference leak. So to prevent
- # _TestZZZNumberOfObjects from failing we skip this test
- # when using a manager.
- return
-
result = self.pool.map_async(
time.sleep, [0.1 for i in range(10000)], chunksize=1
)
@@ -1838,35 +1842,6 @@ class _TestPoolWorkerLifetime(BaseTestCase):
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
-
-#
-# Test that manager has expected number of shared objects left
-#
-
-class _TestZZZNumberOfObjects(BaseTestCase):
- # Because test cases are sorted alphabetically, this one will get
- # run after all the other tests for the manager. It tests that
- # there have been no "reference leaks" for the manager's shared
- # objects. Note the comment in _TestPool.test_terminate().
-
- # If some other test using ManagerMixin.manager fails, then the
- # raised exception may keep alive a frame which holds a reference
- # to a managed object. This will cause test_number_of_objects to
- # also fail.
- ALLOWED_TYPES = ('manager',)
-
- def test_number_of_objects(self):
- EXPECTED_NUMBER = 1 # the pool object is still alive
- multiprocessing.active_children() # discard dead process objs
- gc.collect() # do garbage collection
- refs = self.manager._number_of_objects()
- debug_info = self.manager._debug_info()
- if refs != EXPECTED_NUMBER:
- print(self.manager._debug_info())
- print(debug_info)
-
- self.assertEqual(refs, EXPECTED_NUMBER)
-
#
# Test of creating a customized manager class
#
@@ -2903,15 +2878,6 @@ class TestInvalidHandle(unittest.TestCase):
# Functions used to create test cases from the base ones in this module
#
-def get_attributes(Source, names):
- d = {}
- for name in names:
- obj = getattr(Source, name)
- if type(obj) == type(get_attributes):
- obj = staticmethod(obj)
- d[name] = obj
- return d
-
def create_test_cases(Mixin, type):
result = {}
glob = globals()
@@ -2924,10 +2890,10 @@ def create_test_cases(Mixin, type):
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
if type in base.ALLOWED_TYPES:
newname = 'With' + Type + name[1:]
- class Temp(base, unittest.TestCase, Mixin):
+ class Temp(base, Mixin, unittest.TestCase):
pass
result[newname] = Temp
- Temp.__name__ = newname
+ Temp.__name__ = Temp.__qualname__ = newname
Temp.__module__ = Mixin.__module__
return result
@@ -2938,12 +2904,24 @@ def create_test_cases(Mixin, type):
class ProcessesMixin(object):
TYPE = 'processes'
Process = multiprocessing.Process
- locals().update(get_attributes(multiprocessing, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
- 'RawArray', 'current_process', 'active_children', 'Pipe',
- 'connection', 'JoinableQueue', 'Pool'
- )))
+ connection = multiprocessing.connection
+ current_process = staticmethod(multiprocessing.current_process)
+ active_children = staticmethod(multiprocessing.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.Pipe)
+ Queue = staticmethod(multiprocessing.Queue)
+ JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+ Lock = staticmethod(multiprocessing.Lock)
+ RLock = staticmethod(multiprocessing.RLock)
+ Semaphore = staticmethod(multiprocessing.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.Condition)
+ Event = staticmethod(multiprocessing.Event)
+ Barrier = staticmethod(multiprocessing.Barrier)
+ Value = staticmethod(multiprocessing.Value)
+ Array = staticmethod(multiprocessing.Array)
+ RawValue = staticmethod(multiprocessing.RawValue)
+ RawArray = staticmethod(multiprocessing.RawArray)
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
@@ -2952,12 +2930,42 @@ globals().update(testcases_processes)
class ManagerMixin(object):
TYPE = 'manager'
Process = multiprocessing.Process
- manager = object.__new__(multiprocessing.managers.SyncManager)
- locals().update(get_attributes(manager, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
- 'Namespace', 'JoinableQueue', 'Pool'
- )))
+ Queue = property(operator.attrgetter('manager.Queue'))
+ JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+ Lock = property(operator.attrgetter('manager.Lock'))
+ RLock = property(operator.attrgetter('manager.RLock'))
+ Semaphore = property(operator.attrgetter('manager.Semaphore'))
+ BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+ Condition = property(operator.attrgetter('manager.Condition'))
+ Event = property(operator.attrgetter('manager.Event'))
+ Barrier = property(operator.attrgetter('manager.Barrier'))
+ Value = property(operator.attrgetter('manager.Value'))
+ Array = property(operator.attrgetter('manager.Array'))
+ list = property(operator.attrgetter('manager.list'))
+ dict = property(operator.attrgetter('manager.dict'))
+ Namespace = property(operator.attrgetter('manager.Namespace'))
+
+ @classmethod
+ def Pool(cls, *args, **kwds):
+ return cls.manager.Pool(*args, **kwds)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.manager = multiprocessing.Manager()
+
+ @classmethod
+ def tearDownClass(cls):
+ multiprocessing.active_children() # discard dead process objs
+ gc.collect() # do garbage collection
+ if cls.manager._number_of_objects() != 0:
+ # This is not really an error since some tests do not
+ # ensure that all processes which hold a reference to a
+ # managed object have been joined.
+ print('Shared objects which still exist at manager shutdown:')
+ print(cls.manager._debug_info())
+ cls.manager.shutdown()
+ cls.manager.join()
+ cls.manager = None
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
@@ -2966,16 +2974,27 @@ globals().update(testcases_manager)
class ThreadsMixin(object):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
- locals().update(get_attributes(multiprocessing.dummy, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
- 'active_children', 'Pipe', 'connection', 'dict', 'list',
- 'Namespace', 'JoinableQueue', 'Pool'
- )))
+ connection = multiprocessing.dummy.connection
+ current_process = staticmethod(multiprocessing.dummy.current_process)
+ active_children = staticmethod(multiprocessing.dummy.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.dummy.Pipe)
+ Queue = staticmethod(multiprocessing.dummy.Queue)
+ JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+ Lock = staticmethod(multiprocessing.dummy.Lock)
+ RLock = staticmethod(multiprocessing.dummy.RLock)
+ Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.dummy.Condition)
+ Event = staticmethod(multiprocessing.dummy.Event)
+ Barrier = staticmethod(multiprocessing.dummy.Barrier)
+ Value = staticmethod(multiprocessing.dummy.Value)
+ Array = staticmethod(multiprocessing.dummy.Array)
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
+
class OtherTest(unittest.TestCase):
# TODO: add more tests for deliver/answer challenge.
def test_deliver_challenge_auth_failure(self):
@@ -3407,12 +3426,6 @@ def test_main(run=None):
multiprocessing.get_logger().setLevel(LOG_LEVEL)
- ProcessesMixin.pool = multiprocessing.Pool(4)
- ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
- ManagerMixin.manager.__init__()
- ManagerMixin.manager.start()
- ManagerMixin.pool = ManagerMixin.manager.Pool(4)
-
testcases = (
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
@@ -3422,18 +3435,7 @@ def test_main(run=None):
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
- try:
- run(suite)
- finally:
- ThreadsMixin.pool.terminate()
- ProcessesMixin.pool.terminate()
- ManagerMixin.pool.terminate()
- ManagerMixin.pool.join()
- ManagerMixin.manager.shutdown()
- ManagerMixin.manager.join()
- ThreadsMixin.pool.join()
- ProcessesMixin.pool.join()
- del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
+ run(suite)
def main():
test_main(unittest.TextTestRunner(verbosity=2).run)
diff --git a/Lib/test/test_openpty.py b/Lib/test/test_openpty.py
index e8175ff..8713d34 100644
--- a/Lib/test/test_openpty.py
+++ b/Lib/test/test_openpty.py
@@ -4,7 +4,7 @@ import os, unittest
from test.support import run_unittest
if not hasattr(os, "openpty"):
- raise unittest.SkipTest("No openpty() available.")
+ raise unittest.SkipTest("os.openpty() not available.")
class OpenptyTest(unittest.TestCase):
diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py
index fa608b9..b445ee0 100644
--- a/Lib/test/test_operator.py
+++ b/Lib/test/test_operator.py
@@ -410,6 +410,31 @@ class OperatorTestCase(unittest.TestCase):
self.assertEqual(operator.__ixor__ (c, 5), "ixor")
self.assertEqual(operator.__iconcat__ (c, c), "iadd")
+ def test_length_hint(self):
+ class X(object):
+ def __init__(self, value):
+ self.value = value
+
+ def __length_hint__(self):
+ if type(self.value) is type:
+ raise self.value
+ else:
+ return self.value
+
+ self.assertEqual(operator.length_hint([], 2), 0)
+ self.assertEqual(operator.length_hint(iter([1, 2, 3])), 3)
+
+ self.assertEqual(operator.length_hint(X(2)), 2)
+ self.assertEqual(operator.length_hint(X(NotImplemented), 4), 4)
+ self.assertEqual(operator.length_hint(X(TypeError), 12), 12)
+ with self.assertRaises(TypeError):
+ operator.length_hint(X("abc"))
+ with self.assertRaises(ValueError):
+ operator.length_hint(X(-2))
+ with self.assertRaises(LookupError):
+ operator.length_hint(X(LookupError))
+
+
def test_main(verbose=None):
import sys
test_classes = (
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index ede5f3f..4f86ef5 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -2050,6 +2050,103 @@ class TermsizeTests(unittest.TestCase):
self.assertEqual(expected, actual)
+class OSErrorTests(unittest.TestCase):
+ def setUp(self):
+ class Str(str):
+ pass
+
+ self.bytes_filenames = []
+ self.unicode_filenames = []
+ if support.TESTFN_UNENCODABLE is not None:
+ decoded = support.TESTFN_UNENCODABLE
+ else:
+ decoded = support.TESTFN
+ self.unicode_filenames.append(decoded)
+ self.unicode_filenames.append(Str(decoded))
+ if support.TESTFN_UNDECODABLE is not None:
+ encoded = support.TESTFN_UNDECODABLE
+ else:
+ encoded = os.fsencode(support.TESTFN)
+ self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(memoryview(encoded))
+
+ self.filenames = self.bytes_filenames + self.unicode_filenames
+
+ def test_oserror_filename(self):
+ funcs = [
+ (self.filenames, os.chdir,),
+ (self.filenames, os.chmod, 0o777),
+ (self.filenames, os.lstat,),
+ (self.filenames, os.open, os.O_RDONLY),
+ (self.filenames, os.rmdir,),
+ (self.filenames, os.stat,),
+ (self.filenames, os.unlink,),
+ ]
+ if sys.platform == "win32":
+ funcs.extend((
+ (self.bytes_filenames, os.rename, b"dst"),
+ (self.bytes_filenames, os.replace, b"dst"),
+ (self.unicode_filenames, os.rename, "dst"),
+ (self.unicode_filenames, os.replace, "dst"),
+ # Issue #16414: Don't test undecodable names with listdir()
+ # because of a Windows bug.
+ #
+ # With the ANSI code page 932, os.listdir(b'\xe7') return an
+ # empty list (instead of failing), whereas os.listdir(b'\xff')
+ # raises a FileNotFoundError. It looks like a Windows bug:
+ # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
+ # fails with ERROR_FILE_NOT_FOUND (2), instead of
+ # ERROR_PATH_NOT_FOUND (3).
+ (self.unicode_filenames, os.listdir,),
+ ))
+ else:
+ funcs.extend((
+ (self.filenames, os.listdir,),
+ (self.filenames, os.rename, "dst"),
+ (self.filenames, os.replace, "dst"),
+ ))
+ if hasattr(os, "chown"):
+ funcs.append((self.filenames, os.chown, 0, 0))
+ if hasattr(os, "lchown"):
+ funcs.append((self.filenames, os.lchown, 0, 0))
+ if hasattr(os, "truncate"):
+ funcs.append((self.filenames, os.truncate, 0))
+ if hasattr(os, "chflags"):
+ funcs.append((self.filenames, os.chflags, 0))
+ if hasattr(os, "lchflags"):
+ funcs.append((self.filenames, os.lchflags, 0))
+ if hasattr(os, "chroot"):
+ funcs.append((self.filenames, os.chroot,))
+ if hasattr(os, "link"):
+ if sys.platform == "win32":
+ funcs.append((self.bytes_filenames, os.link, b"dst"))
+ funcs.append((self.unicode_filenames, os.link, "dst"))
+ else:
+ funcs.append((self.filenames, os.link, "dst"))
+ if hasattr(os, "listxattr"):
+ funcs.extend((
+ (self.filenames, os.listxattr,),
+ (self.filenames, os.getxattr, "user.test"),
+ (self.filenames, os.setxattr, "user.test", b'user'),
+ (self.filenames, os.removexattr, "user.test"),
+ ))
+ if hasattr(os, "lchmod"):
+ funcs.append((self.filenames, os.lchmod, 0o777))
+ if hasattr(os, "readlink"):
+ if sys.platform == "win32":
+ funcs.append((self.unicode_filenames, os.readlink,))
+ else:
+ funcs.append((self.filenames, os.readlink,))
+
+ for filenames, func, *func_args in funcs:
+ for name in filenames:
+ try:
+ func(name, *func_args)
+ except OSError as err:
+ self.assertIs(err.filename, name)
+ else:
+ self.fail("No exception thrown by {}".format(func))
+
@support.reap_threads
def test_main():
support.run_unittest(
@@ -2078,6 +2175,7 @@ def test_main():
ExtendedAttributeTests,
Win32DeprecatedBytesAPI,
TermsizeTests,
+ OSErrorTests,
)
if __name__ == "__main__":
diff --git a/Lib/test/test_pep277.py b/Lib/test/test_pep277.py
index 4b16cbb..9bae6dc 100644
--- a/Lib/test/test_pep277.py
+++ b/Lib/test/test_pep277.py
@@ -99,10 +99,6 @@ class UnicodeFileTests(unittest.TestCase):
with self.assertRaises(expected_exception) as c:
fn(filename)
exc_filename = c.exception.filename
- # listdir may append a wildcard to the filename
- if fn is os.listdir and sys.platform == 'win32':
- exc_filename, _, wildcard = exc_filename.rpartition(os.sep)
- self.assertEqual(wildcard, '*.*')
if check_filename:
self.assertEqual(exc_filename, filename, "Function '%s(%a) failed "
"with bad filename in the exception: %a" %
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index 7ab3b84..3b7926d 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -1,12 +1,12 @@
# Test case for the os.poll() function
-import os, select, random, unittest
+import os, select, random, unittest, subprocess
from test.support import TESTFN, run_unittest
try:
select.poll
except AttributeError:
- raise unittest.SkipTest("select.poll not defined -- skipping test_poll")
+ raise unittest.SkipTest("select.poll not defined")
def find_ready_matching(ready, flag):
@@ -67,13 +67,11 @@ class PollTests(unittest.TestCase):
self.assertEqual(bufs, [MSG] * NUM_PIPES)
- def poll_unit_tests(self):
+ def test_poll_unit_tests(self):
# returns NVAL for invalid file descriptor
- FD = 42
- try:
- os.close(FD)
- except OSError:
- pass
+ FD, w = os.pipe()
+ os.close(FD)
+ os.close(w)
p = select.poll()
p.register(FD)
r = p.poll()
@@ -116,7 +114,9 @@ class PollTests(unittest.TestCase):
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
- p = os.popen(cmd, 'r')
+ proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+ bufsize=0)
+ p = proc.stdout
pollster = select.poll()
pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
@@ -126,7 +126,7 @@ class PollTests(unittest.TestCase):
fd, flags = fdlist[0]
if flags & select.POLLHUP:
line = p.readline()
- if line != "":
+ if line != b"":
self.fail('error: pipe seems to be closed, but still returns data')
continue
@@ -134,6 +134,7 @@ class PollTests(unittest.TestCase):
line = p.readline()
if not line:
break
+ self.assertEqual(line, b'testing...\n')
continue
else:
self.fail('Unexpected return value from select.poll: %s' % fdlist)
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index c0929a0..e85dd2a 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -18,6 +18,13 @@ threading = test_support.import_module('threading')
HOST = test_support.HOST
PORT = 0
+SUPPORTS_SSL = False
+if hasattr(poplib, 'POP3_SSL'):
+ import ssl
+
+ SUPPORTS_SSL = True
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+
# the dummy data returned by server when LIST and RETR commands are issued
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
RETR_RESP = b"""From: postmaster@python.org\
@@ -33,11 +40,15 @@ line3\r\n\
class DummyPOP3Handler(asynchat.async_chat):
+ CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
+
def __init__(self, conn):
asynchat.async_chat.__init__(self, conn)
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready. <timestamp>')
+ self.tls_active = False
+ self.tls_starting = False
def collect_incoming_data(self, data):
self.in_buffer.append(data)
@@ -112,6 +123,65 @@ class DummyPOP3Handler(asynchat.async_chat):
self.push('+OK closing.')
self.close_when_done()
+ def _get_capas(self):
+ _capas = dict(self.CAPAS)
+ if not self.tls_active and SUPPORTS_SSL:
+ _capas['STLS'] = []
+ return _capas
+
+ def cmd_capa(self, arg):
+ self.push('+OK Capability list follows')
+ if self._get_capas():
+ for cap, params in self._get_capas().items():
+ _ln = [cap]
+ if params:
+ _ln.extend(params)
+ self.push(' '.join(_ln))
+ self.push('.')
+
+ if SUPPORTS_SSL:
+
+ def cmd_stls(self, arg):
+ if self.tls_active is False:
+ self.push('+OK Begin TLS negotiation')
+ tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE,
+ server_side=True,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=False)
+ self.del_channel()
+ self.set_socket(tls_sock)
+ self.tls_active = True
+ self.tls_starting = True
+ self.in_buffer = []
+ self._do_tls_handshake()
+ else:
+ self.push('-ERR Command not permitted when TLS active')
+
+ def _do_tls_handshake(self):
+ try:
+ self.socket.do_handshake()
+ except ssl.SSLError as err:
+ if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
+ ssl.SSL_ERROR_WANT_WRITE):
+ return
+ elif err.args[0] == ssl.SSL_ERROR_EOF:
+ return self.handle_close()
+ raise
+ except socket.error as err:
+ if err.args[0] == errno.ECONNABORTED:
+ return self.handle_close()
+ else:
+ self.tls_active = True
+ self.tls_starting = False
+
+ def handle_read(self):
+ if self.tls_starting:
+ self._do_tls_handshake()
+ else:
+ try:
+ asynchat.async_chat.handle_read(self)
+ except ssl.SSLEOFError:
+ self.handle_close()
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
@@ -232,19 +302,35 @@ class TestPOP3Class(TestCase):
self.client.uidl()
self.client.uidl('foo')
+ def test_capa(self):
+ capa = self.client.capa()
+ self.assertTrue('IMPLEMENTATION' in capa.keys())
+
def test_quit(self):
resp = self.client.quit()
self.assertTrue(resp)
self.assertIsNone(self.client.sock)
self.assertIsNone(self.client.file)
+ if SUPPORTS_SSL:
-SUPPORTS_SSL = False
-if hasattr(poplib, 'POP3_SSL'):
- import ssl
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertTrue('STLS' in capa.keys())
- SUPPORTS_SSL = True
- CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+ def test_stls(self):
+ expected = b'+OK Begin TLS negotiation'
+ resp = self.client.stls()
+ self.assertEqual(resp, expected)
+
+ def test_stls_context(self):
+ expected = b'+OK Begin TLS negotiation'
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ resp = self.client.stls(context=ctx)
+ self.assertEqual(resp, expected)
+
+
+if SUPPORTS_SSL:
class DummyPOP3_SSLHandler(DummyPOP3Handler):
@@ -256,34 +342,13 @@ if hasattr(poplib, 'POP3_SSL'):
self.del_channel()
self.set_socket(ssl_socket)
# Must try handshake before calling push()
- self._ssl_accepting = True
- self._do_ssl_handshake()
+ self.tls_active = True
+ self.tls_starting = True
+ self._do_tls_handshake()
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready. <timestamp>')
- def _do_ssl_handshake(self):
- try:
- self.socket.do_handshake()
- except ssl.SSLError as err:
- if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
- ssl.SSL_ERROR_WANT_WRITE):
- return
- elif err.args[0] == ssl.SSL_ERROR_EOF:
- return self.handle_close()
- raise
- except socket.error as err:
- if err.args[0] == errno.ECONNABORTED:
- return self.handle_close()
- else:
- self._ssl_accepting = False
-
- def handle_read(self):
- if self._ssl_accepting:
- self._do_ssl_handshake()
- else:
- DummyPOP3Handler.handle_read(self)
-
class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
@@ -314,6 +379,39 @@ if hasattr(poplib, 'POP3_SSL'):
self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK'))
+ def test_stls(self):
+ self.assertRaises(poplib.error_proto, self.client.stls)
+
+ test_stls_context = test_stls
+
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertFalse('STLS' in capa.keys())
+
+
+ class TestPOP3_TLSClass(TestPOP3Class):
+ # repeat previous tests by using poplib.POP3.stls()
+
+ def setUp(self):
+ self.server = DummyPOP3Server((HOST, PORT))
+ self.server.start()
+ self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
+ self.client.stls()
+
+ def tearDown(self):
+ if self.client.file is not None and self.client.sock is not None:
+ self.client.quit()
+ self.server.stop()
+
+ def test_stls(self):
+ self.assertRaises(poplib.error_proto, self.client.stls)
+
+ test_stls_context = test_stls
+
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertFalse(b'STLS' in capa.keys())
+
class TestTimeouts(TestCase):
@@ -373,6 +471,7 @@ def test_main():
tests = [TestPOP3Class, TestTimeouts]
if SUPPORTS_SSL:
tests.append(TestPOP3_SSLClass)
+ tests.append(TestPOP3_TLSClass)
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py
index b5931ba..a9aec70 100644
--- a/Lib/test/test_random.py
+++ b/Lib/test/test_random.py
@@ -46,6 +46,39 @@ class TestBasicOps(unittest.TestCase):
self.assertRaises(TypeError, self.gen.seed, 1, 2, 3, 4)
self.assertRaises(TypeError, type(self.gen), [])
+ def test_shuffle(self):
+ shuffle = self.gen.shuffle
+ lst = []
+ shuffle(lst)
+ self.assertEqual(lst, [])
+ lst = [37]
+ shuffle(lst)
+ self.assertEqual(lst, [37])
+ seqs = [list(range(n)) for n in range(10)]
+ shuffled_seqs = [list(range(n)) for n in range(10)]
+ for shuffled_seq in shuffled_seqs:
+ shuffle(shuffled_seq)
+ for (seq, shuffled_seq) in zip(seqs, shuffled_seqs):
+ self.assertEqual(len(seq), len(shuffled_seq))
+ self.assertEqual(set(seq), set(shuffled_seq))
+
+ # The above tests all would pass if the shuffle was a
+ # no-op. The following non-deterministic test covers that. It
+ # asserts that the shuffled sequence of 1000 distinct elements
+ # must be different from the original one. Although there is
+ # mathematically a non-zero probability that this could
+ # actually happen in a genuinely random shuffle, it is
+ # completely negligible, given that the number of possible
+ # permutations of 1000 objects is 1000! (factorial of 1000),
+ # which is considerably larger than the number of atoms in the
+ # universe...
+ lst = list(range(1000))
+ shuffled_lst = list(range(1000))
+ shuffle(shuffled_lst)
+ self.assertTrue(lst != shuffled_lst)
+ shuffle(lst)
+ self.assertTrue(lst != shuffled_lst)
+
def test_choice(self):
choice = self.gen.choice
with self.assertRaises(IndexError):
diff --git a/Lib/test/test_range.py b/Lib/test/test_range.py
index 2a13bfe..f088387 100644
--- a/Lib/test/test_range.py
+++ b/Lib/test/test_range.py
@@ -313,7 +313,7 @@ class RangeTest(unittest.TestCase):
self.assertRaises(TypeError, range, IN())
# Test use of user-defined classes in slice indices.
- self.assertEqual(list(range(10)[:I(5)]), list(range(5)))
+ self.assertEqual(range(10)[:I(5)], range(5))
with self.assertRaises(RuntimeError):
range(0, 10)[:IX()]
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
index ddb9a0f..8f9a1c9 100644
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -5,7 +5,7 @@ import sys
import unittest
from test import support
-@unittest.skipIf(sys.platform[:3] in ('win', 'os2', 'riscos'),
+@unittest.skipIf((sys.platform[:3]=='win'),
"can't easily test on this system")
class SelectTestCase(unittest.TestCase):
@@ -32,7 +32,7 @@ class SelectTestCase(unittest.TestCase):
fp.close()
try:
select.select([fd], [], [], 0)
- except select.error as err:
+ except OSError as err:
self.assertEqual(err.errno, errno.EBADF)
else:
self.fail("exception not raised")
diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py
index 8e9e587..da62723 100644
--- a/Lib/test/test_set.py
+++ b/Lib/test/test_set.py
@@ -848,8 +848,6 @@ class TestBasicOps(unittest.TestCase):
for v in self.set:
self.assertIn(v, self.values)
setiter = iter(self.set)
- # note: __length_hint__ is an internal undocumented API,
- # don't rely on it in your own programs
self.assertEqual(setiter.__length_hint__(), len(self.set))
def test_pickling(self):
diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py
index 13c1265..bd51d86 100644
--- a/Lib/test/test_shelve.py
+++ b/Lib/test/test_shelve.py
@@ -148,6 +148,19 @@ class TestCase(unittest.TestCase):
p2 = d[encodedkey]
self.assertNotEqual(p1, p2) # Write creates new object in store
+ def test_with(self):
+ d1 = {}
+ with shelve.Shelf(d1, protocol=2, writeback=False) as s:
+ s['key1'] = [1,2,3,4]
+ self.assertEqual(s['key1'], [1,2,3,4])
+ self.assertEqual(len(s), 1)
+ self.assertRaises(ValueError, len, s)
+ try:
+ s['key1']
+ except ValueError:
+ pass
+ else:
+ self.fail('Closed shelf should not find a key')
from test import mapping_tests
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 01b93fc..ad62280 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -18,7 +18,8 @@ from shutil import (_make_tarball, _make_zipfile, make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats, Error, unpack_archive,
register_unpack_format, RegistryError,
- unregister_unpack_format, get_unpack_formats)
+ unregister_unpack_format, get_unpack_formats,
+ SameFileError)
import tarfile
import warnings
@@ -728,7 +729,7 @@ class TestShutil(unittest.TestCase):
with open(src, 'w') as f:
f.write('cheddar')
os.link(src, dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
@@ -748,7 +749,7 @@ class TestShutil(unittest.TestCase):
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
os.symlink('cheese', dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
@@ -1255,6 +1256,16 @@ class TestShutil(unittest.TestCase):
self.assertTrue(os.path.exists(rv))
self.assertEqual(read_file(src_file), read_file(dst_file))
+ def test_copyfile_same_file(self):
+ # copyfile() should raise SameFileError if the source and destination
+ # are the same.
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ write_file(src_file, 'foo')
+ self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
+ # But Error should work too, to stay backward compatible.
+ self.assertRaises(Error, shutil.copyfile, src_file, src_file)
+
def test_copytree_return_value(self):
# copytree returns its destination path.
src_dir = self.mkdtemp()
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index e87900a..d2fd84b 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -15,9 +15,6 @@ try:
except ImportError:
threading = None
-if sys.platform in ('os2', 'riscos'):
- raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
-
class HandlerBCalled(Exception):
pass
@@ -36,7 +33,7 @@ def exit_subprocess():
def ignoring_eintr(__func, *args, **kwargs):
try:
return __func(*args, **kwargs)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EINTR:
raise
return None
@@ -302,10 +299,10 @@ class WakeupSignalTests(unittest.TestCase):
# We attempt to get a signal during the select call
try:
select.select([read], [], [], TIMEOUT_FULL)
- except select.error:
+ except OSError:
pass
else:
- raise Exception("select.error not raised")
+ raise Exception("OSError not raised")
after_time = time.time()
dt = after_time - before_time
if dt >= TIMEOUT_HALF:
diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py
index 29286c7..9c7840f 100644
--- a/Lib/test/test_site.py
+++ b/Lib/test/test_site.py
@@ -222,11 +222,7 @@ class HelperFunctionsTests(unittest.TestCase):
site.PREFIXES = ['xoxo']
dirs = site.getsitepackages()
- if sys.platform in ('os2emx', 'riscos'):
- self.assertEqual(len(dirs), 1)
- wanted = os.path.join('xoxo', 'Lib', 'site-packages')
- self.assertEqual(dirs[0], wanted)
- elif (sys.platform == "darwin" and
+ if (sys.platform == "darwin" and
sysconfig.get_config_var("PYTHONFRAMEWORK")):
# OS X framework builds
site.PREFIXES = ['Python.framework']
diff --git a/Lib/test/test_slice.py b/Lib/test/test_slice.py
index 2df9271..9203d5e 100644
--- a/Lib/test/test_slice.py
+++ b/Lib/test/test_slice.py
@@ -4,8 +4,70 @@ import unittest
from test import support
from pickle import loads, dumps
+import itertools
+import operator
import sys
+
+def evaluate_slice_index(arg):
+ """
+ Helper function to convert a slice argument to an integer, and raise
+ TypeError with a suitable message on failure.
+
+ """
+ if hasattr(arg, '__index__'):
+ return operator.index(arg)
+ else:
+ raise TypeError(
+ "slice indices must be integers or "
+ "None or have an __index__ method")
+
+def slice_indices(slice, length):
+ """
+ Reference implementation for the slice.indices method.
+
+ """
+ # Compute step and length as integers.
+ length = operator.index(length)
+ step = 1 if slice.step is None else evaluate_slice_index(slice.step)
+
+ # Raise ValueError for negative length or zero step.
+ if length < 0:
+ raise ValueError("length should not be negative")
+ if step == 0:
+ raise ValueError("slice step cannot be zero")
+
+ # Find lower and upper bounds for start and stop.
+ lower = -1 if step < 0 else 0
+ upper = length - 1 if step < 0 else length
+
+ # Compute start.
+ if slice.start is None:
+ start = upper if step < 0 else lower
+ else:
+ start = evaluate_slice_index(slice.start)
+ start = max(start + length, lower) if start < 0 else min(start, upper)
+
+ # Compute stop.
+ if slice.stop is None:
+ stop = lower if step < 0 else upper
+ else:
+ stop = evaluate_slice_index(slice.stop)
+ stop = max(stop + length, lower) if stop < 0 else min(stop, upper)
+
+ return start, stop, step
+
+
+# Class providing an __index__ method. Used for testing slice.indices.
+
+class MyIndexable(object):
+ def __init__(self, value):
+ self.value = value
+
+ def __index__(self):
+ return self.value
+
+
class SliceTest(unittest.TestCase):
def test_constructor(self):
@@ -75,6 +137,22 @@ class SliceTest(unittest.TestCase):
s = slice(obj)
self.assertTrue(s.stop is obj)
+ def check_indices(self, slice, length):
+ try:
+ actual = slice.indices(length)
+ except ValueError:
+ actual = "valueerror"
+ try:
+ expected = slice_indices(slice, length)
+ except ValueError:
+ expected = "valueerror"
+ self.assertEqual(actual, expected)
+
+ if length >= 0 and slice.step != 0:
+ actual = range(*slice.indices(length))
+ expected = range(length)[slice]
+ self.assertEqual(actual, expected)
+
def test_indices(self):
self.assertEqual(slice(None ).indices(10), (0, 10, 1))
self.assertEqual(slice(None, None, 2).indices(10), (0, 10, 2))
@@ -108,7 +186,41 @@ class SliceTest(unittest.TestCase):
self.assertEqual(list(range(10))[::sys.maxsize - 1], [0])
- self.assertRaises(OverflowError, slice(None).indices, 1<<100)
+ # Check a variety of start, stop, step and length values, including
+ # values exceeding sys.maxsize (see issue #14794).
+ vals = [None, -2**100, -2**30, -53, -7, -1, 0, 1, 7, 53, 2**30, 2**100]
+ lengths = [0, 1, 7, 53, 2**30, 2**100]
+ for slice_args in itertools.product(vals, repeat=3):
+ s = slice(*slice_args)
+ for length in lengths:
+ self.check_indices(s, length)
+ self.check_indices(slice(0, 10, 1), -3)
+
+ # Negative length should raise ValueError
+ with self.assertRaises(ValueError):
+ slice(None).indices(-1)
+
+ # Zero step should raise ValueError
+ with self.assertRaises(ValueError):
+ slice(0, 10, 0).indices(5)
+
+ # Using a start, stop or step or length that can't be interpreted as an
+ # integer should give a TypeError ...
+ with self.assertRaises(TypeError):
+ slice(0.0, 10, 1).indices(5)
+ with self.assertRaises(TypeError):
+ slice(0, 10.0, 1).indices(5)
+ with self.assertRaises(TypeError):
+ slice(0, 10, 1.0).indices(5)
+ with self.assertRaises(TypeError):
+ slice(0, 10, 1).indices(5.0)
+
+ # ... but it should be fine to use a custom class that provides index.
+ self.assertEqual(slice(0, 10, 1).indices(5), (0, 5, 1))
+ self.assertEqual(slice(MyIndexable(0), 10, 1).indices(5), (0, 5, 1))
+ self.assertEqual(slice(0, MyIndexable(10), 1).indices(5), (0, 5, 1))
+ self.assertEqual(slice(0, 10, MyIndexable(1)).indices(5), (0, 5, 1))
+ self.assertEqual(slice(0, 10, 1).indices(MyIndexable(5)), (0, 5, 1))
def test_setslice_without_getslice(self):
tmp = []
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index befc49e..5a086c8 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -524,12 +524,6 @@ class DebuggingServerTests(unittest.TestCase):
class NonConnectingTests(unittest.TestCase):
- def setUp(self):
- smtplib.socket = mock_socket
-
- def tearDown(self):
- smtplib.socket = socket
-
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
@@ -542,9 +536,9 @@ class NonConnectingTests(unittest.TestCase):
def testNonnumericPort(self):
# check that non-numeric port raises socket.error
- self.assertRaises(mock_socket.error, smtplib.SMTP,
+ self.assertRaises(OSError, smtplib.SMTP,
"localhost", "bogus")
- self.assertRaises(mock_socket.error, smtplib.SMTP,
+ self.assertRaises(OSError, smtplib.SMTP,
"localhost:bogus")
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 464057e..3dd0d64 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -27,7 +27,7 @@ TEST_STR = b"hello world\n"
HOST = test.support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
-HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+HAVE_FORKING = hasattr(os, "fork")
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
@@ -93,21 +93,7 @@ class SocketServerTest(unittest.TestCase):
# XXX: We need a way to tell AF_UNIX to pick its own name
# like AF_INET provides port==0.
dir = None
- if os.name == 'os2':
- dir = '\socket'
fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
- if os.name == 'os2':
- # AF_UNIX socket names on OS/2 require a specific prefix
- # which can't include a drive letter and must also use
- # backslashes as directory separators
- if fn[1] == ':':
- fn = fn[2:]
- if fn[0] in (os.sep, os.altsep):
- fn = fn[1:]
- if os.sep == '/':
- fn = fn.replace(os.sep, os.altsep)
- else:
- fn = fn.replace(os.altsep, os.sep)
self.test_files.append(fn)
return fn
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index b0cd63c..c5a513e 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -925,8 +925,7 @@ class ProcessTestCase(BaseTestCase):
# value for that limit, but Windows has 2048, so we loop
# 1024 times (each call leaked two fds).
for i in range(1024):
- # Windows raises IOError. Others raise OSError.
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(OSError) as c:
subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -1179,7 +1178,7 @@ class POSIXProcessTestCase(BaseTestCase):
try:
p = subprocess.Popen([sys.executable, "-c", ""],
preexec_fn=raise_it)
- except RuntimeError as e:
+ except subprocess.SubprocessError as e:
self.assertTrue(
subprocess._posixsubprocess,
"Expected a ValueError from the preexec_fn")
@@ -1218,9 +1217,10 @@ class POSIXProcessTestCase(BaseTestCase):
"""Issue16140: Don't double close pipes on preexec error."""
def raise_it():
- raise RuntimeError("force the _execute_child() errpipe_data path.")
+ raise subprocess.SubprocessError(
+ "force the _execute_child() errpipe_data path.")
- with self.assertRaises(RuntimeError):
+ with self.assertRaises(subprocess.SubprocessError):
self._TestExecuteChildPopen(
self, [sys.executable, "-c", "pass"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -1581,12 +1581,12 @@ class POSIXProcessTestCase(BaseTestCase):
# Pure Python implementations keeps the message
self.assertIsNone(subprocess._posixsubprocess)
self.assertEqual(str(err), "surrogate:\uDCff")
- except RuntimeError as err:
+ except subprocess.SubprocessError as err:
# _posixsubprocess uses a default message
self.assertIsNotNone(subprocess._posixsubprocess)
self.assertEqual(str(err), "Exception occurred in preexec_fn.")
else:
- self.fail("Expected ValueError or RuntimeError")
+ self.fail("Expected ValueError or subprocess.SubprocessError")
def test_undecodable_env(self):
for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
@@ -1873,7 +1873,7 @@ class POSIXProcessTestCase(BaseTestCase):
# let some time for the process to exit, and create a new Popen: this
# should trigger the wait() of p
time.sleep(0.2)
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(OSError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
@@ -2154,7 +2154,7 @@ class ContextManagerTests(BaseTestCase):
self.assertEqual(proc.returncode, 1)
def test_invalid_args(self):
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(OSError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py
index fcccdf7..a364194 100644
--- a/Lib/test/test_sundry.py
+++ b/Lib/test/test_sundry.py
@@ -1,19 +1,26 @@
"""Do a minimal test of all the modules that aren't otherwise tested."""
-
-from test import support
+import importlib
import sys
+from test import support
import unittest
class TestUntestedModules(unittest.TestCase):
- def test_at_least_import_untested_modules(self):
+ def test_untested_modules_can_be_imported(self):
+ untested = ('bdb', 'encodings', 'formatter', 'getpass', 'imghdr',
+ 'keyword', 'macurl2path', 'nturl2path', 'tabnanny')
with support.check_warnings(quiet=True):
- import bdb
- import cgitb
+ for name in untested:
+ try:
+ support.import_module('test.test_{}'.format(name))
+ except unittest.SkipTest:
+ importlib.import_module(name)
+ else:
+ self.fail('{} has tests even though test_sundry claims '
+ 'otherwise'.format(name))
import distutils.bcppcompiler
import distutils.ccompiler
import distutils.cygwinccompiler
- import distutils.emxccompiler
import distutils.filelist
if sys.platform.startswith('win'):
import distutils.msvccompiler
@@ -39,22 +46,9 @@ class TestUntestedModules(unittest.TestCase):
import distutils.command.sdist
import distutils.command.upload
- import encodings
- import formatter
- import getpass
import html.entities
- import imghdr
- import keyword
- import macurl2path
- import mailcap
- import nturl2path
- import os2emxpath
- import pstats
- import py_compile
- import sndhdr
- import tabnanny
try:
- import tty # not available on Windows
+ import tty # Not available on Windows
except ImportError:
if support.verbose:
print("skipping tty")
diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py
index 5926b69..a9d3628 100644
--- a/Lib/test/test_syntax.py
+++ b/Lib/test/test_syntax.py
@@ -33,7 +33,7 @@ SyntaxError: invalid syntax
>>> None = 1
Traceback (most recent call last):
-SyntaxError: assignment to keyword
+SyntaxError: can't assign to keyword
It's a syntax error to assign to the empty tuple. Why isn't it an
error to assign to the empty list? It will always raise some error at
@@ -233,7 +233,7 @@ Traceback (most recent call last):
SyntaxError: can't assign to generator expression
>>> None += 1
Traceback (most recent call last):
-SyntaxError: assignment to keyword
+SyntaxError: can't assign to keyword
>>> f() += 1
Traceback (most recent call last):
SyntaxError: can't assign to function call
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index e5ec85c..b664687 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -6,6 +6,8 @@ import textwrap
import warnings
import operator
import codecs
+import gc
+import sysconfig
# count the number of test runs, used to create unique
# strings to intern in test_intern()
@@ -482,7 +484,7 @@ class SysModuleTest(unittest.TestCase):
def test_thread_info(self):
info = sys.thread_info
self.assertEqual(len(info), 3)
- self.assertIn(info.name, ('nt', 'os2', 'pthread', 'solaris', None))
+ self.assertIn(info.name, ('nt', 'pthread', 'solaris', None))
self.assertIn(info.lock, ('semaphore', 'mutex+cond', None))
def test_43581(self):
@@ -611,6 +613,36 @@ class SysModuleTest(unittest.TestCase):
ret, out, err = assert_python_ok(*args)
self.assertIn(b"free PyDictObjects", err)
+ @unittest.skipUnless(hasattr(sys, "getallocatedblocks"),
+ "sys.getallocatedblocks unavailable on this build")
+ def test_getallocatedblocks(self):
+ # Some sanity checks
+ with_pymalloc = sysconfig.get_config_var('WITH_PYMALLOC')
+ a = sys.getallocatedblocks()
+ self.assertIs(type(a), int)
+ if with_pymalloc:
+ self.assertGreater(a, 0)
+ else:
+ # When WITH_PYMALLOC isn't available, we don't know anything
+ # about the underlying implementation: the function might
+ # return 0 or something greater.
+ self.assertGreaterEqual(a, 0)
+ try:
+ # While we could imagine a Python session where the number of
+ # multiple buffer objects would exceed the sharing of references,
+ # it is unlikely to happen in a normal test run.
+ self.assertLess(a, sys.gettotalrefcount())
+ except AttributeError:
+ # gettotalrefcount() not available
+ pass
+ gc.collect()
+ b = sys.getallocatedblocks()
+ self.assertLessEqual(b, a)
+ gc.collect()
+ c = sys.getallocatedblocks()
+ self.assertIn(c, range(b - 50, b + 50))
+
+
class SizeofTest(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_sysconfig.py b/Lib/test/test_sysconfig.py
index 9219360..5293649 100644
--- a/Lib/test/test_sysconfig.py
+++ b/Lib/test/test_sysconfig.py
@@ -234,7 +234,7 @@ class TestSysConfig(unittest.TestCase):
self.assertTrue(os.path.isfile(config_h), config_h)
def test_get_scheme_names(self):
- wanted = ('nt', 'nt_user', 'os2', 'os2_home', 'osx_framework_user',
+ wanted = ('nt', 'nt_user', 'osx_framework_user',
'posix_home', 'posix_prefix', 'posix_user')
self.assertEqual(get_scheme_names(), wanted)
@@ -305,14 +305,13 @@ class TestSysConfig(unittest.TestCase):
if 'MACOSX_DEPLOYMENT_TARGET' in env:
del env['MACOSX_DEPLOYMENT_TARGET']
- with open('/dev/null', 'w') as devnull_fp:
- p = subprocess.Popen([
- sys.executable, '-c',
- 'import sysconfig; print(sysconfig.get_platform())',
- ],
- stdout=subprocess.PIPE,
- stderr=devnull_fp,
- env=env)
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'import sysconfig; print(sysconfig.get_platform())',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ env=env)
test_platform = p.communicate()[0].strip()
test_platform = test_platform.decode('utf-8')
status = p.wait()
@@ -325,20 +324,19 @@ class TestSysConfig(unittest.TestCase):
env = os.environ.copy()
env['MACOSX_DEPLOYMENT_TARGET'] = '10.1'
- with open('/dev/null') as dev_null:
- p = subprocess.Popen([
- sys.executable, '-c',
- 'import sysconfig; print(sysconfig.get_platform())',
- ],
- stdout=subprocess.PIPE,
- stderr=dev_null,
- env=env)
- test_platform = p.communicate()[0].strip()
- test_platform = test_platform.decode('utf-8')
- status = p.wait()
-
- self.assertEqual(status, 0)
- self.assertEqual(my_platform, test_platform)
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'import sysconfig; print(sysconfig.get_platform())',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ env=env)
+ test_platform = p.communicate()[0].strip()
+ test_platform = test_platform.decode('utf-8')
+ status = p.wait()
+
+ self.assertEqual(status, 0)
+ self.assertEqual(my_platform, test_platform)
def test_srcdir(self):
# See Issues #15322, #15364.
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index c7bf774..8bbf4ae 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -342,7 +342,7 @@ class MiscReadTest(CommonReadTest):
tar.extract("ustar/regtype", TEMPDIR)
try:
tar.extract("ustar/lnktype", TEMPDIR)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno == errno.ENOENT:
self.fail("hardlink not extracted properly")
@@ -352,7 +352,7 @@ class MiscReadTest(CommonReadTest):
try:
tar.extract("ustar/symtype", TEMPDIR)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno == errno.ENOENT:
self.fail("symlink not extracted properly")
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index d79f319..f5193dc 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -149,7 +149,7 @@ class TestRandomNameSequence(BaseTestCase):
# via any bugs above
try:
os.kill(pid, signal.SIGKILL)
- except EnvironmentError:
+ except OSError:
pass
os.close(read_fd)
os.close(write_fd)
@@ -276,7 +276,7 @@ class TestMkstempInner(BaseTestCase):
file = self.do_create()
mode = stat.S_IMODE(os.stat(file.name).st_mode)
expected = 0o600
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
@@ -310,7 +310,7 @@ class TestMkstempInner(BaseTestCase):
# On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
# but an arg with embedded spaces should be decorated with double
# quotes on each end
- if sys.platform in ('win32',):
+ if sys.platform == 'win32':
decorated = '"%s"' % sys.executable
tester = '"%s"' % tester
else:
@@ -479,7 +479,7 @@ class TestMkdtemp(BaseTestCase):
mode = stat.S_IMODE(os.stat(dir).st_mode)
mode &= 0o777 # Mask off sticky bits inherited from /tmp
expected = 0o700
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index a191e15..f9a721b 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -68,7 +68,7 @@ class ThreadRunningTests(BasicThreadTest):
thread.stack_size(0)
self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
- if os.name not in ("nt", "os2", "posix"):
+ if os.name not in ("nt", "posix"):
return
tss_supported = True
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index bb8d9b6..13c017d 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -451,8 +451,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
- platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
- 'os2emx')
+ platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5')
def _run_and_join(self, script):
script = """if 1:
@@ -754,7 +753,8 @@ class ThreadingExceptionTests(BaseTestCase):
lock = threading.Lock()
self.assertRaises(RuntimeError, lock.release)
- @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
+ @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
+ 'test macosx problem')
def test_recursion_limit(self):
# Issue 9670
# test that excessive recursion within a non-main thread causes
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index f975a75..b1004e6 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -8,7 +8,7 @@ from test.support import run_unittest, import_module
thread = import_module('_thread')
import time
-if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
+if (sys.platform[:3] == 'win'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
process_pid = os.getpid()
diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py
index 9aaedd3..6ac62fb 100644
--- a/Lib/test/test_unicode.py
+++ b/Lib/test/test_unicode.py
@@ -1983,9 +1983,10 @@ class UnicodeTest(string_tests.CommonTest,
# Test PyUnicode_FromFormat()
def test_from_format(self):
support.import_module('ctypes')
- from ctypes import (pythonapi, py_object,
+ from ctypes import (
+ pythonapi, py_object, sizeof,
c_int, c_long, c_longlong, c_ssize_t,
- c_uint, c_ulong, c_ulonglong, c_size_t)
+ c_uint, c_ulong, c_ulonglong, c_size_t, c_void_p)
name = "PyUnicode_FromFormat"
_PyUnicode_FromFormat = getattr(pythonapi, name)
_PyUnicode_FromFormat.restype = py_object
@@ -2036,6 +2037,31 @@ class UnicodeTest(string_tests.CommonTest,
self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(123)), '123')
self.assertEqual(PyUnicode_FromFormat(b'%zu', c_size_t(123)), '123')
+ # test long output
+ min_longlong = -(2 ** (8 * sizeof(c_longlong) - 1))
+ max_longlong = -min_longlong - 1
+ self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(min_longlong)), str(min_longlong))
+ self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(max_longlong)), str(max_longlong))
+ max_ulonglong = 2 ** (8 * sizeof(c_ulonglong)) - 1
+ self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(max_ulonglong)), str(max_ulonglong))
+ PyUnicode_FromFormat(b'%p', c_void_p(-1))
+
+ # test padding (width and/or precision)
+ self.assertEqual(PyUnicode_FromFormat(b'%010i', c_int(123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100i', c_int(123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100i', c_int(123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80i', c_int(123)), '123'.rjust(80, '0').rjust(100))
+
+ self.assertEqual(PyUnicode_FromFormat(b'%010u', c_uint(123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100u', c_uint(123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100u', c_uint(123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80u', c_uint(123)), '123'.rjust(80, '0').rjust(100))
+
+ self.assertEqual(PyUnicode_FromFormat(b'%010x', c_int(0x123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100x', c_int(0x123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100x', c_int(0x123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80x', c_int(0x123)), '123'.rjust(80, '0').rjust(100))
+
# test %A
text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff')
self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'")
diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py
index 99aa003..677508b 100644
--- a/Lib/test/test_unicodedata.py
+++ b/Lib/test/test_unicodedata.py
@@ -80,7 +80,7 @@ class UnicodeDatabaseTest(unittest.TestCase):
class UnicodeFunctionsTest(UnicodeDatabaseTest):
# update this, if the database changes
- expectedchecksum = '17fe2f12b788e4fff5479b469c4404bb6ecf841f'
+ expectedchecksum = 'ebd64e81553c9cb37f424f5616254499fcd8849e'
def test_function_checksum(self):
data = []
h = hashlib.sha1()
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 52e7749..1e30fa6 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -270,9 +270,10 @@ Content-Type: text/html; charset=iso-8859-1
def test_missing_localfile(self):
# Test for #10836
- # 3.3 - URLError is not captured, explicit IOError is raised.
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('file://localhost/a/file/which/doesnot/exists.py')
+ self.assertTrue(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_file_notexists(self):
fd, tmp_file = tempfile.mkstemp()
@@ -285,20 +286,21 @@ Content-Type: text/html; charset=iso-8859-1
os.close(fd)
os.unlink(tmp_file)
self.assertFalse(os.path.exists(tmp_file))
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError):
urlopen(tmp_fileurl)
def test_ftp_nohost(self):
test_ftp_url = 'ftp:///path'
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen(test_ftp_url)
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_ftp_nonexisting(self):
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_userpass_inurl(self):
@@ -335,6 +337,79 @@ Content-Type: text/html; charset=iso-8859-1
with support.check_warnings(('',DeprecationWarning)):
urllib.request.URLopener()
+class urlopen_DataTests(unittest.TestCase):
+ """Test urlopen() opening a data URL."""
+
+ def setUp(self):
+ # text containing URL special- and unicode-characters
+ self.text = "test data URLs :;,%=& \u00f6 \u00c4 "
+ # 2x1 pixel RGB PNG image with one black and one white pixel
+ self.image = (
+ b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x02\x00\x00\x00'
+ b'\x01\x08\x02\x00\x00\x00{@\xe8\xdd\x00\x00\x00\x01sRGB\x00\xae'
+ b'\xce\x1c\xe9\x00\x00\x00\x0fIDAT\x08\xd7c```\xf8\xff\xff?\x00'
+ b'\x06\x01\x02\xfe\no/\x1e\x00\x00\x00\x00IEND\xaeB`\x82')
+
+ self.text_url = (
+ "data:text/plain;charset=UTF-8,test%20data%20URLs%20%3A%3B%2C%25%3"
+ "D%26%20%C3%B6%20%C3%84%20")
+ self.text_url_base64 = (
+ "data:text/plain;charset=ISO-8859-1;base64,dGVzdCBkYXRhIFVSTHMgOjs"
+ "sJT0mIPYgxCA%3D")
+ # base64 encoded data URL that contains ignorable spaces,
+ # such as "\n", " ", "%0A", and "%20".
+ self.image_url = (
+ "\n"
+ "QOjdAAAAAXNSR0IArs4c6QAAAA9JREFUCNdj%0AYGBg%2BP//PwAGAQL%2BCm8 "
+ "vHgAAAABJRU5ErkJggg%3D%3D%0A%20")
+
+ self.text_url_resp = urllib.request.urlopen(self.text_url)
+ self.text_url_base64_resp = urllib.request.urlopen(
+ self.text_url_base64)
+ self.image_url_resp = urllib.request.urlopen(self.image_url)
+
+ def test_interface(self):
+ # Make sure object returned by urlopen() has the specified methods
+ for attr in ("read", "readline", "readlines",
+ "close", "info", "geturl", "getcode", "__iter__"):
+ self.assertTrue(hasattr(self.text_url_resp, attr),
+ "object returned by urlopen() lacks %s attribute" %
+ attr)
+
+ def test_info(self):
+ self.assertIsInstance(self.text_url_resp.info(), email.message.Message)
+ self.assertEqual(self.text_url_base64_resp.info().get_params(),
+ [('text/plain', ''), ('charset', 'ISO-8859-1')])
+ self.assertEqual(self.image_url_resp.info()['content-length'],
+ str(len(self.image)))
+ self.assertEqual(urllib.request.urlopen("data:,").info().get_params(),
+ [('text/plain', ''), ('charset', 'US-ASCII')])
+
+ def test_geturl(self):
+ self.assertEqual(self.text_url_resp.geturl(), self.text_url)
+ self.assertEqual(self.text_url_base64_resp.geturl(),
+ self.text_url_base64)
+ self.assertEqual(self.image_url_resp.geturl(), self.image_url)
+
+ def test_read_text(self):
+ self.assertEqual(self.text_url_resp.read().decode(
+ dict(self.text_url_resp.info().get_params())['charset']), self.text)
+
+ def test_read_text_base64(self):
+ self.assertEqual(self.text_url_base64_resp.read().decode(
+ dict(self.text_url_base64_resp.info().get_params())['charset']),
+ self.text)
+
+ def test_read_image(self):
+ self.assertEqual(self.image_url_resp.read(), self.image)
+
+ def test_missing_comma(self):
+ self.assertRaises(ValueError,urllib.request.urlopen,'data:text/plain')
+
+ def test_invalid_base64_data(self):
+ # missing padding character
+ self.assertRaises(ValueError,urllib.request.urlopen,'data:;base64,Cg=')
+
class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
@@ -1311,6 +1386,7 @@ def test_main():
support.run_unittest(
urlopen_FileTests,
urlopen_HttpTests,
+ urlopen_DataTests,
urlretrieve_FileTests,
urlretrieve_HttpTests,
ProxyTests,
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 7ae1553..33042ed 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -124,6 +124,19 @@ def test_request_headers_methods():
>>> r.get_header("Not-there", "default")
'default'
+ Method r.remove_header should remove items both from r.headers and
+ r.unredirected_hdrs dictionaries
+
+ >>> r.remove_header("Spam-eggs")
+ >>> r.has_header("Spam-eggs")
+ False
+ >>> r.add_unredirected_header("Unredirected-spam", "Eggs")
+ >>> r.has_header("Unredirected-spam")
+ True
+ >>> r.remove_header("Unredirected-spam")
+ >>> r.has_header("Unredirected-spam")
+ False
+
"""
@@ -302,6 +315,7 @@ class MockHTTPClass:
self.req_headers = []
self.data = None
self.raise_on_endheaders = False
+ self.sock = None
self._tunnel_headers = {}
def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
@@ -1431,6 +1445,20 @@ class MiscTests(unittest.TestCase):
self.opener_has_handler(o, MyHTTPHandler)
self.opener_has_handler(o, MyOtherHTTPHandler)
+ def test_issue16464(self):
+ opener = urllib.request.build_opener()
+ request = urllib.request.Request("http://www.python.org/~jeremy/")
+ self.assertEqual(None, request.data)
+
+ opener.open(request, "1".encode("us-ascii"))
+ self.assertEqual(b"1", request.data)
+ self.assertEqual("1", request.get_header("Content-length"))
+
+ opener.open(request, "1234567890".encode("us-ascii"))
+ self.assertEqual(b"1234567890", request.data)
+ self.assertEqual("10", request.get_header("Content-length"))
+
+
def opener_has_handler(self, opener, handler_class):
self.assertTrue(any(h.__class__ == handler_class
for h in opener.handlers))
@@ -1454,6 +1482,16 @@ class RequestTests(unittest.TestCase):
self.assertTrue(self.get.data)
self.assertEqual("POST", self.get.get_method())
+ # issue 16464
+ # if we change data we need to remove content-length header
+ # (cause it's most probably calculated for previous value)
+ def test_setting_data_should_remove_content_length(self):
+ self.assertFalse("Content-length" in self.get.unredirected_hdrs)
+ self.get.add_unredirected_header("Content-length", 42)
+ self.assertEqual(42, self.get.unredirected_hdrs["Content-length"])
+ self.get.data = "spam"
+ self.assertFalse("Content-length" in self.get.unredirected_hdrs)
+
def test_get_full_url(self):
self.assertEqual("http://www.python.org/~jeremy/",
self.get.get_full_url())
@@ -1501,11 +1539,15 @@ def test_HTTPError_interface():
interface even though HTTPError is a subclass of URLError.
>>> msg = 'something bad happened'
- >>> url = code = hdrs = fp = None
+ >>> url = code = fp = None
+ >>> hdrs = 'Content-Length: 42'
>>> err = urllib.error.HTTPError(url, code, msg, hdrs, fp)
>>> assert hasattr(err, 'reason')
>>> err.reason
'something bad happened'
+ >>> assert hasattr(err, 'headers')
+ >>> err.headers
+ 'Content-Length: 42'
"""
def test_main(verbose=None):
diff --git a/Lib/test/test_urllib2net.py b/Lib/test/test_urllib2net.py
index 7f3c93a..b3c1a68 100644
--- a/Lib/test/test_urllib2net.py
+++ b/Lib/test/test_urllib2net.py
@@ -216,7 +216,7 @@ class OtherNetworkTests(unittest.TestCase):
debug(url)
try:
f = urlopen(url, req, TIMEOUT)
- except EnvironmentError as err:
+ except OSError as err:
debug(err)
if expected_err:
msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
diff --git a/Lib/test/test_venv.py b/Lib/test/test_venv.py
index 9696844..2a528c9 100644
--- a/Lib/test/test_venv.py
+++ b/Lib/test/test_venv.py
@@ -113,13 +113,68 @@ class BasicTest(BaseTest):
out, err = p.communicate()
self.assertEqual(out.strip(), expected.encode())
+ if sys.platform == 'win32':
+ ENV_SUBDIRS = (
+ ('Scripts',),
+ ('Include',),
+ ('Lib',),
+ ('Lib', 'site-packages'),
+ )
+ else:
+ ENV_SUBDIRS = (
+ ('bin',),
+ ('include',),
+ ('lib',),
+ ('lib', 'python%d.%d' % sys.version_info[:2]),
+ ('lib', 'python%d.%d' % sys.version_info[:2], 'site-packages'),
+ )
+
+ def create_contents(self, paths, filename):
+ """
+ Create some files in the environment which are unrelated
+ to the virtual environment.
+ """
+ for subdirs in paths:
+ d = os.path.join(self.env_dir, *subdirs)
+ os.mkdir(d)
+ fn = os.path.join(d, filename)
+ with open(fn, 'wb') as f:
+ f.write(b'Still here?')
+
def test_overwrite_existing(self):
"""
- Test control of overwriting an existing environment directory.
+ Test creating environment in an existing directory.
"""
- self.assertRaises(ValueError, venv.create, self.env_dir)
+ self.create_contents(self.ENV_SUBDIRS, 'foo')
+ venv.create(self.env_dir)
+ for subdirs in self.ENV_SUBDIRS:
+ fn = os.path.join(self.env_dir, *(subdirs + ('foo',)))
+ self.assertTrue(os.path.exists(fn))
+ with open(fn, 'rb') as f:
+ self.assertEqual(f.read(), b'Still here?')
+
builder = venv.EnvBuilder(clear=True)
builder.create(self.env_dir)
+ for subdirs in self.ENV_SUBDIRS:
+ fn = os.path.join(self.env_dir, *(subdirs + ('foo',)))
+ self.assertFalse(os.path.exists(fn))
+
+ def clear_directory(self, path):
+ for fn in os.listdir(path):
+ fn = os.path.join(path, fn)
+ if os.path.islink(fn) or os.path.isfile(fn):
+ os.remove(fn)
+ elif os.path.isdir(fn):
+ shutil.rmtree(fn)
+
+ def test_unoverwritable_fails(self):
+ #create a file clashing with directories in the env dir
+ for paths in self.ENV_SUBDIRS[:3]:
+ fn = os.path.join(self.env_dir, *paths)
+ with open(fn, 'wb') as f:
+ f.write(b'')
+ self.assertRaises((ValueError, OSError), venv.create, self.env_dir)
+ self.clear_directory(self.env_dir)
def test_upgrade(self):
"""
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index bd06c8d..f6a065d 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -7,15 +7,11 @@ import unittest
from test.fork_wait import ForkWait
from test.support import run_unittest, reap_children
-try:
- os.fork
-except AttributeError:
- raise unittest.SkipTest("os.fork not defined -- skipping test_wait3")
+if not hasattr(os, 'fork'):
+ raise unittest.SkipTest("os.fork not defined")
-try:
- os.wait3
-except AttributeError:
- raise unittest.SkipTest("os.wait3 not defined -- skipping test_wait3")
+if not hasattr(os, 'wait3'):
+ raise unittest.SkipTest("os.wait3 not defined")
class Wait3Test(ForkWait):
def wait_impl(self, cpid):
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index 571e33f..cdd26c7 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -47,6 +47,11 @@ class Object:
return NotImplemented
def __hash__(self):
return hash(self.arg)
+ def some_method(self):
+ return 4
+ def other_method(self):
+ return 5
+
class RefCycle:
def __init__(self):
@@ -901,6 +906,140 @@ class SubclassableWeakrefTestCase(TestBase):
self.assertEqual(self.cbcalled, 0)
+class WeakMethodTestCase(unittest.TestCase):
+
+ def _subclass(self):
+ """Return a Object subclass overriding `some_method`."""
+ class C(Object):
+ def some_method(self):
+ return 6
+ return C
+
+ def test_alive(self):
+ o = Object(1)
+ r = weakref.WeakMethod(o.some_method)
+ self.assertIsInstance(r, weakref.ReferenceType)
+ self.assertIsInstance(r(), type(o.some_method))
+ self.assertIs(r().__self__, o)
+ self.assertIs(r().__func__, o.some_method.__func__)
+ self.assertEqual(r()(), 4)
+
+ def test_object_dead(self):
+ o = Object(1)
+ r = weakref.WeakMethod(o.some_method)
+ del o
+ gc.collect()
+ self.assertIs(r(), None)
+
+ def test_method_dead(self):
+ C = self._subclass()
+ o = C(1)
+ r = weakref.WeakMethod(o.some_method)
+ del C.some_method
+ gc.collect()
+ self.assertIs(r(), None)
+
+ def test_callback_when_object_dead(self):
+ # Test callback behaviour when object dies first.
+ C = self._subclass()
+ calls = []
+ def cb(arg):
+ calls.append(arg)
+ o = C(1)
+ r = weakref.WeakMethod(o.some_method, cb)
+ del o
+ gc.collect()
+ self.assertEqual(calls, [r])
+ # Callback is only called once.
+ C.some_method = Object.some_method
+ gc.collect()
+ self.assertEqual(calls, [r])
+
+ def test_callback_when_method_dead(self):
+ # Test callback behaviour when method dies first.
+ C = self._subclass()
+ calls = []
+ def cb(arg):
+ calls.append(arg)
+ o = C(1)
+ r = weakref.WeakMethod(o.some_method, cb)
+ del C.some_method
+ gc.collect()
+ self.assertEqual(calls, [r])
+ # Callback is only called once.
+ del o
+ gc.collect()
+ self.assertEqual(calls, [r])
+
+ @support.cpython_only
+ def test_no_cycles(self):
+ # A WeakMethod doesn't create any reference cycle to itself.
+ o = Object(1)
+ def cb(_):
+ pass
+ r = weakref.WeakMethod(o.some_method, cb)
+ wr = weakref.ref(r)
+ del r
+ self.assertIs(wr(), None)
+
+ def test_equality(self):
+ def _eq(a, b):
+ self.assertTrue(a == b)
+ self.assertFalse(a != b)
+ def _ne(a, b):
+ self.assertTrue(a != b)
+ self.assertFalse(a == b)
+ x = Object(1)
+ y = Object(1)
+ a = weakref.WeakMethod(x.some_method)
+ b = weakref.WeakMethod(y.some_method)
+ c = weakref.WeakMethod(x.other_method)
+ d = weakref.WeakMethod(y.other_method)
+ # Objects equal, same method
+ _eq(a, b)
+ _eq(c, d)
+ # Objects equal, different method
+ _ne(a, c)
+ _ne(a, d)
+ _ne(b, c)
+ _ne(b, d)
+ # Objects unequal, same or different method
+ z = Object(2)
+ e = weakref.WeakMethod(z.some_method)
+ f = weakref.WeakMethod(z.other_method)
+ _ne(a, e)
+ _ne(a, f)
+ _ne(b, e)
+ _ne(b, f)
+ del x, y, z
+ gc.collect()
+ # Dead WeakMethods compare by identity
+ refs = a, b, c, d, e, f
+ for q in refs:
+ for r in refs:
+ self.assertEqual(q == r, q is r)
+ self.assertEqual(q != r, q is not r)
+
+ def test_hashing(self):
+ # Alive WeakMethods are hashable if the underlying object is
+ # hashable.
+ x = Object(1)
+ y = Object(1)
+ a = weakref.WeakMethod(x.some_method)
+ b = weakref.WeakMethod(y.some_method)
+ c = weakref.WeakMethod(y.other_method)
+ # Since WeakMethod objects are equal, the hashes should be equal.
+ self.assertEqual(hash(a), hash(b))
+ ha = hash(a)
+ # Dead WeakMethods retain their old hash value
+ del x, y
+ gc.collect()
+ self.assertEqual(hash(a), ha)
+ self.assertEqual(hash(b), ha)
+ # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
+ self.assertRaises(TypeError, hash, c)
+
+
class MappingTestCase(TestBase):
COUNT = 10
@@ -1476,6 +1615,7 @@ __test__ = {'libreftest' : libreftest}
def test_main():
support.run_unittest(
ReferencesTestCase,
+ WeakMethodTestCase,
MappingTestCase,
WeakValueDictionaryTestCase,
WeakKeyDictionaryTestCase,
diff --git a/Lib/test/test_winreg.py b/Lib/test/test_winreg.py
index 918b312..1100737 100644
--- a/Lib/test/test_winreg.py
+++ b/Lib/test/test_winreg.py
@@ -8,7 +8,7 @@ threading = support.import_module("threading")
from platform import machine
# Do this first so test will be skipped if module doesn't exist
-support.import_module('winreg')
+support.import_module('winreg', required_on=['win'])
# Now import everything
from winreg import *
@@ -97,7 +97,7 @@ class BaseWinregTests(unittest.TestCase):
QueryInfoKey(int_sub_key)
self.fail("It appears the CloseKey() function does "
"not close the actual key!")
- except EnvironmentError:
+ except OSError:
pass
# ... and close that key that way :-)
int_key = int(key)
@@ -106,7 +106,7 @@ class BaseWinregTests(unittest.TestCase):
QueryInfoKey(int_key)
self.fail("It appears the key.Close() function "
"does not close the actual key!")
- except EnvironmentError:
+ except OSError:
pass
def _read_test_data(self, root_key, subkeystr="sub_key", OpenKey=OpenKey):
@@ -123,7 +123,7 @@ class BaseWinregTests(unittest.TestCase):
while 1:
try:
data = EnumValue(sub_key, index)
- except EnvironmentError:
+ except OSError:
break
self.assertEqual(data in test_data, True,
"Didn't read back the correct test data")
@@ -144,7 +144,7 @@ class BaseWinregTests(unittest.TestCase):
try:
EnumKey(key, 1)
self.fail("Was able to get a second key when I only have one!")
- except EnvironmentError:
+ except OSError:
pass
key.Close()
@@ -168,7 +168,7 @@ class BaseWinregTests(unittest.TestCase):
# Shouldnt be able to delete it twice!
DeleteKey(key, subkeystr)
self.fail("Deleting the key twice succeeded")
- except EnvironmentError:
+ except OSError:
pass
key.Close()
DeleteKey(root_key, test_key_name)