diff options
Diffstat (limited to 'Lib')
108 files changed, 1917 insertions, 769 deletions
diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py index ed50520..13b1f26 100644 --- a/Lib/_dummy_thread.py +++ b/Lib/_dummy_thread.py @@ -24,11 +24,7 @@ TIMEOUT_MAX = 2**31 # imports are done when needed on a function-by-function basis. Since threads # are disabled, the import lock should not be an issue anyway (??). -class error(Exception): - """Dummy implementation of _thread.error.""" - - def __init__(self, *args): - self.args = args +error = RuntimeError def start_new_thread(function, args, kwargs={}): """Dummy implementation of _thread.start_new_thread(). diff --git a/Lib/_pyio.py b/Lib/_pyio.py index 35dea41..a3b89e7 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -1511,6 +1511,7 @@ class TextIOWrapper(TextIOBase): self._decoded_chars_used = 0 # offset into _decoded_chars for read() self._snapshot = None # info for reconstructing decoder state self._seekable = self._telling = self.buffer.seekable() + self._b2cratio = 0.0 if self._seekable and self.writable(): position = self.buffer.tell() @@ -1678,7 +1679,12 @@ class TextIOWrapper(TextIOBase): # Read a chunk, decode it, and put the result in self._decoded_chars. input_chunk = self.buffer.read1(self._CHUNK_SIZE) eof = not input_chunk - self._set_decoded_chars(self._decoder.decode(input_chunk, eof)) + decoded_chars = self._decoder.decode(input_chunk, eof) + self._set_decoded_chars(decoded_chars) + if decoded_chars: + self._b2cratio = len(input_chunk) / len(self._decoded_chars) + else: + self._b2cratio = 0.0 if self._telling: # At the snapshot point, len(dec_buffer) bytes before the read, @@ -1732,20 +1738,56 @@ class TextIOWrapper(TextIOBase): # forward until it gives us enough decoded characters. saved_state = decoder.getstate() try: + # Fast search for an acceptable start point, close to our + # current pos. + # Rationale: calling decoder.decode() has a large overhead + # regardless of chunk size; we want the number of such calls to + # be O(1) in most situations (common decoders, non-crazy input). + # Actually, it will be exactly 1 for fixed-size codecs (all + # 8-bit codecs, also UTF-16 and UTF-32). + skip_bytes = int(self._b2cratio * chars_to_skip) + skip_back = 1 + assert skip_bytes <= len(next_input) + while skip_bytes > 0: + decoder.setstate((b'', dec_flags)) + # Decode up to temptative start point + n = len(decoder.decode(next_input[:skip_bytes])) + if n <= chars_to_skip: + b, d = decoder.getstate() + if not b: + # Before pos and no bytes buffered in decoder => OK + dec_flags = d + chars_to_skip -= n + break + # Skip back by buffered amount and reset heuristic + skip_bytes -= len(b) + skip_back = 1 + else: + # We're too far ahead, skip back a bit + skip_bytes -= skip_back + skip_back = skip_back * 2 + else: + skip_bytes = 0 + decoder.setstate((b'', dec_flags)) + # Note our initial start point. - decoder.setstate((b'', dec_flags)) - start_pos = position - start_flags, bytes_fed, chars_decoded = dec_flags, 0, 0 - need_eof = 0 + start_pos = position + skip_bytes + start_flags = dec_flags + if chars_to_skip == 0: + # We haven't moved from the start point. + return self._pack_cookie(start_pos, start_flags) # Feed the decoder one byte at a time. As we go, note the # nearest "safe start point" before the current location # (a point where the decoder has nothing buffered, so seek() # can safely start from there and advance to this location). - next_byte = bytearray(1) - for next_byte[0] in next_input: + bytes_fed = 0 + need_eof = 0 + # Chars decoded since `start_pos` + chars_decoded = 0 + for i in range(skip_bytes, len(next_input)): bytes_fed += 1 - chars_decoded += len(decoder.decode(next_byte)) + chars_decoded += len(decoder.decode(next_input[i:i+1])) dec_buffer, dec_flags = decoder.getstate() if not dec_buffer and chars_decoded <= chars_to_skip: # Decoder buffer is empty, so this is a safe start point. @@ -133,11 +133,14 @@ class ABCMeta(type): return cls def register(cls, subclass): - """Register a virtual subclass of an ABC.""" + """Register a virtual subclass of an ABC. + + Returns the subclass, to allow usage as a class decorator. + """ if not isinstance(subclass, type): raise TypeError("Can only register classes") if issubclass(subclass, cls): - return # Already a subclass + return subclass # Already a subclass # Subtle: test for cycles *after* testing for "already a subclass"; # this means we allow X.register(X) and interpret it as a no-op. if issubclass(cls, subclass): @@ -145,6 +148,7 @@ class ABCMeta(type): raise RuntimeError("Refusing to create an inheritance cycle") cls._abc_registry.add(subclass) ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache + return subclass def _dump_registry(cls, file=None): """Debug helper to print the ABC registry.""" diff --git a/Lib/asynchat.py b/Lib/asynchat.py index 6558512..2199d1b 100644 --- a/Lib/asynchat.py +++ b/Lib/asynchat.py @@ -75,7 +75,7 @@ class async_chat (asyncore.dispatcher): # sign of an application bug that we don't want to pass silently use_encoding = 0 - encoding = 'latin1' + encoding = 'latin-1' def __init__ (self, sock=None, map=None): # for string terminator matching diff --git a/Lib/asyncore.py b/Lib/asyncore.py index 5d7bdda..d647606 100644 --- a/Lib/asyncore.py +++ b/Lib/asyncore.py @@ -289,7 +289,7 @@ class dispatcher: del map[fd] self._fileno = None - def create_socket(self, family, type): + def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM): self.family_and_type = family, type sock = socket.socket(family, type) sock.setblocking(0) diff --git a/Lib/collections.py b/Lib/collections/__init__.py index 98c4325..922bba2 100644 --- a/Lib/collections.py +++ b/Lib/collections/__init__.py @@ -1,10 +1,11 @@ __all__ = ['deque', 'defaultdict', 'namedtuple', 'UserDict', 'UserList', 'UserString', 'Counter', 'OrderedDict'] -# For bootstrapping reasons, the collection ABCs are defined in _abcoll.py. -# They should however be considered an integral part of collections.py. -from _abcoll import * -import _abcoll -__all__ += _abcoll.__all__ + +# For backwards compatability, continue to make the collections ABCs +# available through the collections module. +from collections.abc import * +import collections.abc +__all__ += collections.abc.__all__ from _collections import deque, defaultdict from operator import itemgetter as _itemgetter @@ -632,10 +633,10 @@ class Counter(dict): ######################################################################## -### ChainMap (helper for configparser) +### ChainMap (helper for configparser and string.Template) ######################################################################## -class _ChainMap(MutableMapping): +class ChainMap(MutableMapping): ''' A ChainMap groups multiple dicts (or other mappings) together to create a single, updateable view. @@ -678,6 +679,9 @@ class _ChainMap(MutableMapping): def __contains__(self, key): return any(key in m for m in self.maps) + def __bool__(self): + return any(self.maps) + @_recursive_repr() def __repr__(self): return '{0.__class__.__name__}({1})'.format( @@ -843,6 +847,8 @@ class UserList(MutableSequence): def insert(self, i, item): self.data.insert(i, item) def pop(self, i=-1): return self.data.pop(i) def remove(self, item): self.data.remove(item) + def clear(self): self.data.clear() + def copy(self): return self.data.copy() def count(self, item): return self.data.count(item) def index(self, item, *args): return self.data.index(item, *args) def reverse(self): self.data.reverse() diff --git a/Lib/_abcoll.py b/Lib/collections/abc.py index 2417d18..c80c7dd 100644 --- a/Lib/_abcoll.py +++ b/Lib/collections/abc.py @@ -3,9 +3,7 @@ """Abstract Base Classes (ABCs) for collections, according to PEP 3119. -DON'T USE THIS MODULE DIRECTLY! The classes here should be imported -via collections; they are defined here only to alleviate certain -bootstrapping issues. Unit tests are in test_collections. +Unit tests are in test_collections. """ from abc import ABCMeta, abstractmethod @@ -598,6 +596,13 @@ class MutableSequence(Sequence): def append(self, value): self.insert(len(self), value) + def clear(self): + try: + while True: + self.pop() + except IndexError: + pass + def reverse(self): n = len(self) for i in range(n//2): diff --git a/Lib/compileall.py b/Lib/compileall.py index d79a1bb..743be27 100644 --- a/Lib/compileall.py +++ b/Lib/compileall.py @@ -35,11 +35,11 @@ def compile_dir(dir, maxlevels=10, ddir=None, force=False, rx=None, optimize: optimization level or -1 for level of the interpreter """ if not quiet: - print('Listing', dir, '...') + print('Listing {!r}...'.format(dir)) try: names = os.listdir(dir) except os.error: - print("Can't list", dir) + print("Can't list {!r}".format(dir)) names = [] names.sort() success = 1 @@ -109,13 +109,13 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=False, except IOError: pass if not quiet: - print('Compiling', fullname, '...') + print('Compiling {!r}...'.format(fullname)) try: ok = py_compile.compile(fullname, cfile, dfile, True, optimize=optimize) except py_compile.PyCompileError as err: if quiet: - print('*** Error compiling', fullname, '...') + print('*** Error compiling {!r}...'.format(fullname)) else: print('*** ', end='') # escape non-printable characters in msg @@ -126,7 +126,7 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=False, success = 0 except (SyntaxError, UnicodeError, IOError) as e: if quiet: - print('*** Error compiling', fullname, '...') + print('*** Error compiling {!r}...'.format(fullname)) else: print('*** ', end='') print(e.__class__.__name__ + ':', e) diff --git a/Lib/configparser.py b/Lib/configparser.py index 1bfdac8..15fc266 100644 --- a/Lib/configparser.py +++ b/Lib/configparser.py @@ -119,7 +119,8 @@ ConfigParser -- responsible for parsing a list of between keys and values are surrounded by spaces. """ -from collections import MutableMapping, OrderedDict as _default_dict, _ChainMap +from collections.abc import MutableMapping +from collections import OrderedDict as _default_dict, ChainMap as _ChainMap import functools import io import itertools diff --git a/Lib/crypt.py b/Lib/crypt.py new file mode 100644 index 0000000..e65b0cb --- /dev/null +++ b/Lib/crypt.py @@ -0,0 +1,62 @@ +"""Wrapper to the POSIX crypt library call and associated functionality.""" + +import _crypt +import string +from random import choice +from collections import namedtuple + + +_saltchars = string.ascii_letters + string.digits + './' + + +class _Method(namedtuple('_Method', 'name ident salt_chars total_size')): + + """Class representing a salt method per the Modular Crypt Format or the + legacy 2-character crypt method.""" + + def __repr__(self): + return '<crypt.METHOD_{}>'.format(self.name) + + + +def mksalt(method=None): + """Generate a salt for the specified method. + + If not specified, the strongest available method will be used. + + """ + if method is None: + method = methods[0] + s = '${}$'.format(method.ident) if method.ident else '' + s += ''.join(choice(_saltchars) for _ in range(method.salt_chars)) + return s + + +def crypt(word, salt=None): + """Return a string representing the one-way hash of a password, with a salt + prepended. + + If ``salt`` is not specified or is ``None``, the strongest + available method will be selected and a salt generated. Otherwise, + ``salt`` may be one of the ``crypt.METHOD_*`` values, or a string as + returned by ``crypt.mksalt()``. + + """ + if salt is None or isinstance(salt, _Method): + salt = mksalt(salt) + return _crypt.crypt(word, salt) + + +# available salting/crypto methods +METHOD_CRYPT = _Method('CRYPT', None, 2, 13) +METHOD_MD5 = _Method('MD5', '1', 8, 34) +METHOD_SHA256 = _Method('SHA256', '5', 16, 63) +METHOD_SHA512 = _Method('SHA512', '6', 16, 106) + +methods = [] +for _method in (METHOD_SHA512, METHOD_SHA256, METHOD_MD5): + _result = crypt('', _method) + if _result and len(_result) == _method.total_size: + methods.append(_method) +methods.append(METHOD_CRYPT) +del _result, _method diff --git a/Lib/ctypes/test/test_memfunctions.py b/Lib/ctypes/test/test_memfunctions.py index aa2113b..aec4aaa 100644 --- a/Lib/ctypes/test/test_memfunctions.py +++ b/Lib/ctypes/test/test_memfunctions.py @@ -1,4 +1,5 @@ import sys +from test import support import unittest from ctypes import * @@ -49,6 +50,7 @@ class MemFunctionsTest(unittest.TestCase): self.assertEqual(cast(a, POINTER(c_byte))[:7:7], [97]) + @support.refcount_test def test_string_at(self): s = string_at(b"foo bar") # XXX The following may be wrong, depending on how Python diff --git a/Lib/ctypes/test/test_python_api.py b/Lib/ctypes/test/test_python_api.py index 1f4c603..9de3980 100644 --- a/Lib/ctypes/test/test_python_api.py +++ b/Lib/ctypes/test/test_python_api.py @@ -1,5 +1,6 @@ from ctypes import * import unittest, sys +from test import support from ctypes.test import is_resource_enabled ################################################################ @@ -25,6 +26,7 @@ class PythonAPITestCase(unittest.TestCase): self.assertEqual(PyBytes_FromStringAndSize(b"abcdefghi", 3), b"abc") + @support.refcount_test def test_PyString_FromString(self): pythonapi.PyBytes_FromString.restype = py_object pythonapi.PyBytes_FromString.argtypes = (c_char_p,) @@ -56,6 +58,7 @@ class PythonAPITestCase(unittest.TestCase): del res self.assertEqual(grc(42), ref42) + @support.refcount_test def test_PyObj_FromPtr(self): s = "abc def ghi jkl" ref = grc(s) diff --git a/Lib/ctypes/test/test_refcounts.py b/Lib/ctypes/test/test_refcounts.py index 35a81aa..5613e7a 100644 --- a/Lib/ctypes/test/test_refcounts.py +++ b/Lib/ctypes/test/test_refcounts.py @@ -1,4 +1,5 @@ import unittest +from test import support import ctypes import gc @@ -10,6 +11,7 @@ dll = ctypes.CDLL(_ctypes_test.__file__) class RefcountTestCase(unittest.TestCase): + @support.refcount_test def test_1(self): from sys import getrefcount as grc @@ -34,6 +36,7 @@ class RefcountTestCase(unittest.TestCase): self.assertEqual(grc(callback), 2) + @support.refcount_test def test_refcount(self): from sys import getrefcount as grc def func(*args): diff --git a/Lib/ctypes/test/test_stringptr.py b/Lib/ctypes/test/test_stringptr.py index 3d25fa5..95cd161 100644 --- a/Lib/ctypes/test/test_stringptr.py +++ b/Lib/ctypes/test/test_stringptr.py @@ -1,4 +1,5 @@ import unittest +from test import support from ctypes import * import _ctypes_test @@ -7,6 +8,7 @@ lib = CDLL(_ctypes_test.__file__) class StringPtrTestCase(unittest.TestCase): + @support.refcount_test def test__POINTER_c_char(self): class X(Structure): _fields_ = [("str", POINTER(c_char))] diff --git a/Lib/distutils/__init__.py b/Lib/distutils/__init__.py index 49b6d51..bd7d9dd 100644 --- a/Lib/distutils/__init__.py +++ b/Lib/distutils/__init__.py @@ -15,5 +15,5 @@ __revision__ = "$Id$" # Updated automatically by the Python release process. # #--start constants-- -__version__ = "3.2" +__version__ = "3.3a0" #--end constants-- diff --git a/Lib/distutils/command/bdist_wininst.py b/Lib/distutils/command/bdist_wininst.py index b2e2fc6..b886055 100644 --- a/Lib/distutils/command/bdist_wininst.py +++ b/Lib/distutils/command/bdist_wininst.py @@ -263,11 +263,11 @@ class bdist_wininst(Command): cfgdata = cfgdata + b"\0" if self.pre_install_script: # We need to normalize newlines, so we open in text mode and - # convert back to bytes. "latin1" simply avoids any possible + # convert back to bytes. "latin-1" simply avoids any possible # failures. with open(self.pre_install_script, "r", - encoding="latin1") as script: - script_data = script.read().encode("latin1") + encoding="latin-1") as script: + script_data = script.read().encode("latin-1") cfgdata = cfgdata + script_data + b"\n\0" else: # empty pre-install script diff --git a/Lib/doctest.py b/Lib/doctest.py index b79db17..e8f3bcb 100644 --- a/Lib/doctest.py +++ b/Lib/doctest.py @@ -1373,6 +1373,7 @@ class DocTestRunner: # Note that the interactive output will go to *our* # save_stdout, even if that's not the real sys.stdout; this # allows us to write test cases for the set_trace behavior. + save_trace = sys.gettrace() save_set_trace = pdb.set_trace self.debugger = _OutputRedirectingPdb(save_stdout) self.debugger.reset() @@ -1392,6 +1393,7 @@ class DocTestRunner: finally: sys.stdout = save_stdout pdb.set_trace = save_set_trace + sys.settrace(save_trace) linecache.getlines = self.save_linecache_getlines sys.displayhook = save_displayhook if clear_globs: diff --git a/Lib/ftplib.py b/Lib/ftplib.py index 22b5fd2..4836ebe 100644 --- a/Lib/ftplib.py +++ b/Lib/ftplib.py @@ -100,14 +100,15 @@ class FTP: file = None welcome = None passiveserver = 1 - encoding = "latin1" + encoding = "latin-1" # Initialization method (called by class instantiation). # Initialize host to localhost, port to standard ftp port # Optional arguments are host (for connect()), # and user, passwd, acct (for login()) def __init__(self, host='', user='', passwd='', acct='', - timeout=_GLOBAL_DEFAULT_TIMEOUT): + timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): + self.source_address = source_address self.timeout = timeout if host: self.connect(host) @@ -128,10 +129,12 @@ class FTP: if self.sock is not None: self.close() - def connect(self, host='', port=0, timeout=-999): + def connect(self, host='', port=0, timeout=-999, source_address=None): '''Connect to host. Arguments are: - host: hostname to connect to (string, default previous host) - port: port to connect to (integer, default previous port) + - source_address: a 2-tuple (host, port) for the socket to bind + to as its source address before connecting. ''' if host != '': self.host = host @@ -139,7 +142,10 @@ class FTP: self.port = port if timeout != -999: self.timeout = timeout - self.sock = socket.create_connection((self.host, self.port), self.timeout) + if source_address is not None: + self.source_address = source_address + self.sock = socket.create_connection((self.host, self.port), self.timeout, + source_address=self.source_address) self.af = self.sock.family self.file = self.sock.makefile('r', encoding=self.encoding) self.welcome = self.getresp() @@ -334,7 +340,8 @@ class FTP: size = None if self.passiveserver: host, port = self.makepasv() - conn = socket.create_connection((host, port), self.timeout) + conn = socket.create_connection((host, port), self.timeout, + source_address=self.source_address) if rest is not None: self.sendcmd("REST %s" % rest) resp = self.sendcmd(cmd) @@ -589,11 +596,11 @@ class FTP: def close(self): '''Close the connection without assuming anything about it.''' - if self.file: + if self.file is not None: self.file.close() + if self.sock is not None: self.sock.close() - self.file = self.sock = None - + self.file = self.sock = None try: import ssl @@ -637,7 +644,7 @@ else: def __init__(self, host='', user='', passwd='', acct='', keyfile=None, certfile=None, context=None, - timeout=_GLOBAL_DEFAULT_TIMEOUT): + timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): if context is not None and keyfile is not None: raise ValueError("context and keyfile arguments are mutually " "exclusive") @@ -648,7 +655,7 @@ else: self.certfile = certfile self.context = context self._prot_p = False - FTP.__init__(self, host, user, passwd, acct, timeout) + FTP.__init__(self, host, user, passwd, acct, timeout, source_address) def login(self, user='', passwd='', acct='', secure=True): if secure and not isinstance(self.sock, ssl.SSLSocket): diff --git a/Lib/http/client.py b/Lib/http/client.py index 36b7349..8ad7cb6 100644 --- a/Lib/http/client.py +++ b/Lib/http/client.py @@ -697,7 +697,7 @@ class HTTPConnection: self.send(connect_bytes) for header, value in self._tunnel_headers.items(): header_str = "%s: %s\r\n" % (header, value) - header_bytes = header_str.encode("latin1") + header_bytes = header_str.encode("latin-1") self.send(header_bytes) self.send(b'\r\n') @@ -937,7 +937,7 @@ class HTTPConnection: values = list(values) for i, one_value in enumerate(values): if hasattr(one_value, 'encode'): - values[i] = one_value.encode('latin1') + values[i] = one_value.encode('latin-1') elif isinstance(one_value, int): values[i] = str(one_value).encode('ascii') value = b'\r\n\t'.join(values) diff --git a/Lib/http/server.py b/Lib/http/server.py index 543abe0..a35fd9d 100644 --- a/Lib/http/server.py +++ b/Lib/http/server.py @@ -448,7 +448,7 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): message = '' if self.request_version != 'HTTP/0.9': self.wfile.write(("%s %d %s\r\n" % - (self.protocol_version, code, message)).encode('latin1', 'strict')) + (self.protocol_version, code, message)).encode('latin-1', 'strict')) def send_header(self, keyword, value): """Send a MIME header.""" @@ -456,7 +456,7 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] self._headers_buffer.append( - ("%s: %s\r\n" % (keyword, value)).encode('latin1', 'strict')) + ("%s: %s\r\n" % (keyword, value)).encode('latin-1', 'strict')) if keyword.lower() == 'connection': if value.lower() == 'close': diff --git a/Lib/idlelib/idlever.py b/Lib/idlelib/idlever.py index 5b0907e..5e9afb1 100644 --- a/Lib/idlelib/idlever.py +++ b/Lib/idlelib/idlever.py @@ -1 +1 @@ -IDLE_VERSION = "3.2" +IDLE_VERSION = "3.3a0" diff --git a/Lib/lib2to3/__main__.py b/Lib/lib2to3/__main__.py new file mode 100644 index 0000000..80688ba --- /dev/null +++ b/Lib/lib2to3/__main__.py @@ -0,0 +1,4 @@ +import sys +from .main import main + +sys.exit(main("lib2to3.fixes")) diff --git a/Lib/lib2to3/patcomp.py b/Lib/lib2to3/patcomp.py index bb538d5..0a259e9 100644 --- a/Lib/lib2to3/patcomp.py +++ b/Lib/lib2to3/patcomp.py @@ -11,6 +11,7 @@ The compiler compiles a pattern to a pytree.*Pattern instance. __author__ = "Guido van Rossum <guido@python.org>" # Python imports +import io import os # Fairly local imports @@ -32,7 +33,7 @@ class PatternSyntaxError(Exception): def tokenize_wrapper(input): """Tokenizes a string suppressing significant whitespace.""" skip = set((token.NEWLINE, token.INDENT, token.DEDENT)) - tokens = tokenize.generate_tokens(driver.generate_lines(input).__next__) + tokens = tokenize.generate_tokens(io.StringIO(input).readline) for quintuple in tokens: type, value, start, end, line_text = quintuple if type not in skip: diff --git a/Lib/lib2to3/pgen2/driver.py b/Lib/lib2to3/pgen2/driver.py index ee77a13..e7828ff 100644 --- a/Lib/lib2to3/pgen2/driver.py +++ b/Lib/lib2to3/pgen2/driver.py @@ -17,6 +17,7 @@ __all__ = ["Driver", "load_grammar"] # Python imports import codecs +import io import os import logging import sys @@ -101,18 +102,10 @@ class Driver(object): def parse_string(self, text, debug=False): """Parse a string and return the syntax tree.""" - tokens = tokenize.generate_tokens(generate_lines(text).__next__) + tokens = tokenize.generate_tokens(io.StringIO(text).readline) return self.parse_tokens(tokens, debug) -def generate_lines(text): - """Generator that behaves like readline without using StringIO.""" - for line in text.splitlines(True): - yield line - while True: - yield "" - - def load_grammar(gt="Grammar.txt", gp=None, save=True, force=False, logger=None): """Load the grammar (maybe from a pickle).""" diff --git a/Lib/lib2to3/tests/test_parser.py b/Lib/lib2to3/tests/test_parser.py index ce39e41..f389795 100644 --- a/Lib/lib2to3/tests/test_parser.py +++ b/Lib/lib2to3/tests/test_parser.py @@ -18,6 +18,16 @@ import os # Local imports from lib2to3.pgen2 import tokenize from ..pgen2.parse import ParseError +from lib2to3.pygram import python_symbols as syms + + +class TestDriver(support.TestCase): + + def test_formfeed(self): + s = """print 1\n\x0Cprint 2\n""" + t = driver.parse_string(s) + self.assertEqual(t.children[0].children[0].type, syms.print_stmt) + self.assertEqual(t.children[1].children[0].type, syms.print_stmt) class GrammarTest(support.TestCase): diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py index 96384bd..57b2057 100644 --- a/Lib/logging/handlers.py +++ b/Lib/logging/handlers.py @@ -1307,6 +1307,16 @@ class QueueListener(object): except queue.Empty: break + def enqueue_sentinel(self): + """ + This is used to enqueue the sentinel record. + + The base implementation uses put_nowait. You may want to override this + method if you want to use timeouts or work with custom queue + implementations. + """ + self.queue.put_nowait(self._sentinel) + def stop(self): """ Stop the listener. @@ -1316,6 +1326,6 @@ class QueueListener(object): may be some records still left on the queue, which won't be processed. """ self._stop.set() - self.queue.put_nowait(self._sentinel) + self.enqueue_sentinel() self._thread.join() self._thread = None diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6c23fb..d6627e5 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -434,10 +434,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index b56a061..3fb9ff6 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -91,12 +91,16 @@ class Process(object): ''' _Popen = None - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey - self._daemonic = _current_process._daemonic + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None diff --git a/Lib/nntplib.py b/Lib/nntplib.py index 70a75b6..ba70192 100644 --- a/Lib/nntplib.py +++ b/Lib/nntplib.py @@ -346,6 +346,20 @@ class _NNTPBase: # Log in and encryption setup order is left to subclasses. self.authenticated = False + def __enter__(self): + return self + + def __exit__(self, *args): + is_connected = lambda: hasattr(self, "file") + if is_connected(): + try: + self.quit() + except (socket.error, EOFError): + pass + finally: + if is_connected(): + self._close() + def getwelcome(self): """Get the welcome message from the server (this is read and squirreled away by __init__()). @@ -434,7 +434,7 @@ def get_exec_path(env=None): # Change environ to automatically call putenv(), unsetenv if they exist. -from _abcoll import MutableMapping # Can't use collections (bootstrap) +from collections.abc import MutableMapping class _Environ(MutableMapping): def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue, putenv, unsetenv): diff --git a/Lib/platform.py b/Lib/platform.py index b9bc303..ab0f92f 100755 --- a/Lib/platform.py +++ b/Lib/platform.py @@ -357,92 +357,11 @@ def dist(distname='',version='',id='', supported_dists=supported_dists, full_distribution_name=0) -class _popen: - - """ Fairly portable (alternative) popen implementation. - - This is mostly needed in case os.popen() is not available, or - doesn't work as advertised, e.g. in Win9X GUI programs like - PythonWin or IDLE. - - Writing to the pipe is currently not supported. - - """ - tmpfile = '' - pipe = None - bufsize = None - mode = 'r' - - def __init__(self,cmd,mode='r',bufsize=None): - - if mode != 'r': - raise ValueError('popen()-emulation only supports read mode') - import tempfile - self.tmpfile = tmpfile = tempfile.mktemp() - os.system(cmd + ' > %s' % tmpfile) - self.pipe = open(tmpfile,'rb') - self.bufsize = bufsize - self.mode = mode - - def read(self): - - return self.pipe.read() - - def readlines(self): - - if self.bufsize is not None: - return self.pipe.readlines() - - def close(self, - - remove=os.unlink,error=os.error): - - if self.pipe: - rc = self.pipe.close() - else: - rc = 255 - if self.tmpfile: - try: - remove(self.tmpfile) - except error: - pass - return rc - - # Alias - __del__ = close - def popen(cmd, mode='r', bufsize=None): """ Portable popen() interface. """ - # Find a working popen implementation preferring win32pipe.popen - # over os.popen over _popen - popen = None - if os.environ.get('OS','') == 'Windows_NT': - # On NT win32pipe should work; on Win9x it hangs due to bugs - # in the MS C lib (see MS KnowledgeBase article Q150956) - try: - import win32pipe - except ImportError: - pass - else: - popen = win32pipe.popen - if popen is None: - if hasattr(os,'popen'): - popen = os.popen - # Check whether it works... it doesn't in GUI programs - # on Windows platforms - if sys.platform == 'win32': # XXX Others too ? - try: - popen('') - except os.error: - popen = _popen - else: - popen = _popen - if bufsize is None: - return popen(cmd,mode) - else: - return popen(cmd,mode,bufsize) + return os.popen(cmd, mode, bufsize) def _norm_version(version, build=''): diff --git a/Lib/poplib.py b/Lib/poplib.py index 84ea88d..d42d9dd 100644 --- a/Lib/poplib.py +++ b/Lib/poplib.py @@ -250,15 +250,18 @@ class POP3: def quit(self): """Signoff: commit changes on server, unlock mailbox, close connection.""" - try: - resp = self._shortcmd('QUIT') - except error_proto as val: - resp = val - self.file.close() - self.sock.close() - del self.file, self.sock + resp = self._shortcmd('QUIT') + self.close() return resp + def close(self): + """Close the connection without assuming anything about it.""" + if self.file is not None: + self.file.close() + if self.sock is not None: + self.sock.close() + self.file = self.sock = None + #__del__ = quit diff --git a/Lib/random.py b/Lib/random.py index 7f63388..f29803e 100644 --- a/Lib/random.py +++ b/Lib/random.py @@ -42,7 +42,7 @@ from types import MethodType as _MethodType, BuiltinMethodType as _BuiltinMethod from math import log as _log, exp as _exp, pi as _pi, e as _e, ceil as _ceil from math import sqrt as _sqrt, acos as _acos, cos as _cos, sin as _sin from os import urandom as _urandom -import collections as _collections +from collections.abc import Set as _Set, Sequence as _Sequence from hashlib import sha512 as _sha512 __all__ = ["Random","seed","random","uniform","randint","choice","sample", @@ -114,7 +114,7 @@ class Random(_random.Random): if version == 2: if isinstance(a, (str, bytes, bytearray)): if isinstance(a, str): - a = a.encode("utf8") + a = a.encode("utf-8") a += _sha512(a).digest() a = int.from_bytes(a, 'big') @@ -293,10 +293,10 @@ class Random(_random.Random): # preferred since the list takes less space than the # set and it doesn't suffer from frequent reselections. - if isinstance(population, _collections.Set): + if isinstance(population, _Set): population = tuple(population) - if not isinstance(population, _collections.Sequence): - raise TypeError("Population must be a sequence or Set. For dicts, use list(d).") + if not isinstance(population, _Sequence): + raise TypeError("Population must be a sequence or set. For dicts, use list(d).") randbelow = self._randbelow n = len(population) if not 0 <= k <= n: diff --git a/Lib/smtpd.py b/Lib/smtpd.py index 599e79b..32f45ae 100755 --- a/Lib/smtpd.py +++ b/Lib/smtpd.py @@ -275,7 +275,7 @@ class SMTPChannel(asynchat.async_chat): return elif limit: self.num_bytes += len(data) - self.received_lines.append(str(data, "utf8")) + self.received_lines.append(str(data, "utf-8")) # Implementation of base class abstract method def found_terminator(self): diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index 29413e1..d214f3d 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -85,7 +85,7 @@ class DeclTypesTests(unittest.TestCase): if isinstance(_val, bytes): # sqlite3 always calls __init__ with a bytes created from a # UTF-8 string when __conform__ was used to store the object. - _val = _val.decode('utf8') + _val = _val.decode('utf-8') self.val = _val def __cmp__(self, other): diff --git a/Lib/sre_parse.py b/Lib/sre_parse.py index 13737ca..ae63c31 100644 --- a/Lib/sre_parse.py +++ b/Lib/sre_parse.py @@ -791,7 +791,7 @@ def parse_template(source, pattern): else: # The tokenizer implicitly decodes bytes objects as latin-1, we must # therefore re-encode the final representation. - encode = lambda x: x.encode('latin1') + encode = lambda x: x.encode('latin-1') for c, s in p: if c is MARK: groupsappend((i, s)) diff --git a/Lib/string.py b/Lib/string.py index ef0334c..d4f9cd9 100644 --- a/Lib/string.py +++ b/Lib/string.py @@ -46,23 +46,7 @@ def capwords(s, sep=None): #################################################################### import re as _re - -class _multimap: - """Helper class for combining multiple mappings. - - Used by .{safe_,}substitute() to combine the mapping and keyword - arguments. - """ - def __init__(self, primary, secondary): - self._primary = primary - self._secondary = secondary - - def __getitem__(self, key): - try: - return self._primary[key] - except KeyError: - return self._secondary[key] - +from collections import ChainMap class _TemplateMetaclass(type): pattern = r""" @@ -116,7 +100,7 @@ class Template(metaclass=_TemplateMetaclass): if not args: mapping = kws elif kws: - mapping = _multimap(kws, args[0]) + mapping = ChainMap(kws, args[0]) else: mapping = args[0] # Helper function for .sub() @@ -142,7 +126,7 @@ class Template(metaclass=_TemplateMetaclass): if not args: mapping = kws elif kws: - mapping = _multimap(kws, args[0]) + mapping = ChainMap(kws, args[0]) else: mapping = args[0] # Helper function for .sub() diff --git a/Lib/subprocess.py b/Lib/subprocess.py index c02fb52..359dc96 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -1125,7 +1125,7 @@ class Popen(object): restore_signals, start_new_session): """Execute program (POSIX version)""" - if isinstance(args, str): + if isinstance(args, (str, bytes)): args = [args] else: args = list(args) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 0f9d1da..6b663f4 100644 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -1084,7 +1084,7 @@ class TarInfo(object): def create_pax_global_header(cls, pax_headers): """Return the object as a pax global header block sequence. """ - return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf8") + return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf-8") def _posix_split_name(self, name): """Split a name longer than 100 chars into a prefix @@ -1167,7 +1167,7 @@ class TarInfo(object): binary = False for keyword, value in pax_headers.items(): try: - value.encode("utf8", "strict") + value.encode("utf-8", "strict") except UnicodeEncodeError: binary = True break @@ -1178,13 +1178,13 @@ class TarInfo(object): records += b"21 hdrcharset=BINARY\n" for keyword, value in pax_headers.items(): - keyword = keyword.encode("utf8") + keyword = keyword.encode("utf-8") if binary: # Try to restore the original byte representation of `value'. # Needless to say, that the encoding must match the string. value = value.encode(encoding, "surrogateescape") else: - value = value.encode("utf8") + value = value.encode("utf-8") l = len(keyword) + len(value) + 3 # ' ' + '=' + '\n' n = p = 0 @@ -1393,7 +1393,7 @@ class TarInfo(object): # the translation to UTF-8 fails. match = re.search(br"\d+ hdrcharset=([^\n]+)\n", buf) if match is not None: - pax_headers["hdrcharset"] = match.group(1).decode("utf8") + pax_headers["hdrcharset"] = match.group(1).decode("utf-8") # For the time being, we don't care about anything other than "BINARY". # The only other value that is currently allowed by the standard is @@ -1402,7 +1402,7 @@ class TarInfo(object): if hdrcharset == "BINARY": encoding = tarfile.encoding else: - encoding = "utf8" + encoding = "utf-8" # Parse pax header information. A record looks like that: # "%d %s=%s\n" % (length, keyword, value). length is the size @@ -1419,20 +1419,20 @@ class TarInfo(object): length = int(length) value = buf[match.end(2) + 1:match.start(1) + length - 1] - # Normally, we could just use "utf8" as the encoding and "strict" + # Normally, we could just use "utf-8" as the encoding and "strict" # as the error handler, but we better not take the risk. For # example, GNU tar <= 1.23 is known to store filenames it cannot # translate to UTF-8 as raw strings (unfortunately without a # hdrcharset=BINARY header). # We first try the strict standard encoding, and if that fails we # fall back on the user's encoding and error handler. - keyword = self._decode_pax_field(keyword, "utf8", "utf8", + keyword = self._decode_pax_field(keyword, "utf-8", "utf-8", tarfile.errors) if keyword in PAX_NAME_FIELDS: value = self._decode_pax_field(value, encoding, tarfile.encoding, tarfile.errors) else: - value = self._decode_pax_field(value, "utf8", "utf8", + value = self._decode_pax_field(value, "utf-8", "utf-8", tarfile.errors) pax_headers[keyword] = value diff --git a/Lib/test/crashers/compiler_recursion.py b/Lib/test/crashers/compiler_recursion.py index 4954bdd..8fd93fc 100644 --- a/Lib/test/crashers/compiler_recursion.py +++ b/Lib/test/crashers/compiler_recursion.py @@ -1,5 +1,13 @@ """ -The compiler (>= 2.5) recurses happily. +The compiler (>= 2.5) recurses happily until it blows the stack. + +Recorded on the tracker as http://bugs.python.org/issue11383 """ -compile('()'*9**5, '?', 'exec') +# The variant below blows up in compiler_call, but there are assorted +# other variations that blow up in other functions +# e.g. '1*'*10**5+'1' will die in compiler_visit_expr + +# The exact limit to destroy the stack will vary by platform +# but 100k should do the trick most places +compile('()'*10**5, '?', 'exec') diff --git a/Lib/test/list_tests.py b/Lib/test/list_tests.py index e3a7845..0c656fd 100644 --- a/Lib/test/list_tests.py +++ b/Lib/test/list_tests.py @@ -425,6 +425,47 @@ class CommonTest(seq_tests.CommonTest): self.assertRaises(TypeError, u.reverse, 42) + def test_clear(self): + u = self.type2test([2, 3, 4]) + u.clear() + self.assertEqual(u, []) + + u = self.type2test([]) + u.clear() + self.assertEqual(u, []) + + u = self.type2test([]) + u.append(1) + u.clear() + u.append(2) + self.assertEqual(u, [2]) + + self.assertRaises(TypeError, u.clear, None) + + def test_copy(self): + u = self.type2test([1, 2, 3]) + v = u.copy() + self.assertEqual(v, [1, 2, 3]) + + u = self.type2test([]) + v = u.copy() + self.assertEqual(v, []) + + # test that it's indeed a copy and not a reference + u = self.type2test(['a', 'b']) + v = u.copy() + v.append('i') + self.assertEqual(u, ['a', 'b']) + self.assertEqual(v, u + ['i']) + + # test that it's a shallow, not a deep copy + u = self.type2test([1, 2, [3, 4], 5]) + v = u.copy() + self.assertEqual(u, v) + self.assertIs(v[3], u[3]) + + self.assertRaises(TypeError, u.copy, None) + def test_sort(self): u = self.type2test([1, 0]) u.sort() diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index a843486..c2fe633 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -5,7 +5,7 @@ import pickletools import copyreg from http.cookies import SimpleCookie -from test.support import TestFailed, TESTFN, run_with_locale +from test.support import TestFailed, TESTFN, run_with_locale, no_tracing from pickle import bytes_types @@ -1002,6 +1002,7 @@ class AbstractPickleTests(unittest.TestCase): y = self.loads(s) self.assertEqual(y._reduce_called, 1) + @no_tracing def test_bad_getattr(self): x = BadGetattr() for proto in 0, 1: diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index b5288b5..6267702 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -827,7 +827,7 @@ class saved_test_environment: resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', 'warnings.filters', 'asyncore.socket_map', - 'logging._handlers', 'logging._handlerList') + 'logging._handlers', 'logging._handlerList', 'sys.gettrace') def get_sys_argv(self): return id(sys.argv), sys.argv, sys.argv[:] @@ -874,6 +874,11 @@ class saved_test_environment: sys.path_hooks = saved_hooks[1] sys.path_hooks[:] = saved_hooks[2] + def get_sys_gettrace(self): + return sys.gettrace() + def restore_sys_gettrace(self, trace_fxn): + sys.settrace(trace_fxn) + def get___import__(self): return builtins.__import__ def restore___import__(self, import_): @@ -1051,7 +1056,8 @@ def dash_R(the_module, test, indirect_test, huntrleaks): False if the test didn't leak references; True if we detected refleaks. """ # This code is hackish and inelegant, but it seems to do the job. - import copyreg, _abcoll + import copyreg + import collections.abc if not hasattr(sys, 'gettotalrefcount'): raise Exception("Tracking reference leaks requires a debug build " @@ -1068,7 +1074,7 @@ def dash_R(the_module, test, indirect_test, huntrleaks): else: zdc = zipimport._zip_directory_cache.copy() abcs = {} - for abc in [getattr(_abcoll, a) for a in _abcoll.__all__]: + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: if not isabstract(abc): continue for obj in abc.__subclasses__() + [abc]: @@ -1114,7 +1120,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs): import gc, copyreg import _strptime, linecache import urllib.parse, urllib.request, mimetypes, doctest - import struct, filecmp, _abcoll + import struct, filecmp, collections.abc from distutils.dir_util import _path_created from weakref import WeakSet @@ -1141,7 +1147,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs): sys._clear_type_cache() # Clear ABC registries, restoring previously saved ABC registries. - for abc in [getattr(_abcoll, a) for a in _abcoll.__all__]: + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: if not isabstract(abc): continue for obj in abc.__subclasses__() + [abc]: diff --git a/Lib/test/support.py b/Lib/test/support.py index 7a43f97..53c2956 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -15,7 +15,7 @@ import shutil import warnings import unittest import importlib -import collections +import collections.abc import re import subprocess import imp @@ -33,7 +33,7 @@ __all__ = [ "verbose", "use_resources", "max_memuse", "record_original_stdout", "get_original_stdout", "unload", "unlink", "rmtree", "forget", "is_resource_enabled", "requires", "find_unused_port", "bind_port", - "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", + "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", "captured_output", "captured_stdout", @@ -381,24 +381,6 @@ def bind_port(sock, host=HOST): port = sock.getsockname()[1] return port -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function - if isinstance(x, float) or isinstance(y, float): - try: - fuzz = (abs(x) + abs(y)) * FUZZ - if abs(x-y) <= fuzz: - return 0 - except: - pass - elif type(x) == type(y) and isinstance(x, (tuple, list)): - for i in range(min(len(x), len(y))): - outcome = fcmp(x[i], y[i]) - if outcome != 0: - return outcome - return (len(x) > len(y)) - (len(x) < len(y)) - return (x > y) - (x < y) - # decorator for skipping tests on non-IEEE 754 platforms requires_IEEE_754 = unittest.skipUnless( float.__getformat__("double").startswith("IEEE"), @@ -714,7 +696,7 @@ class CleanImport(object): sys.modules.update(self.original_modules) -class EnvironmentVarGuard(collections.MutableMapping): +class EnvironmentVarGuard(collections.abc.MutableMapping): """Class to help protect the environment variable properly. Can be used as a context manager.""" @@ -1142,6 +1124,32 @@ def check_impl_detail(**guards): return guards.get(platform.python_implementation().lower(), default) +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + def _run_suite(suite): """Run tests from a unittest.TestSuite-derived class.""" diff --git a/Lib/test/test_abc.py b/Lib/test/test_abc.py index d86f97c..1319a64 100644 --- a/Lib/test/test_abc.py +++ b/Lib/test/test_abc.py @@ -121,11 +121,32 @@ class TestABC(unittest.TestCase): self.assertFalse(issubclass(B, (A,))) self.assertNotIsInstance(b, A) self.assertNotIsInstance(b, (A,)) - A.register(B) + B1 = A.register(B) + self.assertTrue(issubclass(B, A)) + self.assertTrue(issubclass(B, (A,))) + self.assertIsInstance(b, A) + self.assertIsInstance(b, (A,)) + self.assertIs(B1, B) + class C(B): + pass + c = C() + self.assertTrue(issubclass(C, A)) + self.assertTrue(issubclass(C, (A,))) + self.assertIsInstance(c, A) + self.assertIsInstance(c, (A,)) + + def test_register_as_class_deco(self): + class A(metaclass=abc.ABCMeta): + pass + @A.register + class B(object): + pass + b = B() self.assertTrue(issubclass(B, A)) self.assertTrue(issubclass(B, (A,))) self.assertIsInstance(b, A) self.assertIsInstance(b, (A,)) + @A.register class C(B): pass c = C() @@ -133,6 +154,7 @@ class TestABC(unittest.TestCase): self.assertTrue(issubclass(C, (A,))) self.assertIsInstance(c, A) self.assertIsInstance(c, (A,)) + self.assertIs(C, A.register(C)) def test_isinstance_invalidation(self): class A(metaclass=abc.ABCMeta): diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index 03c95fa..8d80336 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -4328,7 +4328,7 @@ class TestEncoding(TestCase): def _test_module_encoding(self, path): path, _ = os.path.splitext(path) path += ".py" - with codecs.open(path, 'r', 'utf8') as f: + with codecs.open(path, 'r', 'utf-8') as f: f.read() def test_argparse_module_encoding(self): diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 53c49a8..389dc8a 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -352,7 +352,7 @@ class DispatcherWithSendTests(unittest.TestCase): @support.reap_threads def test_send(self): evt = threading.Event() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket() sock.settimeout(3) port = support.bind_port(sock) @@ -367,7 +367,7 @@ class DispatcherWithSendTests(unittest.TestCase): data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() - d.create_socket(socket.AF_INET, socket.SOCK_STREAM) + d.create_socket() d.connect((HOST, port)) # give time for socket to connect @@ -474,7 +474,7 @@ class TCPServer(asyncore.dispatcher): def __init__(self, handler=BaseTestHandler, host=HOST, port=0): asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.set_reuse_addr() self.bind((host, port)) self.listen(5) @@ -495,7 +495,7 @@ class BaseClient(BaseTestHandler): def __init__(self, address): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.connect(address) def handle_connect(self): @@ -536,7 +536,7 @@ class BaseTestAPI(unittest.TestCase): def __init__(self): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.bind((HOST, 0)) self.listen(5) self.address = self.socket.getsockname()[:2] @@ -555,7 +555,7 @@ class BaseTestAPI(unittest.TestCase): def __init__(self): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.bind((HOST, 0)) self.listen(5) self.address = self.socket.getsockname()[:2] @@ -693,20 +693,20 @@ class BaseTestAPI(unittest.TestCase): def test_create_socket(self): s = asyncore.dispatcher() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.create_socket() self.assertEqual(s.socket.family, socket.AF_INET) SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) self.assertEqual(s.socket.type, socket.SOCK_STREAM | SOCK_NONBLOCK) def test_bind(self): s1 = asyncore.dispatcher() - s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s1.create_socket() s1.bind((HOST, 0)) s1.listen(5) port = s1.socket.getsockname()[1] s2 = asyncore.dispatcher() - s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s2.create_socket() # EADDRINUSE indicates the socket was correctly bound self.assertRaises(socket.error, s2.bind, (HOST, port)) @@ -723,7 +723,7 @@ class BaseTestAPI(unittest.TestCase): self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) s.socket.close() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.create_socket() s.set_reuse_addr() self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) diff --git a/Lib/test/test_bigmem.py b/Lib/test/test_bigmem.py index ac6b109..f9a0a3d 100644 --- a/Lib/test/test_bigmem.py +++ b/Lib/test/test_bigmem.py @@ -707,7 +707,7 @@ class StrTest(unittest.TestCase, BaseStrTest): class BytesTest(unittest.TestCase, BaseStrTest): def from_latin1(self, s): - return s.encode("latin1") + return s.encode("latin-1") @bigmemtest(minsize=_2G + 2, memuse=1 + character_size) def test_decode(self, size): @@ -718,7 +718,7 @@ class BytesTest(unittest.TestCase, BaseStrTest): class BytearrayTest(unittest.TestCase, BaseStrTest): def from_latin1(self, s): - return bytearray(s.encode("latin1")) + return bytearray(s.encode("latin-1")) @bigmemtest(minsize=_2G + 2, memuse=1 + character_size) def test_decode(self, size): diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index 1469e36..61d4046 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -10,7 +10,7 @@ import ast import types import builtins import random -from test.support import fcmp, TESTFN, unlink, run_unittest, check_warnings +from test.support import TESTFN, unlink, run_unittest, check_warnings from operator import neg @@ -394,10 +394,13 @@ class BuiltinTest(unittest.TestCase): self.assertEqual(divmod(-sys.maxsize-1, -1), (sys.maxsize+1, 0)) - self.assertTrue(not fcmp(divmod(3.25, 1.0), (3.0, 0.25))) - self.assertTrue(not fcmp(divmod(-3.25, 1.0), (-4.0, 0.75))) - self.assertTrue(not fcmp(divmod(3.25, -1.0), (-4.0, -0.75))) - self.assertTrue(not fcmp(divmod(-3.25, -1.0), (3.0, -0.25))) + for num, denom, exp_result in [ (3.25, 1.0, (3.0, 0.25)), + (-3.25, 1.0, (-4.0, 0.75)), + (3.25, -1.0, (-4.0, -0.75)), + (-3.25, -1.0, (3.0, -0.25))]: + result = divmod(num, denom) + self.assertAlmostEqual(result[0], exp_result[0]) + self.assertAlmostEqual(result[1], exp_result[1]) self.assertRaises(TypeError, divmod) diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py index 9be1008..0b70c3a 100644 --- a/Lib/test/test_bytes.py +++ b/Lib/test/test_bytes.py @@ -188,24 +188,26 @@ class BaseBytesTest(unittest.TestCase): def test_encoding(self): sample = "Hello world\n\u1234\u5678\u9abc" - for enc in ("utf8", "utf16"): + for enc in ("utf-8", "utf-16"): b = self.type2test(sample, enc) self.assertEqual(b, self.type2test(sample.encode(enc))) - self.assertRaises(UnicodeEncodeError, self.type2test, sample, "latin1") - b = self.type2test(sample, "latin1", "ignore") + self.assertRaises(UnicodeEncodeError, self.type2test, sample, "latin-1") + b = self.type2test(sample, "latin-1", "ignore") self.assertEqual(b, self.type2test(sample[:-3], "utf-8")) def test_decode(self): sample = "Hello world\n\u1234\u5678\u9abc\def0\def0" - for enc in ("utf8", "utf16"): + for enc in ("utf-8", "utf-16"): b = self.type2test(sample, enc) self.assertEqual(b.decode(enc), sample) sample = "Hello world\n\x80\x81\xfe\xff" - b = self.type2test(sample, "latin1") - self.assertRaises(UnicodeDecodeError, b.decode, "utf8") - self.assertEqual(b.decode("utf8", "ignore"), "Hello world\n") - self.assertEqual(b.decode(errors="ignore", encoding="utf8"), + b = self.type2test(sample, "latin-1") + self.assertRaises(UnicodeDecodeError, b.decode, "utf-8") + self.assertEqual(b.decode("utf-8", "ignore"), "Hello world\n") + self.assertEqual(b.decode(errors="ignore", encoding="utf-8"), "Hello world\n") + # Default encoding is utf-8 + self.assertEqual(self.type2test(b'\xe2\x98\x83').decode(), '\u2603') def test_from_int(self): b = self.type2test(0) @@ -562,6 +564,39 @@ class ByteArrayTest(BaseBytesTest): b.reverse() self.assertFalse(b) + def test_clear(self): + b = bytearray(b'python') + b.clear() + self.assertEqual(b, b'') + + b = bytearray(b'') + b.clear() + self.assertEqual(b, b'') + + b = bytearray(b'') + b.append(ord('r')) + b.clear() + b.append(ord('p')) + self.assertEqual(b, b'p') + + def test_copy(self): + b = bytearray(b'abc') + bb = b.copy() + self.assertEqual(bb, b'abc') + + b = bytearray(b'') + bb = b.copy() + self.assertEqual(bb, b'') + + # test that it's indeed a copy and not a reference + b = bytearray(b'abc') + bb = b.copy() + self.assertEqual(b, bb) + self.assertIsNot(b, bb) + bb.append(ord('d')) + self.assertEqual(bb, b'abcd') + self.assertEqual(b, b'abc') + def test_regexps(self): def by(s): return bytearray(map(ord, s)) diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index c4e3adf..a0a85ae 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -151,7 +151,7 @@ class CmdLineTest(unittest.TestCase): @unittest.skipUnless(sys.platform == 'darwin', 'test specific to Mac OS X') def test_osx_utf8(self): def check_output(text): - decoded = text.decode('utf8', 'surrogateescape') + decoded = text.decode('utf-8', 'surrogateescape') expected = ascii(decoded).encode('ascii') + b'\n' env = os.environ.copy() @@ -223,7 +223,7 @@ class CmdLineTest(unittest.TestCase): self.assertIn(path2.encode('ascii'), out) def test_displayhook_unencodable(self): - for encoding in ('ascii', 'latin1', 'utf8'): + for encoding in ('ascii', 'latin-1', 'utf-8'): env = os.environ.copy() env['PYTHONIOENCODING'] = encoding p = subprocess.Popen( diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index d560d7a..67a5aed 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1250,7 +1250,7 @@ class EncodedFileTest(unittest.TestCase): self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae') f = io.BytesIO() - ef = codecs.EncodedFile(f, 'utf-8', 'latin1') + ef = codecs.EncodedFile(f, 'utf-8', 'latin-1') ef.write(b'\xc3\xbc') self.assertEqual(f.getvalue(), b'\xfc') @@ -1611,7 +1611,7 @@ class SurrogateEscapeTest(unittest.TestCase): def test_latin1(self): # Issue6373 - self.assertEqual("\udce4\udceb\udcef\udcf6\udcfc".encode("latin1", "surrogateescape"), + self.assertEqual("\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1", "surrogateescape"), b"\xe4\xeb\xef\xf6\xfc") diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index b3e0907..d71fb01 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -10,17 +10,18 @@ from random import randrange, shuffle import keyword import re import sys -from collections import _ChainMap as ChainMap -from collections import Hashable, Iterable, Iterator -from collections import Sized, Container, Callable -from collections import Set, MutableSet -from collections import Mapping, MutableMapping, KeysView, ItemsView, UserDict -from collections import Sequence, MutableSequence -from collections import ByteString +from collections import UserDict +from collections import ChainMap +from collections.abc import Hashable, Iterable, Iterator +from collections.abc import Sized, Container, Callable +from collections.abc import Set, MutableSet +from collections.abc import Mapping, MutableMapping, KeysView, ItemsView +from collections.abc import Sequence, MutableSequence +from collections.abc import ByteString ################################################################################ -### _ChainMap (helper class for configparser) +### ChainMap (helper class for configparser and the string module) ################################################################################ class TestChainMap(unittest.TestCase): @@ -71,17 +72,23 @@ class TestChainMap(unittest.TestCase): for m1, m2 in zip(d.maps, e.maps): self.assertIsNot(m1, m2, e) - d.new_child() - d['b'] = 5 - self.assertEqual(d.maps, [{'b': 5}, {'c':30}, {'a':1, 'b':2}]) - self.assertEqual(d.parents.maps, [{'c':30}, {'a':1, 'b':2}]) # check parents - self.assertEqual(d['b'], 5) # find first in chain - self.assertEqual(d.parents['b'], 2) # look beyond maps[0] + f = d.new_child() + f['b'] = 5 + self.assertEqual(f.maps, [{'b': 5}, {'c':30}, {'a':1, 'b':2}]) + self.assertEqual(f.parents.maps, [{'c':30}, {'a':1, 'b':2}]) # check parents + self.assertEqual(f['b'], 5) # find first in chain + self.assertEqual(f.parents['b'], 2) # look beyond maps[0] def test_contructor(self): - self.assertEqual(ChainedContext().maps, [{}]) # no-args --> one new dict + self.assertEqual(ChainMap().maps, [{}]) # no-args --> one new dict self.assertEqual(ChainMap({1:2}).maps, [{1:2}]) # 1 arg --> list + def test_bool(self): + self.assertFalse(ChainMap()) + self.assertFalse(ChainMap({}, {})) + self.assertTrue(ChainMap({1:2}, {})) + self.assertTrue(ChainMap({}, {1:2})) + def test_missing(self): class DefaultChainMap(ChainMap): def __missing__(self, key): @@ -721,6 +728,44 @@ class TestCollectionABCs(ABCTestCase): self.validate_abstract_methods(MutableSequence, '__contains__', '__iter__', '__len__', '__getitem__', '__setitem__', '__delitem__', 'insert') + def test_MutableSequence_mixins(self): + # Test the mixins of MutableSequence by creating a miminal concrete + # class inherited from it. + class MutableSequenceSubclass(MutableSequence): + def __init__(self): + self.lst = [] + + def __setitem__(self, index, value): + self.lst[index] = value + + def __getitem__(self, index): + return self.lst[index] + + def __len__(self): + return len(self.lst) + + def __delitem__(self, index): + del self.lst[index] + + def insert(self, index, value): + self.lst.insert(index, value) + + mss = MutableSequenceSubclass() + mss.append(0) + mss.extend((1, 2, 3, 4)) + self.assertEqual(len(mss), 5) + self.assertEqual(mss[3], 3) + mss.reverse() + self.assertEqual(mss[3], 1) + mss.pop() + self.assertEqual(len(mss), 4) + mss.remove(3) + self.assertEqual(len(mss), 3) + mss += (10, 20, 30) + self.assertEqual(len(mss), 6) + self.assertEqual(mss[-1], 30) + mss.clear() + self.assertEqual(len(mss), 0) ################################################################################ ### Counter @@ -1181,7 +1226,7 @@ import doctest, collections def test_main(verbose=None): NamedTupleDocs = doctest.DocTestSuite(module=collections) test_classes = [TestNamedTuple, NamedTupleDocs, TestOneTrickPonyABCs, - TestCollectionABCs, TestCounter, + TestCollectionABCs, TestCounter, TestChainMap, TestOrderedDict, GeneralMappingTests, SubclassMappingTests] support.run_unittest(*test_classes) support.run_doctest(collections, verbose) diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py index 250d31b..a63af4c 100644 --- a/Lib/test/test_compileall.py +++ b/Lib/test/test_compileall.py @@ -345,7 +345,7 @@ class CommandLineTests(unittest.TestCase): def test_invalid_arg_produces_message(self): out = self.assertRunOK('badfilename') - self.assertRegex(out, b"Can't list badfilename") + self.assertRegex(out, b"Can't list 'badfilename'") def test_main(): diff --git a/Lib/test/test_cfgparser.py b/Lib/test/test_configparser.py index f7d9a26..f7d9a26 100644 --- a/Lib/test/test_cfgparser.py +++ b/Lib/test/test_configparser.py diff --git a/Lib/test/test_crypt.py b/Lib/test/test_crypt.py index 2adb28d..dc107d8 100644 --- a/Lib/test/test_crypt.py +++ b/Lib/test/test_crypt.py @@ -10,6 +10,25 @@ class CryptTestCase(unittest.TestCase): if support.verbose: print('Test encryption: ', c) + def test_salt(self): + self.assertEqual(len(crypt._saltchars), 64) + for method in crypt.methods: + salt = crypt.mksalt(method) + self.assertEqual(len(salt), + method.salt_chars + (3 if method.ident else 0)) + + def test_saltedcrypt(self): + for method in crypt.methods: + pw = crypt.crypt('assword', method) + self.assertEqual(len(pw), method.total_size) + pw = crypt.crypt('assword', crypt.mksalt(method)) + self.assertEqual(len(pw), method.total_size) + + def test_methods(self): + # Gurantee that METHOD_CRYPT is the last method in crypt.methods. + self.assertTrue(len(crypt.methods) >= 1) + self.assertEqual(crypt.METHOD_CRYPT, crypt.methods[-1]) + def test_main(): support.run_unittest(CryptTestCase) diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py index 914b249..746d1ed 100644 --- a/Lib/test/test_descr.py +++ b/Lib/test/test_descr.py @@ -4243,6 +4243,8 @@ class DictProxyTests(unittest.TestCase): pass self.C = C + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_keys(self): # Testing dict-proxy keys... it = self.C.__dict__.keys() @@ -4252,6 +4254,8 @@ class DictProxyTests(unittest.TestCase): self.assertEqual(keys, ['__dict__', '__doc__', '__module__', '__weakref__', 'meth']) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_values(self): # Testing dict-proxy values... it = self.C.__dict__.values() @@ -4259,6 +4263,8 @@ class DictProxyTests(unittest.TestCase): values = list(it) self.assertEqual(len(values), 5) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_items(self): # Testing dict-proxy iteritems... it = self.C.__dict__.items() diff --git a/Lib/test/test_descrtut.py b/Lib/test/test_descrtut.py index 2db3d33..c3355b9 100644 --- a/Lib/test/test_descrtut.py +++ b/Lib/test/test_descrtut.py @@ -199,6 +199,8 @@ You can get the information from the list type: '__str__', '__subclasshook__', 'append', + 'clear', + 'copy', 'count', 'extend', 'index', diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 484a9b8..b839556 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -5,6 +5,7 @@ Test script for doctest. from test import support import doctest import os +import sys # NOTE: There are some additional tests relating to interaction with @@ -373,7 +374,7 @@ We'll simulate a __file__ attr that ends in pyc: >>> tests = finder.find(sample_func) >>> print(tests) # doctest: +ELLIPSIS - [<DocTest sample_func from ...:17 (1 example)>] + [<DocTest sample_func from ...:18 (1 example)>] The exact name depends on how test_doctest was invoked, so allow for leading path components. @@ -1686,226 +1687,227 @@ Run the debugger on the docstring, and then restore sys.stdin. """ -def test_pdb_set_trace(): - """Using pdb.set_trace from a doctest. - - You can use pdb.set_trace from a doctest. To do so, you must - retrieve the set_trace function from the pdb module at the time - you use it. The doctest module changes sys.stdout so that it can - capture program output. It also temporarily replaces pdb.set_trace - with a version that restores stdout. This is necessary for you to - see debugger output. - - >>> doc = ''' - ... >>> x = 42 - ... >>> raise Exception('clé') - ... Traceback (most recent call last): - ... Exception: clé - ... >>> import pdb; pdb.set_trace() - ... ''' - >>> parser = doctest.DocTestParser() - >>> test = parser.get_doctest(doc, {}, "foo-bar@baz", "foo-bar@baz.py", 0) - >>> runner = doctest.DocTestRunner(verbose=False) - - To demonstrate this, we'll create a fake standard input that - captures our debugger input: - - >>> import tempfile - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(x)', # print data defined by the example - ... 'continue', # stop debugging - ... '']) - - >>> try: runner.run(test) - ... finally: sys.stdin = real_stdin - --Return-- - > <doctest foo-bar@baz[2]>(1)<module>()->None - -> import pdb; pdb.set_trace() - (Pdb) print(x) - 42 - (Pdb) continue - TestResults(failed=0, attempted=3) - - You can also put pdb.set_trace in a function called from a test: - - >>> def calls_set_trace(): - ... y=2 - ... import pdb; pdb.set_trace() - - >>> doc = ''' - ... >>> x=1 - ... >>> calls_set_trace() - ... ''' - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(y)', # print data defined in the function - ... 'up', # out of function - ... 'print(x)', # print data defined by the example - ... 'continue', # stop debugging - ... '']) - - >>> try: - ... runner.run(test) - ... finally: - ... sys.stdin = real_stdin - --Return-- - > <doctest test.test_doctest.test_pdb_set_trace[8]>(3)calls_set_trace()->None - -> import pdb; pdb.set_trace() - (Pdb) print(y) - 2 - (Pdb) up - > <doctest foo-bar@baz[1]>(1)<module>() - -> calls_set_trace() - (Pdb) print(x) - 1 - (Pdb) continue - TestResults(failed=0, attempted=2) - - During interactive debugging, source code is shown, even for - doctest examples: - - >>> doc = ''' - ... >>> def f(x): - ... ... g(x*2) - ... >>> def g(x): - ... ... print(x+3) - ... ... import pdb; pdb.set_trace() - ... >>> f(3) - ... ''' - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'list', # list source from example 2 - ... 'next', # return from g() - ... 'list', # list source from example 1 - ... 'next', # return from f() - ... 'list', # list source from example 3 - ... 'continue', # stop debugging - ... '']) - >>> try: runner.run(test) - ... finally: sys.stdin = real_stdin - ... # doctest: +NORMALIZE_WHITESPACE - --Return-- - > <doctest foo-bar@baz[1]>(3)g()->None - -> import pdb; pdb.set_trace() - (Pdb) list - 1 def g(x): - 2 print(x+3) - 3 -> import pdb; pdb.set_trace() - [EOF] - (Pdb) next - --Return-- - > <doctest foo-bar@baz[0]>(2)f()->None - -> g(x*2) - (Pdb) list - 1 def f(x): - 2 -> g(x*2) - [EOF] - (Pdb) next - --Return-- - > <doctest foo-bar@baz[2]>(1)<module>()->None - -> f(3) - (Pdb) list - 1 -> f(3) - [EOF] - (Pdb) continue - ********************************************************************** - File "foo-bar@baz.py", line 7, in foo-bar@baz - Failed example: - f(3) - Expected nothing - Got: - 9 - TestResults(failed=1, attempted=3) - """ - -def test_pdb_set_trace_nested(): - """This illustrates more-demanding use of set_trace with nested functions. - - >>> class C(object): - ... def calls_set_trace(self): - ... y = 1 - ... import pdb; pdb.set_trace() - ... self.f1() - ... y = 2 - ... def f1(self): - ... x = 1 - ... self.f2() - ... x = 2 - ... def f2(self): - ... z = 1 - ... z = 2 - - >>> calls_set_trace = C().calls_set_trace - - >>> doc = ''' - ... >>> a = 1 - ... >>> calls_set_trace() - ... ''' - >>> parser = doctest.DocTestParser() - >>> runner = doctest.DocTestRunner(verbose=False) - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(y)', # print data defined in the function - ... 'step', 'step', 'step', 'step', 'step', 'step', 'print(z)', - ... 'up', 'print(x)', - ... 'up', 'print(y)', - ... 'up', 'print(foo)', - ... 'continue', # stop debugging - ... '']) - - >>> try: - ... runner.run(test) - ... finally: - ... sys.stdin = real_stdin - ... # doctest: +REPORT_NDIFF - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() - -> self.f1() - (Pdb) print(y) - 1 - (Pdb) step - --Call-- - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(7)f1() - -> def f1(self): - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(8)f1() - -> x = 1 - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() - -> self.f2() - (Pdb) step - --Call-- - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(11)f2() - -> def f2(self): - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(12)f2() - -> z = 1 - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(13)f2() - -> z = 2 - (Pdb) print(z) - 1 - (Pdb) up - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() - -> self.f2() - (Pdb) print(x) - 1 - (Pdb) up - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() - -> self.f1() - (Pdb) print(y) - 1 - (Pdb) up - > <doctest foo-bar@baz[1]>(1)<module>() - -> calls_set_trace() - (Pdb) print(foo) - *** NameError: name 'foo' is not defined - (Pdb) continue - TestResults(failed=0, attempted=2) -""" +if not hasattr(sys, 'gettrace') or not sys.gettrace(): + def test_pdb_set_trace(): + """Using pdb.set_trace from a doctest. + + You can use pdb.set_trace from a doctest. To do so, you must + retrieve the set_trace function from the pdb module at the time + you use it. The doctest module changes sys.stdout so that it can + capture program output. It also temporarily replaces pdb.set_trace + with a version that restores stdout. This is necessary for you to + see debugger output. + + >>> doc = ''' + ... >>> x = 42 + ... >>> raise Exception('clé') + ... Traceback (most recent call last): + ... Exception: clé + ... >>> import pdb; pdb.set_trace() + ... ''' + >>> parser = doctest.DocTestParser() + >>> test = parser.get_doctest(doc, {}, "foo-bar@baz", "foo-bar@baz.py", 0) + >>> runner = doctest.DocTestRunner(verbose=False) + + To demonstrate this, we'll create a fake standard input that + captures our debugger input: + + >>> import tempfile + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(x)', # print data defined by the example + ... 'continue', # stop debugging + ... '']) + + >>> try: runner.run(test) + ... finally: sys.stdin = real_stdin + --Return-- + > <doctest foo-bar@baz[2]>(1)<module>()->None + -> import pdb; pdb.set_trace() + (Pdb) print(x) + 42 + (Pdb) continue + TestResults(failed=0, attempted=3) + + You can also put pdb.set_trace in a function called from a test: + + >>> def calls_set_trace(): + ... y=2 + ... import pdb; pdb.set_trace() + + >>> doc = ''' + ... >>> x=1 + ... >>> calls_set_trace() + ... ''' + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(y)', # print data defined in the function + ... 'up', # out of function + ... 'print(x)', # print data defined by the example + ... 'continue', # stop debugging + ... '']) + + >>> try: + ... runner.run(test) + ... finally: + ... sys.stdin = real_stdin + --Return-- + > <doctest test.test_doctest.test_pdb_set_trace[8]>(3)calls_set_trace()->None + -> import pdb; pdb.set_trace() + (Pdb) print(y) + 2 + (Pdb) up + > <doctest foo-bar@baz[1]>(1)<module>() + -> calls_set_trace() + (Pdb) print(x) + 1 + (Pdb) continue + TestResults(failed=0, attempted=2) + + During interactive debugging, source code is shown, even for + doctest examples: + + >>> doc = ''' + ... >>> def f(x): + ... ... g(x*2) + ... >>> def g(x): + ... ... print(x+3) + ... ... import pdb; pdb.set_trace() + ... >>> f(3) + ... ''' + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'list', # list source from example 2 + ... 'next', # return from g() + ... 'list', # list source from example 1 + ... 'next', # return from f() + ... 'list', # list source from example 3 + ... 'continue', # stop debugging + ... '']) + >>> try: runner.run(test) + ... finally: sys.stdin = real_stdin + ... # doctest: +NORMALIZE_WHITESPACE + --Return-- + > <doctest foo-bar@baz[1]>(3)g()->None + -> import pdb; pdb.set_trace() + (Pdb) list + 1 def g(x): + 2 print(x+3) + 3 -> import pdb; pdb.set_trace() + [EOF] + (Pdb) next + --Return-- + > <doctest foo-bar@baz[0]>(2)f()->None + -> g(x*2) + (Pdb) list + 1 def f(x): + 2 -> g(x*2) + [EOF] + (Pdb) next + --Return-- + > <doctest foo-bar@baz[2]>(1)<module>()->None + -> f(3) + (Pdb) list + 1 -> f(3) + [EOF] + (Pdb) continue + ********************************************************************** + File "foo-bar@baz.py", line 7, in foo-bar@baz + Failed example: + f(3) + Expected nothing + Got: + 9 + TestResults(failed=1, attempted=3) + """ + + def test_pdb_set_trace_nested(): + """This illustrates more-demanding use of set_trace with nested functions. + + >>> class C(object): + ... def calls_set_trace(self): + ... y = 1 + ... import pdb; pdb.set_trace() + ... self.f1() + ... y = 2 + ... def f1(self): + ... x = 1 + ... self.f2() + ... x = 2 + ... def f2(self): + ... z = 1 + ... z = 2 + + >>> calls_set_trace = C().calls_set_trace + + >>> doc = ''' + ... >>> a = 1 + ... >>> calls_set_trace() + ... ''' + >>> parser = doctest.DocTestParser() + >>> runner = doctest.DocTestRunner(verbose=False) + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(y)', # print data defined in the function + ... 'step', 'step', 'step', 'step', 'step', 'step', 'print(z)', + ... 'up', 'print(x)', + ... 'up', 'print(y)', + ... 'up', 'print(foo)', + ... 'continue', # stop debugging + ... '']) + + >>> try: + ... runner.run(test) + ... finally: + ... sys.stdin = real_stdin + ... # doctest: +REPORT_NDIFF + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() + -> self.f1() + (Pdb) print(y) + 1 + (Pdb) step + --Call-- + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(7)f1() + -> def f1(self): + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(8)f1() + -> x = 1 + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() + -> self.f2() + (Pdb) step + --Call-- + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(11)f2() + -> def f2(self): + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(12)f2() + -> z = 1 + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(13)f2() + -> z = 2 + (Pdb) print(z) + 1 + (Pdb) up + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() + -> self.f2() + (Pdb) print(x) + 1 + (Pdb) up + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() + -> self.f1() + (Pdb) print(y) + 1 + (Pdb) up + > <doctest foo-bar@baz[1]>(1)<module>() + -> calls_set_trace() + (Pdb) print(foo) + *** NameError: name 'foo' is not defined + (Pdb) continue + TestResults(failed=0, attempted=2) + """ def test_DocTestSuite(): """DocTestSuite creates a unittest test suite from a doctest. diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index c61078d..2fafe1d 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -35,8 +35,8 @@ class LockTests(unittest.TestCase): "Lock object did not release properly.") def test_improper_release(self): - #Make sure release of an unlocked thread raises _thread.error - self.assertRaises(_thread.error, self.lock.release) + #Make sure release of an unlocked thread raises RuntimeError + self.assertRaises(RuntimeError, self.lock.release) def test_cond_acquire_success(self): #Make sure the conditional acquiring of the lock works. diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py index 76f4249..592c765 100644 --- a/Lib/test/test_exceptions.py +++ b/Lib/test/test_exceptions.py @@ -7,7 +7,7 @@ import pickle import weakref from test.support import (TESTFN, unlink, run_unittest, captured_output, - gc_collect, cpython_only) + gc_collect, cpython_only, no_tracing) # XXX This is not really enough, each *operation* should be tested! @@ -388,6 +388,7 @@ class ExceptionTests(unittest.TestCase): x = DerivedException(fancy_arg=42) self.assertEqual(x.fancy_arg, 42) + @no_tracing def testInfiniteRecursion(self): def f(): return f() @@ -631,6 +632,7 @@ class ExceptionTests(unittest.TestCase): u.start = 1000 self.assertEqual(str(u), "can't translate characters in position 1000-4: 965230951443685724997") + @no_tracing def test_badisinstance(self): # Bug #2542: if issubclass(e, MyException) raises an exception, # it should be ignored @@ -741,6 +743,7 @@ class ExceptionTests(unittest.TestCase): self.fail("MemoryError not raised") self.assertEqual(wr(), None) + @no_tracing def test_recursion_error_cleanup(self): # Same test as above, but with "recursion exceeded" errors class C: diff --git a/Lib/test/test_float.py b/Lib/test/test_float.py index 30cb4b9..72a2643 100644 --- a/Lib/test/test_float.py +++ b/Lib/test/test_float.py @@ -88,7 +88,7 @@ class GeneralFloatCases(unittest.TestCase): self.assertRaises(ValueError, float, " -0x3.p-1 ") self.assertRaises(ValueError, float, " +0x3.p-1 ") self.assertEqual(float(" 25.e-1 "), 2.5) - self.assertEqual(support.fcmp(float(" .25e-1 "), .025), 0) + self.assertAlmostEqual(float(" .25e-1 "), .025) def test_floatconversion(self): # Make sure that calls to __float__() work properly diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index 9d2eab7..5f8a8bd 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -608,6 +608,20 @@ class TestFTPClass(TestCase): self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') self.assertFalse(is_client_connected()) + def test_source_address(self): + self.client.quit() + port = support.find_unused_port() + self.client.connect(self.server.host, self.server.port, + source_address=(HOST, port)) + self.assertEqual(self.client.sock.getsockname()[1], port) + self.client.quit() + + def test_source_address_passive_connection(self): + port = support.find_unused_port() + self.client.source_address = (HOST, port) + sock = self.client.transfercmd('list') + self.assertEqual(sock.getsockname()[1], port) + def test_parse257(self): self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 0e5a397..f60d5d9 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -1,5 +1,6 @@ import unittest -from test.support import verbose, run_unittest, strip_python_stderr +from test.support import (verbose, refcount_test, run_unittest, + strip_python_stderr) import sys import gc import weakref @@ -175,6 +176,7 @@ class GCTests(unittest.TestCase): del d self.assertEqual(gc.collect(), 2) + @refcount_test def test_frame(self): def f(): frame = sys._getframe() @@ -242,6 +244,7 @@ class GCTests(unittest.TestCase): # For example: # - disposed tuples are not freed, but reused # - the call to assertEqual somehow avoids building its args tuple + @refcount_test def test_get_count(self): # Avoid future allocation of method object assertEqual = self._baseAssertEqual @@ -252,6 +255,7 @@ class GCTests(unittest.TestCase): # the dict, and the tuple returned by get_count() assertEqual(gc.get_count(), (2, 0, 0)) + @refcount_test def test_collect_generations(self): # Avoid future allocation of method object assertEqual = self.assertEqual diff --git a/Lib/test/test_genexps.py b/Lib/test/test_genexps.py index 1f46af1..413043c 100644 --- a/Lib/test/test_genexps.py +++ b/Lib/test/test_genexps.py @@ -257,11 +257,15 @@ Verify that genexps are weakly referencable """ +import sys -__test__ = {'doctests' : doctests} +# Trace function can throw off the tuple reuse test. +if hasattr(sys, 'gettrace') and sys.gettrace(): + __test__ = {} +else: + __test__ = {'doctests' : doctests} def test_main(verbose=None): - import sys from test import support from test import test_genexps support.run_doctest(test_genexps, verbose) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index b665a6c..1821492 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -46,7 +46,7 @@ except ImportError: def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin1") as f: + with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE @@ -1684,11 +1684,11 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - t.__init__(b, encoding="latin1", newline="\r\n") - self.assertEqual(t.encoding, "latin1") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf8", line_buffering=True) - self.assertEqual(t.encoding, "utf8") + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1738,8 +1738,8 @@ class TextIOWrapperTest(unittest.TestCase): def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf8") - self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") t = self.TextIOWrapper(b) self.assertTrue(t.encoding is not None) codecs.lookup(t.encoding) @@ -1918,7 +1918,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) @@ -1968,7 +1968,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf8") + f = self.open(support.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2214,6 +2214,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") + @support.no_tracing @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data @@ -2670,6 +2671,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") + @support.no_tracing def check_reentrant_write(self, data, **fdopen_kwargs): def on_alarm(*args): # Will be called reentrantly from the same thread diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 1e4f887..03f814a 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -95,14 +95,14 @@ class TestMailbox(TestBase): """) def test_add_invalid_8bit_bytes_header(self): - key = self._box.add(self._nonascii_msg.encode('latin1')) + key = self._box.add(self._nonascii_msg.encode('latin-1')) self.assertEqual(len(self._box), 1) self.assertEqual(self._box.get_bytes(key), - self._nonascii_msg.encode('latin1')) + self._nonascii_msg.encode('latin-1')) def test_invalid_nonascii_header_as_string(self): subj = self._nonascii_msg.splitlines()[1] - key = self._box.add(subj.encode('latin1')) + key = self._box.add(subj.encode('latin-1')) self.assertEqual(self._box.get_string(key), 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') diff --git a/Lib/test/test_metaclass.py b/Lib/test/test_metaclass.py index 219ab99..6862900 100644 --- a/Lib/test/test_metaclass.py +++ b/Lib/test/test_metaclass.py @@ -246,7 +246,13 @@ Test failures in looking up the __prepare__ method work. """ -__test__ = {'doctests' : doctests} +import sys + +# Trace function introduces __locals__ which causes various tests to fail. +if hasattr(sys, 'gettrace') and sys.gettrace(): + __test__ = {} +else: + __test__ = {'doctests' : doctests} def test_main(verbose=False): from test import support diff --git a/Lib/test/test_minidom.py b/Lib/test/test_minidom.py index 4c2b34a..acc4819 100644 --- a/Lib/test/test_minidom.py +++ b/Lib/test/test_minidom.py @@ -4,9 +4,7 @@ import pickle from test.support import verbose, run_unittest, findfile import unittest -import xml.dom import xml.dom.minidom -import xml.parsers.expat from xml.dom.minidom import parse, Node, Document, parseString from xml.dom.minidom import getDOMImplementation @@ -14,7 +12,6 @@ from xml.dom.minidom import getDOMImplementation tstfile = findfile("test.xml", subdir="xmltestdata") - # The tests of DocumentType importing use these helpers to construct # the documents to work with, since not all DOM builders actually # create the DocumentType nodes. @@ -1009,41 +1006,6 @@ class MinidomTest(unittest.TestCase): "test NodeList.item()") doc.unlink() - def testSAX2DOM(self): - from xml.dom import pulldom - - sax2dom = pulldom.SAX2DOM() - sax2dom.startDocument() - sax2dom.startElement("doc", {}) - sax2dom.characters("text") - sax2dom.startElement("subelm", {}) - sax2dom.characters("text") - sax2dom.endElement("subelm") - sax2dom.characters("text") - sax2dom.endElement("doc") - sax2dom.endDocument() - - doc = sax2dom.document - root = doc.documentElement - (text1, elm1, text2) = root.childNodes - text3 = elm1.childNodes[0] - - self.confirm(text1.previousSibling is None and - text1.nextSibling is elm1 and - elm1.previousSibling is text1 and - elm1.nextSibling is text2 and - text2.previousSibling is elm1 and - text2.nextSibling is None and - text3.previousSibling is None and - text3.nextSibling is None, "testSAX2DOM - siblings") - - self.confirm(root.parentNode is doc and - text1.parentNode is root and - elm1.parentNode is root and - text2.parentNode is root and - text3.parentNode is elm1, "testSAX2DOM - parents") - doc.unlink() - def testEncodings(self): doc = parseString('<foo>€</foo>') self.assertEqual(doc.toxml(), @@ -1490,6 +1452,7 @@ class MinidomTest(unittest.TestCase): doc.appendChild(doc.createComment("foo--bar")) self.assertRaises(ValueError, doc.toxml) + def testEmptyXMLNSValue(self): doc = parseString("<element xmlns=''>\n" "<foo/>\n</element>") diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 465a831..6f36c19 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -163,6 +163,18 @@ class _TestProcess(BaseTestCase): self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) + def test_daemon_argument(self): + if self.TYPE == "threads": + return + + # By default uses the current process's daemon flag. + proc0 = self.Process(target=self._test) + self.assertEqual(proc0.daemon, self.current_process().daemon) + proc1 = self.Process(target=self._test, daemon=True) + self.assertTrue(proc1.daemon) + proc2 = self.Process(target=self._test, daemon=False) + self.assertFalse(proc2.daemon) + @classmethod def _test(cls, q, *args, **kwds): current = cls.current_process() diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index a387f61..ec790ad 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -1,10 +1,11 @@ import io +import socket import datetime import textwrap import unittest import functools import contextlib -import collections +import collections.abc from test import support from nntplib import NNTP, GroupInfo, _have_ssl import nntplib @@ -246,12 +247,32 @@ class NetworkedNNTPTestsMixin: if not name.startswith('test_'): continue meth = getattr(cls, name) - if not isinstance(meth, collections.Callable): + if not isinstance(meth, collections.abc.Callable): continue # Need to use a closure so that meth remains bound to its current # value setattr(cls, name, wrap_meth(meth)) + def test_with_statement(self): + def is_connected(): + if not hasattr(server, 'file'): + return False + try: + server.help() + except (socket.error, EOFError): + return False + return True + + with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: + self.assertTrue(is_connected()) + self.assertTrue(server.help()) + self.assertFalse(is_connected()) + + with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: + server.quit() + self.assertFalse(is_connected()) + + NetworkedNNTPTestsMixin.wrap_methods() @@ -813,7 +834,7 @@ class NNTPv1v2TestsMixin: def _check_article_body(self, lines): self.assertEqual(len(lines), 4) - self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.") + self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") self.assertEqual(lines[-2], b"") self.assertEqual(lines[-3], b".Here is a dot-starting line.") self.assertEqual(lines[-4], b"This is just a test article.") diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 544eee1..22e6c51 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -15,6 +15,13 @@ from test import support import contextlib import mmap import uuid +import asyncore +import asynchat +import socket +try: + import threading +except ImportError: + threading = None # Detect whether we're on a Linux system that uses the (now outdated # and unmaintained) linuxthreads threading library. There's an issue @@ -1261,6 +1268,289 @@ class LoginTests(unittest.TestCase): self.assertNotEqual(len(user_name), 0) +@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), + "needs os.getpriority and os.setpriority") +class ProgramPriorityTests(unittest.TestCase): + """Tests for os.getpriority() and os.setpriority().""" + + def test_set_get_priority(self): + + base = os.getpriority(os.PRIO_PROCESS, os.getpid()) + os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) + try: + new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) + if base >= 19 and new_prio <= 19: + raise unittest.SkipTest( + "unable to reliably test setpriority at current nice level of %s" % base) + else: + self.assertEqual(new_prio, base + 1) + finally: + try: + os.setpriority(os.PRIO_PROCESS, os.getpid(), base) + except OSError as err: + if err.errno != errno.EACCES: + raise + + +class SendfileTestServer(asyncore.dispatcher, threading.Thread): + + class Handler(asynchat.async_chat): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.in_buffer = [] + self.closed = False + self.push(b"220 ready\r\n") + + def handle_read(self): + data = self.recv(4096) + self.in_buffer.append(data) + + def get_data(self): + return b''.join(self.in_buffer) + + def handle_close(self): + self.close() + self.closed = True + + def handle_error(self): + raise + + def __init__(self, address): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.host, self.port = self.socket.getsockname()[:2] + self.handler_instance = None + self._active = False + self._active_lock = threading.Lock() + + # --- public API + + @property + def running(self): + return self._active + + def start(self): + assert not self.running + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def stop(self): + assert self.running + self._active = False + self.join() + + def wait(self): + # wait for handler connection to be closed, then stop the server + while not getattr(self.handler_instance, "closed", False): + time.sleep(0.001) + self.stop() + + # --- internals + + def run(self): + self._active = True + self.__flag.set() + while self._active and asyncore.socket_map: + self._active_lock.acquire() + asyncore.loop(timeout=0.001, count=1) + self._active_lock.release() + asyncore.close_all() + + def handle_accept(self): + conn, addr = self.accept() + self.handler_instance = self.Handler(conn) + + def handle_connect(self): + self.close() + handle_read = handle_connect + + def writable(self): + return 0 + + def handle_error(self): + raise + + +@unittest.skipUnless(threading is not None, "test needs threading module") +@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") +class TestSendfile(unittest.TestCase): + + DATA = b"12345abcde" * 16 * 1024 # 160 KB + SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ + not sys.platform.startswith("solaris") and \ + not sys.platform.startswith("sunos") + + @classmethod + def setUpClass(cls): + with open(support.TESTFN, "wb") as f: + f.write(cls.DATA) + + @classmethod + def tearDownClass(cls): + support.unlink(support.TESTFN) + + def setUp(self): + self.server = SendfileTestServer((support.HOST, 0)) + self.server.start() + self.client = socket.socket() + self.client.connect((self.server.host, self.server.port)) + self.client.settimeout(1) + # synchronize by waiting for "220 ready" response + self.client.recv(1024) + self.sockno = self.client.fileno() + self.file = open(support.TESTFN, 'rb') + self.fileno = self.file.fileno() + + def tearDown(self): + self.file.close() + self.client.close() + if self.server.running: + self.server.stop() + + def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]): + """A higher level wrapper representing how an application is + supposed to use sendfile(). + """ + while 1: + try: + if self.SUPPORT_HEADERS_TRAILERS: + return os.sendfile(sock, file, offset, nbytes, headers, + trailers) + else: + return os.sendfile(sock, file, offset, nbytes) + except OSError as err: + if err.errno == errno.ECONNRESET: + # disconnected + raise + elif err.errno in (errno.EAGAIN, errno.EBUSY): + # we have to retry send data + continue + else: + raise + + def test_send_whole_file(self): + # normal send + total_sent = 0 + offset = 0 + nbytes = 4096 + while total_sent < len(self.DATA): + sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) + if sent == 0: + break + offset += sent + total_sent += sent + self.assertTrue(sent <= nbytes) + self.assertEqual(offset, total_sent) + + self.assertEqual(total_sent, len(self.DATA)) + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(len(data), len(self.DATA)) + self.assertEqual(data, self.DATA) + + def test_send_at_certain_offset(self): + # start sending a file at a certain offset + total_sent = 0 + offset = len(self.DATA) // 2 + must_send = len(self.DATA) - offset + nbytes = 4096 + while total_sent < must_send: + sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) + if sent == 0: + break + offset += sent + total_sent += sent + self.assertTrue(sent <= nbytes) + + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + expected = self.DATA[len(self.DATA) // 2:] + self.assertEqual(total_sent, len(expected)) + self.assertEqual(len(data), len(expected)) + self.assertEqual(data, expected) + + def test_offset_overflow(self): + # specify an offset > file size + offset = len(self.DATA) + 4096 + try: + sent = os.sendfile(self.sockno, self.fileno, offset, 4096) + except OSError as e: + # Solaris can raise EINVAL if offset >= file length, ignore. + if e.errno != errno.EINVAL: + raise + else: + self.assertEqual(sent, 0) + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(data, b'') + + def test_invalid_offset(self): + with self.assertRaises(OSError) as cm: + os.sendfile(self.sockno, self.fileno, -1, 4096) + self.assertEqual(cm.exception.errno, errno.EINVAL) + + # --- headers / trailers tests + + if SUPPORT_HEADERS_TRAILERS: + + def test_headers(self): + total_sent = 0 + sent = os.sendfile(self.sockno, self.fileno, 0, 4096, + headers=[b"x" * 512]) + total_sent += sent + offset = 4096 + nbytes = 4096 + while 1: + sent = self.sendfile_wrapper(self.sockno, self.fileno, + offset, nbytes) + if sent == 0: + break + total_sent += sent + offset += sent + + expected_data = b"x" * 512 + self.DATA + self.assertEqual(total_sent, len(expected_data)) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(hash(data), hash(expected_data)) + + def test_trailers(self): + TESTFN2 = support.TESTFN + "2" + f = open(TESTFN2, 'wb') + f.write(b"abcde") + f.close() + f = open(TESTFN2, 'rb') + try: + os.sendfile(self.sockno, f.fileno(), 0, 4096, trailers=[b"12345"]) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(data, b"abcde12345") + finally: + os.remove(TESTFN2) + + if hasattr(os, "SF_NODISKIO"): + def test_flags(self): + try: + os.sendfile(self.sockno, self.fileno, 0, 4096, + flags=os.SF_NODISKIO) + except OSError as err: + if err.errno not in (errno.EBUSY, errno.EAGAIN): + raise + + def test_main(): support.run_unittest( FileTests, @@ -1281,6 +1571,8 @@ def test_main(): PidTests, LoginTests, LinkTests, + TestSendfile, + ProgramPriorityTests, ) if __name__ == "__main__": diff --git a/Lib/test/test_osx_env.py b/Lib/test/test_osx_env.py index 8b3df37..24ec2b4 100644 --- a/Lib/test/test_osx_env.py +++ b/Lib/test/test_osx_env.py @@ -5,6 +5,7 @@ Test suite for OS X interpreter environment variables. from test.support import EnvironmentVarGuard, run_unittest import subprocess import sys +import sysconfig import unittest class OSXEnvironmentVariableTestCase(unittest.TestCase): @@ -27,8 +28,6 @@ class OSXEnvironmentVariableTestCase(unittest.TestCase): self._check_sys('PYTHONEXECUTABLE', '==', 'sys.executable') def test_main(): - from distutils import sysconfig - if sys.platform == 'darwin' and sysconfig.get_config_var('WITH_NEXT_FRAMEWORK'): run_unittest(OSXEnvironmentVariableTestCase) diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py index d861df5..c197aff 100644 --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -20,9 +20,12 @@ class PdbTestInput(object): def __enter__(self): self.real_stdin = sys.stdin sys.stdin = _FakeInput(self.input) + self.orig_trace = sys.gettrace() if hasattr(sys, 'gettrace') else None def __exit__(self, *exc): sys.stdin = self.real_stdin + if self.orig_trace: + sys.settrace(self.orig_trace) def test_pdb_displayhook(): diff --git a/Lib/test/test_pep3120.py b/Lib/test/test_pep3120.py index 09fedf0..496f8da 100644 --- a/Lib/test/test_pep3120.py +++ b/Lib/test/test_pep3120.py @@ -19,8 +19,8 @@ class PEP3120Test(unittest.TestCase): try: import test.badsyntax_pep3120 except SyntaxError as msg: - msg = str(msg) - self.assertTrue('UTF-8' in msg or 'utf8' in msg) + msg = str(msg).lower() + self.assertTrue('utf-8' in msg or 'utf8' in msg) else: self.fail("expected exception didn't occur") diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index 7dd7eef..2b58ebf 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -243,6 +243,34 @@ class PlatformTest(unittest.TestCase): ): self.assertEqual(platform._parse_release_file(input), output) + def test_popen(self): + mswindows = (sys.platform == "win32") + + if mswindows: + command = '"{}" -c "print(\'Hello\')"'.format(sys.executable) + else: + command = "'{}' -c 'print(\"Hello\")'".format(sys.executable) + with platform.popen(command) as stdout: + hello = stdout.read().strip() + stdout.close() + self.assertEqual(hello, "Hello") + + data = 'plop' + if mswindows: + command = '"{}" -c "import sys; data=sys.stdin.read(); exit(len(data))"' + else: + command = "'{}' -c 'import sys; data=sys.stdin.read(); exit(len(data))'" + command = command.format(sys.executable) + with platform.popen(command, 'w') as stdin: + stdout = stdin.write(data) + ret = stdin.close() + self.assertIsNotNone(ret) + if os.name == 'nt': + returncode = ret + else: + returncode = ret >> 8 + self.assertEqual(returncode, len(data)) + def test_main(): support.run_unittest( diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index 81af569..0a3adcc 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -108,6 +108,10 @@ class DummyPOP3Handler(asynchat.async_chat): def cmd_apop(self, arg): self.push('+OK done nothing.') + def cmd_quit(self, arg): + self.push('+OK closing.') + self.close_when_done() + class DummyPOP3Server(asyncore.dispatcher, threading.Thread): @@ -165,10 +169,10 @@ class TestPOP3Class(TestCase): def setUp(self): self.server = DummyPOP3Server((HOST, PORT)) self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port) + self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) def tearDown(self): - self.client.quit() + self.client.close() self.server.stop() def test_getwelcome(self): @@ -228,6 +232,12 @@ class TestPOP3Class(TestCase): self.client.uidl() self.client.uidl('foo') + def test_quit(self): + resp = self.client.quit() + self.assertTrue(resp) + self.assertIsNone(self.client.sock) + self.assertIsNone(self.client.file) + SUPPORTS_SSL = False if hasattr(poplib, 'POP3_SSL'): @@ -274,6 +284,7 @@ if hasattr(poplib, 'POP3_SSL'): else: DummyPOP3Handler.handle_read(self) + class TestPOP3_SSLClass(TestPOP3Class): # repeat previous tests by using poplib.POP3_SSL diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 45b3afc..23c2100 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -285,6 +285,18 @@ class PosixTester(unittest.TestCase): if hasattr(posix, 'listdir'): self.assertTrue(support.TESTFN in posix.listdir()) + @unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()") + def test_fdlistdir(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + self.assertEqual( + sorted(posix.listdir('.')), + sorted(posix.fdlistdir(f)) + ) + # Check the fd was closed by fdlistdir + with self.assertRaises(OSError) as ctx: + posix.close(f) + self.assertEqual(ctx.exception.errno, errno.EBADF) + def test_access(self): if hasattr(posix, 'access'): self.assertTrue(posix.access(support.TESTFN, os.R_OK)) @@ -389,6 +401,198 @@ class PosixTester(unittest.TestCase): set([int(x) for x in groups.split()]), set(posix.getgroups() + [posix.getegid()])) + # tests for the posix *at functions follow + + @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()") + def test_faccessat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()") + def test_fchmodat(self): + os.chmod(support.TESTFN, stat.S_IRUSR) + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + + s = posix.stat(support.TESTFN) + self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()") + def test_fchownat(self): + support.unlink(support.TESTFN) + open(support.TESTFN, 'w').close() + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid()) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()") + def test_fstatat(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + s1 = posix.stat(support.TESTFN) + s2 = posix.fstatat(f, support.TESTFN) + self.assertEqual(s1, s2) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()") + def test_futimesat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + now = time.time() + posix.futimesat(f, support.TESTFN, None) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None)) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None)) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now)) + posix.futimesat(f, support.TESTFN, (int(now), int(now))) + posix.futimesat(f, support.TESTFN, (now, now)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()") + def test_linkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link') + # should have same inodes + self.assertEqual(posix.stat(support.TESTFN)[1], + posix.stat(support.TESTFN + 'link')[1]) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()") + def test_mkdirat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkdirat(f, support.TESTFN + 'dir') + posix.stat(support.TESTFN + 'dir') # should not raise exception + finally: + posix.close(f) + support.rmtree(support.TESTFN + 'dir') + + @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'), + "don't have mknodat()/S_IFIFO") + def test_mknodat(self): + # Test using mknodat() to create a FIFO (the only use specified + # by POSIX). + support.unlink(support.TESTFN) + mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mknodat(f, support.TESTFN, mode, 0) + except OSError as e: + # Some old systems don't allow unprivileged users to use + # mknod(), or only support creating device nodes. + self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) + else: + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()") + def test_openat(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + a = posix.open(posix.getcwd(), posix.O_RDONLY) + b = posix.openat(a, support.TESTFN, posix.O_RDONLY) + try: + res = posix.read(b, 9).decode(encoding="utf-8") + self.assertEqual("testline\n", res) + finally: + posix.close(a) + posix.close(b) + + @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()") + def test_readlinkat(self): + os.symlink(support.TESTFN, support.TESTFN + 'link') + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertEqual(posix.readlink(support.TESTFN + 'link'), + posix.readlinkat(f, support.TESTFN + 'link')) + finally: + support.unlink(support.TESTFN + 'link') + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()") + def test_renameat(self): + support.unlink(support.TESTFN) + open(support.TESTFN + 'ren', 'w').close() + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN) + except: + posix.rename(support.TESTFN + 'ren', support.TESTFN) + raise + else: + posix.stat(support.TESTFN) # should not throw exception + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()") + def test_symlinkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link') + self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()") + def test_unlinkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + open(support.TESTFN + 'del', 'w').close() + posix.stat(support.TESTFN + 'del') # should not throw exception + try: + posix.unlinkat(f, support.TESTFN + 'del') + except: + support.unlink(support.TESTFN + 'del') + raise + else: + self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()") + def test_utimensat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + now = time.time() + posix.utimensat(f, support.TESTFN, None, None) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None)) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0)) + posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)), + (int(now), int((now - int(now)) * 1e9))) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()") + def test_mkfifoat(self): + support.unlink(support.TESTFN) + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + class PosixGroupsTester(unittest.TestCase): def setUp(self): diff --git a/Lib/test/test_pulldom.py b/Lib/test/test_pulldom.py new file mode 100644 index 0000000..4171526 --- /dev/null +++ b/Lib/test/test_pulldom.py @@ -0,0 +1,345 @@ +import io +import unittest +import sys +import xml.sax + +from xml.sax.xmlreader import AttributesImpl +from xml.dom import pulldom + +from test.support import run_unittest, findfile + + +tstfile = findfile("test.xml", subdir="xmltestdata") + +# A handy XML snippet, containing attributes, a namespace prefix, and a +# self-closing tag: +SMALL_SAMPLE = """<?xml version="1.0"?> +<html xmlns="http://www.w3.org/1999/xhtml" xmlns:xdc="http://www.xml.com/books"> +<!-- A comment --> +<title>Introduction to XSL</title> +<hr/> +<p><xdc:author xdc:attrib="prefixed attribute" attrib="other attrib">A. Namespace</xdc:author></p> +</html>""" + + +class PullDOMTestCase(unittest.TestCase): + + def test_parse(self): + """Minimal test of DOMEventStream.parse()""" + + # This just tests that parsing from a stream works. Actual parser + # semantics are tested using parseString with a more focused XML + # fragment. + + # Test with a filename: + list(pulldom.parse(tstfile)) + + # Test with a file object: + with open(tstfile, "rb") as fin: + list(pulldom.parse(fin)) + + def test_parse_semantics(self): + """Test DOMEventStream parsing semantics.""" + + items = pulldom.parseString(SMALL_SAMPLE) + evt, node = next(items) + # Just check the node is a Document: + self.assertTrue(hasattr(node, "createElement")) + self.assertEqual(pulldom.START_DOCUMENT, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("html", node.tagName) + self.assertEqual(2, len(node.attributes)) + self.assertEqual(node.attributes.getNamedItem("xmlns:xdc").value, + "http://www.xml.com/books") + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) # Line break + evt, node = next(items) + # XXX - A comment should be reported here! + # self.assertEqual(pulldom.COMMENT, evt) + # Line break after swallowed comment: + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual("title", node.tagName) + title_node = node + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + self.assertEqual("Introduction to XSL", node.data) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("title", node.tagName) + self.assertTrue(title_node is node) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("hr", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("hr", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("p", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("xdc:author", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("xdc:author", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + # XXX No END_DOCUMENT item is ever obtained: + #evt, node = next(items) + #self.assertEqual(pulldom.END_DOCUMENT, evt) + + def test_expandItem(self): + """Ensure expandItem works as expected.""" + items = pulldom.parseString(SMALL_SAMPLE) + # Loop through the nodes until we get to a "title" start tag: + for evt, item in items: + if evt == pulldom.START_ELEMENT and item.tagName == "title": + items.expandNode(item) + self.assertEqual(1, len(item.childNodes)) + break + else: + self.fail("No \"title\" element detected in SMALL_SAMPLE!") + # Loop until we get to the next start-element: + for evt, node in items: + if evt == pulldom.START_ELEMENT: + break + self.assertEqual("hr", node.tagName, + "expandNode did not leave DOMEventStream in the correct state.") + # Attempt to expand a standalone element: + items.expandNode(node) + self.assertEqual(next(items)[0], pulldom.CHARACTERS) + evt, node = next(items) + self.assertEqual(node.tagName, "p") + items.expandNode(node) + next(items) # Skip character data + evt, node = next(items) + self.assertEqual(node.tagName, "html") + with self.assertRaises(StopIteration): + next(items) + items.clear() + self.assertIsNone(items.parser) + self.assertIsNone(items.stream) + + @unittest.expectedFailure + def test_comment(self): + """PullDOM does not receive "comment" events.""" + items = pulldom.parseString(SMALL_SAMPLE) + for evt, _ in items: + if evt == pulldom.COMMENT: + break + else: + self.fail("No comment was encountered") + + @unittest.expectedFailure + def test_end_document(self): + """PullDOM does not receive "end-document" events.""" + items = pulldom.parseString(SMALL_SAMPLE) + # Read all of the nodes up to and including </html>: + for evt, node in items: + if evt == pulldom.END_ELEMENT and node.tagName == "html": + break + try: + # Assert that the next node is END_DOCUMENT: + evt, node = next(items) + self.assertEqual(pulldom.END_DOCUMENT, evt) + except StopIteration: + self.fail( + "Ran out of events, but should have received END_DOCUMENT") + + +class ThoroughTestCase(unittest.TestCase): + """Test the hard-to-reach parts of pulldom.""" + + def test_thorough_parse(self): + """Test some of the hard-to-reach parts of PullDOM.""" + self._test_thorough(pulldom.parse(None, parser=SAXExerciser())) + + @unittest.expectedFailure + def test_sax2dom_fail(self): + """SAX2DOM can"t handle a PI before the root element.""" + pd = SAX2DOMTestHelper(None, SAXExerciser(), 12) + self._test_thorough(pd) + + def test_thorough_sax2dom(self): + """Test some of the hard-to-reach parts of SAX2DOM.""" + pd = SAX2DOMTestHelper(None, SAX2DOMExerciser(), 12) + self._test_thorough(pd, False) + + def _test_thorough(self, pd, before_root=True): + """Test some of the hard-to-reach parts of the parser, using a mock + parser.""" + + evt, node = next(pd) + self.assertEqual(pulldom.START_DOCUMENT, evt) + # Just check the node is a Document: + self.assertTrue(hasattr(node, "createElement")) + + if before_root: + evt, node = next(pd) + self.assertEqual(pulldom.COMMENT, evt) + self.assertEqual("a comment", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.PROCESSING_INSTRUCTION, evt) + self.assertEqual("target", node.target) + self.assertEqual("data", node.data) + + evt, node = next(pd) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("html", node.tagName) + + evt, node = next(pd) + self.assertEqual(pulldom.COMMENT, evt) + self.assertEqual("a comment", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.PROCESSING_INSTRUCTION, evt) + self.assertEqual("target", node.target) + self.assertEqual("data", node.data) + + evt, node = next(pd) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("p", node.tagName) + + evt, node = next(pd) + self.assertEqual(pulldom.CHARACTERS, evt) + self.assertEqual("text", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("p", node.tagName) + evt, node = next(pd) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("html", node.tagName) + evt, node = next(pd) + self.assertEqual(pulldom.END_DOCUMENT, evt) + + +class SAXExerciser(object): + """A fake sax parser that calls some of the harder-to-reach sax methods to + ensure it emits the correct events""" + + def setContentHandler(self, handler): + self._handler = handler + + def parse(self, _): + h = self._handler + h.startDocument() + + # The next two items ensure that items preceding the first + # start_element are properly stored and emitted: + h.comment("a comment") + h.processingInstruction("target", "data") + + h.startElement("html", AttributesImpl({})) + + h.comment("a comment") + h.processingInstruction("target", "data") + + h.startElement("p", AttributesImpl({"class": "paraclass"})) + h.characters("text") + h.endElement("p") + h.endElement("html") + h.endDocument() + + def stub(self, *args, **kwargs): + """Stub method. Does nothing.""" + pass + setProperty = stub + setFeature = stub + + +class SAX2DOMExerciser(SAXExerciser): + """The same as SAXExerciser, but without the processing instruction and + comment before the root element, because S2D can"t handle it""" + + def parse(self, _): + h = self._handler + h.startDocument() + h.startElement("html", AttributesImpl({})) + h.comment("a comment") + h.processingInstruction("target", "data") + h.startElement("p", AttributesImpl({"class": "paraclass"})) + h.characters("text") + h.endElement("p") + h.endElement("html") + h.endDocument() + + +class SAX2DOMTestHelper(pulldom.DOMEventStream): + """Allows us to drive SAX2DOM from a DOMEventStream.""" + + def reset(self): + self.pulldom = pulldom.SAX2DOM() + # This content handler relies on namespace support + self.parser.setFeature(xml.sax.handler.feature_namespaces, 1) + self.parser.setContentHandler(self.pulldom) + + +class SAX2DOMTestCase(unittest.TestCase): + + def confirm(self, test, testname="Test"): + self.assertTrue(test, testname) + + def test_basic(self): + """Ensure SAX2DOM can parse from a stream.""" + with io.StringIO(SMALL_SAMPLE) as fin: + sd = SAX2DOMTestHelper(fin, xml.sax.make_parser(), + len(SMALL_SAMPLE)) + for evt, node in sd: + if evt == pulldom.START_ELEMENT and node.tagName == "html": + break + # Because the buffer is the same length as the XML, all the + # nodes should have been parsed and added: + self.assertGreater(len(node.childNodes), 0) + + def testSAX2DOM(self): + """Ensure SAX2DOM expands nodes as expected.""" + sax2dom = pulldom.SAX2DOM() + sax2dom.startDocument() + sax2dom.startElement("doc", {}) + sax2dom.characters("text") + sax2dom.startElement("subelm", {}) + sax2dom.characters("text") + sax2dom.endElement("subelm") + sax2dom.characters("text") + sax2dom.endElement("doc") + sax2dom.endDocument() + + doc = sax2dom.document + root = doc.documentElement + (text1, elm1, text2) = root.childNodes + text3 = elm1.childNodes[0] + + self.assertIsNone(text1.previousSibling) + self.assertIs(text1.nextSibling, elm1) + self.assertIs(elm1.previousSibling, text1) + self.assertIs(elm1.nextSibling, text2) + self.assertIs(text2.previousSibling, elm1) + self.assertIsNone(text2.nextSibling) + self.assertIsNone(text3.previousSibling) + self.assertIsNone(text3.nextSibling) + + self.assertIs(root.parentNode, doc) + self.assertIs(text1.parentNode, root) + self.assertIs(elm1.parentNode, root) + self.assertIs(text2.parentNode, root) + self.assertIs(text3.parentNode, elm1) + doc.unlink() + + +def test_main(): + run_unittest(PullDOMTestCase, ThoroughTestCase, SAX2DOMTestCase) + + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index 1d575cd..fc4146c 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -248,6 +248,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_html_doc(self): result, doc_loc = get_pydoc_html(pydoc_mod) mod_file = inspect.getabsfile(pydoc_mod) @@ -263,6 +265,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_text_doc(self): result, doc_loc = get_pydoc_text(pydoc_mod) expected_text = expected_text_pattern % \ @@ -340,6 +344,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, 'Docstrings are omitted with -O2 and above') + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_help_output_redirect(self): # issue 940286, if output is set in Helper, then all output from # Helper.help should be redirected diff --git a/Lib/test/test_reprlib.py b/Lib/test/test_reprlib.py index b0dc4d7..e476941 100644 --- a/Lib/test/test_reprlib.py +++ b/Lib/test/test_reprlib.py @@ -234,7 +234,7 @@ class LongReprTest(unittest.TestCase): touch(os.path.join(self.subpkgname, self.pkgname + '.py')) from areallylongpackageandmodulenametotestreprtruncation.areallylongpackageandmodulenametotestreprtruncation import areallylongpackageandmodulenametotestreprtruncation eq(repr(areallylongpackageandmodulenametotestreprtruncation), - "<module '%s' from '%s'>" % (areallylongpackageandmodulenametotestreprtruncation.__name__, areallylongpackageandmodulenametotestreprtruncation.__file__)) + "<module %r from %r>" % (areallylongpackageandmodulenametotestreprtruncation.__name__, areallylongpackageandmodulenametotestreprtruncation.__file__)) eq(repr(sys), "<module 'sys' (built-in)>") def test_type(self): diff --git a/Lib/test/test_richcmp.py b/Lib/test/test_richcmp.py index f8f3717..0b629dc 100644 --- a/Lib/test/test_richcmp.py +++ b/Lib/test/test_richcmp.py @@ -220,6 +220,7 @@ class MiscTest(unittest.TestCase): for func in (do, operator.not_): self.assertRaises(Exc, func, Bad()) + @support.no_tracing def test_recursion(self): # Check that comparison for recursive objects fails gracefully from collections import UserList diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py index ad3ab39..00f34b1 100644 --- a/Lib/test/test_runpy.py +++ b/Lib/test/test_runpy.py @@ -6,7 +6,8 @@ import sys import re import tempfile import py_compile -from test.support import forget, make_legacy_pyc, run_unittest, unload, verbose +from test.support import ( + forget, make_legacy_pyc, run_unittest, unload, verbose, no_tracing) from test.script_helper import ( make_pkg, make_script, make_zip_pkg, make_zip_script, temp_dir) @@ -395,6 +396,7 @@ argv0 = sys.argv[0] msg = "can't find '__main__' module in %r" % zip_name self._check_import_error(zip_name, msg) + @no_tracing def test_main_recursion_error(self): with temp_dir() as script_dir, temp_dir() as dummy_dir: mod_name = '__main__' diff --git a/Lib/test/test_sax.py b/Lib/test/test_sax.py index 0f6a1ca..bddb375 100644 --- a/Lib/test/test_sax.py +++ b/Lib/test/test_sax.py @@ -20,8 +20,8 @@ import unittest TEST_XMLFILE = findfile("test.xml", subdir="xmltestdata") TEST_XMLFILE_OUT = findfile("test.xml.out", subdir="xmltestdata") try: - TEST_XMLFILE.encode("utf8") - TEST_XMLFILE_OUT.encode("utf8") + TEST_XMLFILE.encode("utf-8") + TEST_XMLFILE_OUT.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filename is not encodable to utf8") diff --git a/Lib/test/test_scope.py b/Lib/test/test_scope.py index fbc87aa..129a18a 100644 --- a/Lib/test/test_scope.py +++ b/Lib/test/test_scope.py @@ -1,5 +1,5 @@ import unittest -from test.support import check_syntax_error, run_unittest +from test.support import check_syntax_error, cpython_only, run_unittest class ScopeTests(unittest.TestCase): @@ -496,23 +496,22 @@ class ScopeTests(unittest.TestCase): self.assertNotIn("x", varnames) self.assertIn("y", varnames) + @cpython_only def testLocalsClass_WithTrace(self): # Issue23728: after the trace function returns, the locals() # dictionary is used to update all variables, this used to # include free variables. But in class statements, free # variables are not inserted... import sys + self.addCleanup(sys.settrace, sys.gettrace()) sys.settrace(lambda a,b,c:None) - try: - x = 12 + x = 12 - class C: - def f(self): - return x + class C: + def f(self): + return x - self.assertEqual(x, 12) # Used to raise UnboundLocalError - finally: - sys.settrace(None) + self.assertEqual(x, 12) # Used to raise UnboundLocalError def testBoundAndFree(self): # var is bound and free in class @@ -527,6 +526,7 @@ class ScopeTests(unittest.TestCase): inst = f(3)() self.assertEqual(inst.a, inst.m()) + @cpython_only def testInteractionWithTraceFunc(self): import sys @@ -543,6 +543,7 @@ class ScopeTests(unittest.TestCase): class TestClass: pass + self.addCleanup(sys.settrace, sys.gettrace()) sys.settrace(tracer) adaptgetter("foo", TestClass, (1, "")) sys.settrace(None) diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py index 3e73f52..13c1265 100644 --- a/Lib/test/test_shelve.py +++ b/Lib/test/test_shelve.py @@ -2,7 +2,7 @@ import unittest import shelve import glob from test import support -from collections import MutableMapping +from collections.abc import MutableMapping from test.test_dbm import dbm_iterator def L1(s): @@ -129,8 +129,8 @@ class TestCase(unittest.TestCase): shelve.Shelf(d)[key] = [1] self.assertIn(key.encode('utf-8'), d) # but a different one can be given - shelve.Shelf(d, keyencoding='latin1')[key] = [1] - self.assertIn(key.encode('latin1'), d) + shelve.Shelf(d, keyencoding='latin-1')[key] = [1] + self.assertIn(key.encode('latin-1'), d) # with all consequences s = shelve.Shelf(d, keyencoding='ascii') self.assertRaises(UnicodeEncodeError, s.__setitem__, key, [1]) diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 9ba391e..d761a73 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -44,7 +44,7 @@ def linux_version(): return 0, 0, 0 HOST = support.HOST -MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf8') ## test unicode string and carriage return +MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) try: @@ -325,6 +325,26 @@ class GeneralModuleTests(unittest.TestCase): if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) + @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") + @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") + def test_sethostname(self): + oldhn = socket.gethostname() + try: + socket.sethostname('new') + except socket.error as e: + if e.errno == errno.EPERM: + self.skipTest("test should be run as root") + else: + raise + try: + # running test as root! + self.assertEqual(socket.gethostname(), 'new') + # Should work with bytes objects too + socket.sethostname(b'bar') + self.assertEqual(socket.gethostname(), 'bar') + finally: + socket.sethostname(oldhn) + def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo if hasattr(sys, "getrefcount"): @@ -1065,7 +1085,7 @@ class FileObjectClassTestCase(SocketConnectedTest): """ bufsize = -1 # Use default buffer size - encoding = 'utf8' + encoding = 'utf-8' errors = 'strict' newline = None @@ -1286,7 +1306,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase): data = b'' else: data = '' - expecting = expecting.decode('utf8') + expecting = expecting.decode('utf-8') while len(data) != len(expecting): part = fo.read(size) if not part: @@ -1444,7 +1464,7 @@ class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): """Tests for socket.makefile() in text mode (rather than binary)""" read_mode = 'r' - read_msg = MSG.decode('utf8') + read_msg = MSG.decode('utf-8') write_mode = 'wb' write_msg = MSG newline = '' @@ -1456,7 +1476,7 @@ class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): read_mode = 'rb' read_msg = MSG write_mode = 'w' - write_msg = MSG.decode('utf8') + write_msg = MSG.decode('utf-8') newline = '' @@ -1464,9 +1484,9 @@ class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): """Tests for socket.makefile() in text mode (rather than binary)""" read_mode = 'r' - read_msg = MSG.decode('utf8') + read_msg = MSG.decode('utf-8') write_mode = 'w' - write_msg = MSG.decode('utf8') + write_msg = MSG.decode('utf-8') newline = '' diff --git a/Lib/test/test_strlit.py b/Lib/test/test_strlit.py index 23d96f8..2bcf4d1 100644 --- a/Lib/test/test_strlit.py +++ b/Lib/test/test_strlit.py @@ -130,7 +130,7 @@ class TestLiterals(unittest.TestCase): self.assertRaises(SyntaxError, self.check_encoding, "utf-8", extra) def test_file_utf8(self): - self.check_encoding("utf8") + self.check_encoding("utf-8") def test_file_iso_8859_1(self): self.check_encoding("iso-8859-1") diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 4b58308..d91a4a6 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1059,6 +1059,11 @@ class POSIXProcessTestCase(BaseTestCase): exitcode = subprocess.call([abs_program, "-c", "pass"]) self.assertEqual(exitcode, 0) + # absolute bytes path as a string + cmd = b"'" + abs_program + b"' -c pass" + exitcode = subprocess.call(cmd, shell=True) + self.assertEqual(exitcode, 0) + # bytes program, unicode PATH env = os.environ.copy() env["PATH"] = path @@ -1191,7 +1196,7 @@ class POSIXProcessTestCase(BaseTestCase): stdout, stderr = p.communicate() self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" " non-zero with this error:\n%s" % - stderr.decode('utf8')) + stderr.decode('utf-8')) @unittest.skipUnless(mswindows, "Windows specific tests") diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index d37651e..e2137ab 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -303,6 +303,7 @@ class SysModuleTest(unittest.TestCase): self.assertEqual(sys.getdlopenflags(), oldflags+1) sys.setdlopenflags(oldflags) + @test.support.refcount_test def test_refcount(self): # n here must be a global in order for this test to pass while # tracing with a python function. Tracing calls PyFrame_FastToLocals diff --git a/Lib/test/test_sys_settrace.py b/Lib/test/test_sys_settrace.py index 43134e9..acb8e29 100644 --- a/Lib/test/test_sys_settrace.py +++ b/Lib/test/test_sys_settrace.py @@ -251,6 +251,7 @@ class TraceTestCase(unittest.TestCase): def setUp(self): self.using_gc = gc.isenabled() gc.disable() + self.addCleanup(sys.settrace, sys.gettrace()) def tearDown(self): if self.using_gc: @@ -389,6 +390,9 @@ class TraceTestCase(unittest.TestCase): class RaisingTraceFuncTestCase(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + def trace(self, frame, event, arg): """A trace function that raises an exception in response to a specific trace event.""" @@ -688,6 +692,10 @@ def no_jump_without_trace_function(): class JumpTestCase(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + sys.settrace(None) + def compare_jump_output(self, expected, received): if received != expected: self.fail( "Outputs don't match:\n" + @@ -739,6 +747,8 @@ class JumpTestCase(unittest.TestCase): def test_18_no_jump_to_non_integers(self): self.run_test(no_jump_to_non_integers) def test_19_no_jump_without_trace_function(self): + # Must set sys.settrace(None) in setUp(), else condition is not + # triggered. no_jump_without_trace_function() def test_20_large_function(self): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 68e094d..a645bf2 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -1289,7 +1289,7 @@ class UstarUnicodeTest(unittest.TestCase): self._test_unicode_filename("utf7") def test_utf8_filename(self): - self._test_unicode_filename("utf8") + self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") @@ -1368,7 +1368,7 @@ class GNUUnicodeTest(UstarUnicodeTest): def test_bad_pax_header(self): # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # without a hdrcharset=BINARY header. - for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: @@ -1383,7 +1383,7 @@ class PAXUnicodeTest(UstarUnicodeTest): def test_binary_header(self): # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. - for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index ecbbdbf..029218d 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -427,6 +427,14 @@ class ThreadTests(BaseTestCase): t.daemon = True self.assertTrue('daemon' in repr(t)) + def test_deamon_param(self): + t = threading.Thread() + self.assertFalse(t.daemon) + t = threading.Thread(daemon=False) + self.assertFalse(t.daemon) + t = threading.Thread(daemon=True) + self.assertTrue(t.daemon) + class ThreadJoinOnShutdown(BaseTestCase): @@ -677,6 +685,10 @@ class ThreadingExceptionTests(BaseTestCase): thread.start() self.assertRaises(RuntimeError, setattr, thread, "daemon", True) + def test_releasing_unacquired_lock(self): + lock = threading.Lock() + self.assertRaises(RuntimeError, lock.release) + class LockTests(lock_tests.LockTests): locktype = staticmethod(threading.Lock) diff --git a/Lib/test/test_trace.py b/Lib/test/test_trace.py index 93df84f..d2d15f2 100644 --- a/Lib/test/test_trace.py +++ b/Lib/test/test_trace.py @@ -102,6 +102,7 @@ class TracedClass(object): class TestLineCounts(unittest.TestCase): """White-box testing of line-counting, via runfunc""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=1, trace=0, countfuncs=0, countcallers=0) self.my_py_filename = fix_ext_py(__file__) @@ -192,6 +193,7 @@ class TestRunExecCounts(unittest.TestCase): """A simple sanity test of line-counting, via runctx (exec)""" def setUp(self): self.my_py_filename = fix_ext_py(__file__) + self.addCleanup(sys.settrace, sys.gettrace()) def test_exec_counts(self): self.tracer = Trace(count=1, trace=0, countfuncs=0, countcallers=0) @@ -218,6 +220,7 @@ class TestRunExecCounts(unittest.TestCase): class TestFuncs(unittest.TestCase): """White-box testing of funcs tracing""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=0, trace=0, countfuncs=1) self.filemod = my_file_and_modname() @@ -242,6 +245,8 @@ class TestFuncs(unittest.TestCase): } self.assertEqual(self.tracer.results().calledfuncs, expected) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'pre-existing trace function throws off measurements') def test_inst_method_calling(self): obj = TracedClass(20) self.tracer.runfunc(obj.inst_method_calling, 1) @@ -257,9 +262,12 @@ class TestFuncs(unittest.TestCase): class TestCallers(unittest.TestCase): """White-box testing of callers tracing""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=0, trace=0, countcallers=1) self.filemod = my_file_and_modname() + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'pre-existing trace function throws off measurements') def test_loop_caller_importing(self): self.tracer.runfunc(traced_func_importing_caller, 1) @@ -280,6 +288,9 @@ class TestCallers(unittest.TestCase): # Created separately for issue #3821 class TestCoverage(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + def tearDown(self): rmtree(TESTFN) unlink(TESTFN) diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index 65b26c5..5fc30db 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -1182,11 +1182,14 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('hello'.encode('ascii'), b'hello') self.assertEqual('hello'.encode('utf-7'), b'hello') self.assertEqual('hello'.encode('utf-8'), b'hello') - self.assertEqual('hello'.encode('utf8'), b'hello') + self.assertEqual('hello'.encode('utf-8'), b'hello') self.assertEqual('hello'.encode('utf-16-le'), b'h\000e\000l\000l\000o\000') self.assertEqual('hello'.encode('utf-16-be'), b'\000h\000e\000l\000l\000o') self.assertEqual('hello'.encode('latin-1'), b'hello') + # Default encoding is utf-8 + self.assertEqual('\u2603'.encode(), b'\xe2\x98\x83') + # Roundtrip safety for BMP (just the first 1024 chars) for c in range(1024): u = chr(c) @@ -1427,7 +1430,9 @@ class UnicodeTest(string_tests.CommonTest, # Test PyUnicode_FromFormat() def test_from_format(self): support.import_module('ctypes') - from ctypes import pythonapi, py_object, c_int + from ctypes import (pythonapi, py_object, + c_int, c_long, c_longlong, c_ssize_t, + c_uint, c_ulong, c_ulonglong, c_size_t) if sys.maxunicode == 65535: name = "PyUnicodeUCS2_FromFormat" else: @@ -1452,13 +1457,40 @@ class UnicodeTest(string_tests.CommonTest, 'string, got a non-ASCII byte: 0xe9$', PyUnicode_FromFormat, b'unicode\xe9=%s', 'ascii') + # test "%c" self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0xabcd)), '\uabcd') self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0x10ffff)), '\U0010ffff') - # other tests + # test "%" + self.assertEqual(PyUnicode_FromFormat(b'%'), '%') + self.assertEqual(PyUnicode_FromFormat(b'%%'), '%') + self.assertEqual(PyUnicode_FromFormat(b'%%s'), '%s') + self.assertEqual(PyUnicode_FromFormat(b'[%%]'), '[%]') + self.assertEqual(PyUnicode_FromFormat(b'%%%s', b'abc'), '%abc') + + # test integer formats (%i, %d, %u) + self.assertEqual(PyUnicode_FromFormat(b'%03i', c_int(10)), '010') + self.assertEqual(PyUnicode_FromFormat(b'%0.4i', c_int(10)), '0010') + self.assertEqual(PyUnicode_FromFormat(b'%i', c_int(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%li', c_long(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%lli', c_longlong(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%zi', c_ssize_t(-123)), '-123') + + self.assertEqual(PyUnicode_FromFormat(b'%d', c_int(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%ld', c_long(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%zd', c_ssize_t(-123)), '-123') + + self.assertEqual(PyUnicode_FromFormat(b'%u', c_uint(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%lu', c_ulong(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%zu', c_size_t(123)), '123') + + # test %A text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff') self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'") + # test %V text = PyUnicode_FromFormat(b'repr=%V', 'abc', b'xyz') self.assertEqual(text, 'repr=abc') @@ -1472,6 +1504,13 @@ class UnicodeTest(string_tests.CommonTest, text = PyUnicode_FromFormat(b'repr=%V', None, b'abc\xff') self.assertEqual(text, 'repr=abc\ufffd') + # not supported: copy the raw format string. these tests are just here + # to check for crashs and should not be considered as specifications + self.assertEqual(PyUnicode_FromFormat(b'%1%s', b'abc'), '%s') + self.assertEqual(PyUnicode_FromFormat(b'%1abc'), '%1abc') + self.assertEqual(PyUnicode_FromFormat(b'%+i', c_int(10)), '%+i') + self.assertEqual(PyUnicode_FromFormat(b'%.%s', b'abc'), '%.%s') + # Test PyUnicode_AsWideChar() def test_aswidechar(self): from _testcapi import unicode_aswidechar diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 3003331..e148c62 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -251,7 +251,7 @@ class urlretrieve_FileTests(unittest.TestCase): def constructLocalFileUrl(self, filePath): filePath = os.path.abspath(filePath) try: - filePath.encode("utf8") + filePath.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filePath is not encodable to utf8") return "file://%s" % urllib.request.pathname2url(filePath) diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 1433670..4ddbe3f 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -599,7 +599,7 @@ class OpenerDirectorTests(unittest.TestCase): def sanepathname2url(path): try: - path.encode("utf8") + path.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("path is not encodable to utf8") urlpath = urllib.request.pathname2url(path) diff --git a/Lib/test/test_uuid.py b/Lib/test/test_uuid.py index 43fa656..7bc59ed 100644 --- a/Lib/test/test_uuid.py +++ b/Lib/test/test_uuid.py @@ -471,14 +471,14 @@ class TestUUID(TestCase): if pid == 0: os.close(fds[0]) value = uuid.uuid4() - os.write(fds[1], value.hex.encode('latin1')) + os.write(fds[1], value.hex.encode('latin-1')) os._exit(0) else: os.close(fds[1]) parent_value = uuid.uuid4().hex os.waitpid(pid, 0) - child_value = os.read(fds[0], 100).decode('latin1') + child_value = os.read(fds[0], 100).decode('latin-1') self.assertNotEqual(parent_value, child_value) diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py index 22fafa9..40c2291 100644 --- a/Lib/test/test_xml_etree.py +++ b/Lib/test/test_xml_etree.py @@ -22,7 +22,7 @@ from xml.etree import ElementTree as ET SIMPLE_XMLFILE = findfile("simple.xml", subdir="xmltestdata") try: - SIMPLE_XMLFILE.encode("utf8") + SIMPLE_XMLFILE.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filename is not encodable to utf8") SIMPLE_NS_XMLFILE = findfile("simple-ns.xml", subdir="xmltestdata") @@ -1255,8 +1255,8 @@ def processinginstruction(): >>> ET.tostring(ET.PI('test', '<testing&>')) b'<?test <testing&>?>' - >>> ET.tostring(ET.PI('test', '<testing&>\xe3'), 'latin1') - b"<?xml version='1.0' encoding='latin1'?>\\n<?test <testing&>\\xe3?>" + >>> ET.tostring(ET.PI('test', '<testing&>\xe3'), 'latin-1') + b"<?xml version='1.0' encoding='latin-1'?>\\n<?test <testing&>\\xe3?>" """ # diff --git a/Lib/test/test_xmlrpc_net.py b/Lib/test/test_xmlrpc_net.py index 5df79f0..b9853ed 100644 --- a/Lib/test/test_xmlrpc_net.py +++ b/Lib/test/test_xmlrpc_net.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -import collections +import collections.abc import errno import socket import sys @@ -48,7 +48,7 @@ class CurrentTimeTest(unittest.TestCase): # Perform a minimal sanity check on the result, just to be sure # the request means what we think it means. - self.assertIsInstance(builders, collections.Sequence) + self.assertIsInstance(builders, collections.abc.Sequence) self.assertTrue([x for x in builders if "3.x" in x], builders) diff --git a/Lib/test/test_zipimport_support.py b/Lib/test/test_zipimport_support.py index 610dfa0..e3dd3e9 100644 --- a/Lib/test/test_zipimport_support.py +++ b/Lib/test/test_zipimport_support.py @@ -163,20 +163,24 @@ class ZipSupportTests(unittest.TestCase): test_zipped_doctest.test_DocTestRunner.verbose_flag, test_zipped_doctest.test_Example, test_zipped_doctest.test_debug, - test_zipped_doctest.test_pdb_set_trace, - test_zipped_doctest.test_pdb_set_trace_nested, test_zipped_doctest.test_testsource, test_zipped_doctest.test_trailing_space_in_test, test_zipped_doctest.test_DocTestSuite, test_zipped_doctest.test_DocTestFinder, ] - # These remaining tests are the ones which need access + # These tests are the ones which need access # to the data files, so we don't run them fail_due_to_missing_data_files = [ test_zipped_doctest.test_DocFileSuite, test_zipped_doctest.test_testfile, test_zipped_doctest.test_unittest_reportflags, ] + # These tests are skipped when a trace funciton is set + can_fail_due_to_tracing = [ + test_zipped_doctest.test_pdb_set_trace, + test_zipped_doctest.test_pdb_set_trace_nested, + ] + for obj in known_good_tests: _run_object_doctest(obj, test_zipped_doctest) finally: diff --git a/Lib/threading.py b/Lib/threading.py index fd93f74..cb09afa 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -622,7 +622,7 @@ class Thread(_Verbose): #XXX __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, - args=(), kwargs=None, verbose=None): + args=(), kwargs=None, verbose=None, *, daemon=None): assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) if kwargs is None: @@ -631,7 +631,10 @@ class Thread(_Verbose): self._name = str(name or _newname()) self._args = args self._kwargs = kwargs - self._daemonic = self._set_daemon() + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = current_thread().daemon self._ident = None self._started = Event() self._stopped = False @@ -648,10 +651,6 @@ class Thread(_Verbose): self._block.__init__() self._started._reset_internal_locks() - def _set_daemon(self): - # Overridden in _MainThread and _DummyThread - return current_thread().daemon - def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" @@ -948,15 +947,12 @@ class _Timer(Thread): class _MainThread(Thread): def __init__(self): - Thread.__init__(self, name="MainThread") + Thread.__init__(self, name="MainThread", daemon=False) self._started.set() self._set_ident() with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return False - def _exitfunc(self): self._stop() t = _pickSomeNonDaemonThread() @@ -988,7 +984,7 @@ def _pickSomeNonDaemonThread(): class _DummyThread(Thread): def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d")) + Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) # Thread._block consumes an OS-level locking primitive, which # can never be used by a _DummyThread. Since a _DummyThread @@ -1000,9 +996,6 @@ class _DummyThread(Thread): with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return True - def join(self, timeout=None): assert False, "cannot join a dummy thread" diff --git a/Lib/unittest/case.py b/Lib/unittest/case.py index 65af16b..270e5e8 100644 --- a/Lib/unittest/case.py +++ b/Lib/unittest/case.py @@ -938,77 +938,6 @@ class TestCase(object): standardMsg = self._truncateMessage(standardMsg, diff) self.fail(self._formatMessage(msg, standardMsg)) - def assertDictContainsSubset(self, subset, dictionary, msg=None): - """Checks whether dictionary is a superset of subset.""" - warnings.warn('assertDictContainsSubset is deprecated', - DeprecationWarning) - missing = [] - mismatched = [] - for key, value in subset.items(): - if key not in dictionary: - missing.append(key) - elif value != dictionary[key]: - mismatched.append('%s, expected: %s, actual: %s' % - (safe_repr(key), safe_repr(value), - safe_repr(dictionary[key]))) - - if not (missing or mismatched): - return - - standardMsg = '' - if missing: - standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in - missing) - if mismatched: - if standardMsg: - standardMsg += '; ' - standardMsg += 'Mismatched values: %s' % ','.join(mismatched) - - self.fail(self._formatMessage(msg, standardMsg)) - - def assertSameElements(self, expected_seq, actual_seq, msg=None): - """An unordered sequence specific comparison. - - Raises with an error message listing which elements of expected_seq - are missing from actual_seq and vice versa if any. - - Duplicate elements are ignored when comparing *expected_seq* and - *actual_seq*. It is the equivalent of ``assertEqual(set(expected), - set(actual))`` but it works with sequences of unhashable objects as - well. - """ - warnings.warn('assertSameElements is deprecated', - DeprecationWarning) - try: - expected = set(expected_seq) - actual = set(actual_seq) - missing = sorted(expected.difference(actual)) - unexpected = sorted(actual.difference(expected)) - except TypeError: - # Fall back to slower list-compare if any of the objects are - # not hashable. - expected = list(expected_seq) - actual = list(actual_seq) - try: - expected.sort() - actual.sort() - except TypeError: - missing, unexpected = unorderable_list_difference(expected, - actual) - else: - missing, unexpected = sorted_list_difference(expected, actual) - errors = [] - if missing: - errors.append('Expected, but missing:\n %s' % - safe_repr(missing)) - if unexpected: - errors.append('Unexpected, but present:\n %s' % - safe_repr(unexpected)) - if errors: - standardMsg = '\n'.join(errors) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertCountEqual(self, first, second, msg=None): """An unordered sequence comparison asserting that the same elements, regardless of order. If the same element occurs more than once, @@ -1183,13 +1112,11 @@ class TestCase(object): # The fail* methods can be removed in 3.3, the 5 assert* methods will # have to stay around for a few more versions. See #9424. - failUnlessEqual = assertEquals = _deprecate(assertEqual) - failIfEqual = assertNotEquals = _deprecate(assertNotEqual) - failUnlessAlmostEqual = assertAlmostEquals = _deprecate(assertAlmostEqual) - failIfAlmostEqual = assertNotAlmostEquals = _deprecate(assertNotAlmostEqual) - failUnless = assert_ = _deprecate(assertTrue) - failUnlessRaises = _deprecate(assertRaises) - failIf = _deprecate(assertFalse) + assertEquals = _deprecate(assertEqual) + assertNotEquals = _deprecate(assertNotEqual) + assertAlmostEquals = _deprecate(assertAlmostEqual) + assertNotAlmostEquals = _deprecate(assertNotAlmostEqual) + assert_ = _deprecate(assertTrue) assertRaisesRegexp = _deprecate(assertRaisesRegex) assertRegexpMatches = _deprecate(assertRegex) diff --git a/Lib/unittest/test/_test_warnings.py b/Lib/unittest/test/_test_warnings.py index d0be18d..bfabb02 100644 --- a/Lib/unittest/test/_test_warnings.py +++ b/Lib/unittest/test/_test_warnings.py @@ -19,17 +19,12 @@ def warnfun(): warnings.warn('rw', RuntimeWarning) class TestWarnings(unittest.TestCase): - # unittest warnings will be printed at most once per type (max one message - # for the fail* methods, and one for the assert* methods) + # unittest warnings will be printed at most once per type def test_assert(self): self.assertEquals(2+2, 4) self.assertEquals(2*2, 4) self.assertEquals(2**2, 4) - def test_fail(self): - self.failUnless(1) - self.failUnless(True) - def test_other_unittest(self): self.assertAlmostEqual(2+2, 4) self.assertNotAlmostEqual(4+4, 2) diff --git a/Lib/unittest/test/test_assertions.py b/Lib/unittest/test/test_assertions.py index a1d20eb..4a646c3 100644 --- a/Lib/unittest/test/test_assertions.py +++ b/Lib/unittest/test/test_assertions.py @@ -223,15 +223,6 @@ class TestLongMessage(unittest.TestCase): "\+ \{'key': 'value'\}$", "\+ \{'key': 'value'\} : oops$"]) - def testAssertDictContainsSubset(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - - self.assertMessages('assertDictContainsSubset', ({'key': 'value'}, {}), - ["^Missing: 'key'$", "^oops$", - "^Missing: 'key'$", - "^Missing: 'key' : oops$"]) - def testAssertMultiLineEqual(self): self.assertMessages('assertMultiLineEqual', ("", "foo"), [r"\+ foo$", "^oops$", diff --git a/Lib/unittest/test/test_case.py b/Lib/unittest/test/test_case.py index 314aef3..351b6f8 100644 --- a/Lib/unittest/test/test_case.py +++ b/Lib/unittest/test/test_case.py @@ -488,36 +488,6 @@ class Test_TestCase(unittest.TestCase, TestEquality, TestHashing): self.assertRaises(self.failureException, self.assertNotIn, 'cow', animals) - def testAssertDictContainsSubset(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - - self.assertDictContainsSubset({}, {}) - self.assertDictContainsSubset({}, {'a': 1}) - self.assertDictContainsSubset({'a': 1}, {'a': 1}) - self.assertDictContainsSubset({'a': 1}, {'a': 1, 'b': 2}) - self.assertDictContainsSubset({'a': 1, 'b': 2}, {'a': 1, 'b': 2}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({1: "one"}, {}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 2}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'c': 1}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 1, 'c': 1}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 1, 'c': 1}, {'a': 1}) - - one = ''.join(chr(i) for i in range(255)) - # this used to cause a UnicodeDecodeError constructing the failure msg - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'foo': one}, {'foo': '\uFFFD'}) - def testAssertEqual(self): equal_pairs = [ ((), ()), @@ -1094,20 +1064,11 @@ test case have to stay around for a few more versions. See #9424. """ old = ( - (self.failIfEqual, (3, 5)), (self.assertNotEquals, (3, 5)), - (self.failUnlessEqual, (3, 3)), (self.assertEquals, (3, 3)), - (self.failUnlessAlmostEqual, (2.0, 2.0)), (self.assertAlmostEquals, (2.0, 2.0)), - (self.failIfAlmostEqual, (3.0, 5.0)), (self.assertNotAlmostEquals, (3.0, 5.0)), - (self.failUnless, (True,)), (self.assert_, (True,)), - (self.failUnlessRaises, (TypeError, lambda _: 3.14 + 'spam')), - (self.failIf, (False,)), - (self.assertSameElements, ([1, 1, 2, 3], [1, 2, 3])), - (self.assertDictContainsSubset, (dict(a=1, b=2), dict(a=1, b=2, c=3))), (self.assertRaisesRegexp, (KeyError, 'foo', lambda: {}['foo'])), (self.assertRegexpMatches, ('bar', 'bar')), ) @@ -1115,19 +1076,6 @@ test case with self.assertWarns(DeprecationWarning): meth(*args) - def testDeprecatedFailMethods(self): - """Test that the deprecated fail* methods get removed in 3.3""" - if sys.version_info[:2] < (3, 3): - return - deprecated_names = [ - 'failIfEqual', 'failUnlessEqual', 'failUnlessAlmostEqual', - 'failIfAlmostEqual', 'failUnless', 'failUnlessRaises', 'failIf', - 'assertSameElements', 'assertDictContainsSubset', - ] - for deprecated_name in deprecated_names: - with self.assertRaises(AttributeError): - getattr(self, deprecated_name) # remove these in 3.3 - def testDeepcopy(self): # Issue: 5660 class TestableTest(unittest.TestCase): diff --git a/Lib/unittest/test/test_runner.py b/Lib/unittest/test/test_runner.py index 8e95410..1bd5183 100644 --- a/Lib/unittest/test/test_runner.py +++ b/Lib/unittest/test/test_runner.py @@ -257,19 +257,17 @@ class Test_TextTestRunner(unittest.TestCase): return [b.splitlines() for b in p.communicate()] opts = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(__file__)) - ae_msg = b'Please use assertEqual instead.' - at_msg = b'Please use assertTrue instead.' # no args -> all the warnings are printed, unittest warnings only once p = subprocess.Popen([sys.executable, '_test_warnings.py'], **opts) out, err = get_parse_out_err(p) self.assertIn(b'OK', err) # check that the total number of warnings in the output is correct - self.assertEqual(len(out), 12) + self.assertEqual(len(out), 11) # check that the numbers of the different kind of warnings is correct for msg in [b'dw', b'iw', b'uw']: self.assertEqual(out.count(msg), 3) - for msg in [ae_msg, at_msg, b'rw']: + for msg in [b'rw']: self.assertEqual(out.count(msg), 1) args_list = ( @@ -294,11 +292,9 @@ class Test_TextTestRunner(unittest.TestCase): **opts) out, err = get_parse_out_err(p) self.assertIn(b'OK', err) - self.assertEqual(len(out), 14) + self.assertEqual(len(out), 13) for msg in [b'dw', b'iw', b'uw', b'rw']: self.assertEqual(out.count(msg), 3) - for msg in [ae_msg, at_msg]: - self.assertEqual(out.count(msg), 1) def testStdErrLookedUpAtInstantiationTime(self): # see issue 10786 diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py index dfdbdec..d583a82 100644 --- a/Lib/urllib/request.py +++ b/Lib/urllib/request.py @@ -1846,7 +1846,7 @@ class URLopener: if encoding == 'base64': import base64 # XXX is this encoding/decoding ok? - data = base64.decodebytes(data.encode('ascii')).decode('latin1') + data = base64.decodebytes(data.encode('ascii')).decode('latin-1') else: data = unquote(data) msg.append('Content-Length: %d' % len(data)) |